diff --git a/CHANGELOG.md b/CHANGELOG.md index d3e9081cd..21e068064 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## UNRELEASED +- breaking change: You have to call dc_stop_io()/dc_start_io() before/after EXPORT_BACKUP: + fix race condition and db corruption when a message was received during backup #2253 + - new apis to get full or html message, `dc_msg_has_html()` and `dc_get_msg_html()` #2125 #2151 diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 506d40764..83df277b0 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -1858,6 +1858,7 @@ dc_contact_t* dc_get_contact (dc_context_t* context, uint32_t co /** * Import/export things. + * During backup import/export IO must not be started, if needed stop IO using dc_stop_io() first. * What to do is defined by the _what_ parameter which may be one of the following: * * - **DC_IMEX_EXPORT_BACKUP** (11) - Export a backup to the directory given as `param1`. diff --git a/examples/repl/cmdline.rs b/examples/repl/cmdline.rs index 2005defe6..6b95669ee 100644 --- a/examples/repl/cmdline.rs +++ b/examples/repl/cmdline.rs @@ -13,6 +13,7 @@ use deltachat::dc_receive_imf::*; use deltachat::dc_tools::*; use deltachat::imex::*; use deltachat::location; +use deltachat::log::LogExt; use deltachat::lot::LotState; use deltachat::message::{self, ContactRequestDecision, Message, MessageState, MsgId}; use deltachat::peerstate::*; @@ -508,7 +509,7 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu context.maybe_network().await; } "housekeeping" => { - sql::housekeeping(&context).await; + sql::housekeeping(&context).await.log(&context); } "listchats" | "listarchived" | "chats" => { let listflags = if arg0 == "listarchived" { 0x01 } else { 0 }; diff --git a/python/src/deltachat/_build.py b/python/src/deltachat/_build.py index 252c5c212..a0e5138b5 100644 --- a/python/src/deltachat/_build.py +++ b/python/src/deltachat/_build.py @@ -150,6 +150,7 @@ def extract_defines(flags): | DC_CHAT | DC_PROVIDER | DC_KEY_GEN + | DC_IMEX ) # End of prefix matching _[\w_]+ # Match the suffix, e.g. _RSA2048 in DC_KEY_GEN_RSA2048 ) # Close the capturing group, this contains diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 8aef54769..b4685659f 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -412,23 +412,23 @@ class Account(object): Note that the account does not have to be started. """ - return self._export(path, imex_cmd=1) + return self._export(path, imex_cmd=const.DC_IMEX_EXPORT_SELF_KEYS) def export_all(self, path): """return new file containing a backup of all database state (chats, contacts, keys, media, ...). The file is created in the the `path` directory. - Note that the account does not have to be started. + Note that the account has to be stopped; call stop_io() if necessary. """ - export_files = self._export(path, 11) + export_files = self._export(path, const.DC_IMEX_EXPORT_BACKUP) if len(export_files) != 1: raise RuntimeError("found more than one new file") return export_files[0] def _export(self, path, imex_cmd): with self.temp_plugin(ImexTracker()) as imex_tracker: - lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL) + self.imex(path, imex_cmd) return imex_tracker.wait_finish() def import_self_keys(self, path): @@ -438,7 +438,7 @@ class Account(object): Note that the account does not have to be started. """ - self._import(path, imex_cmd=2) + self._import(path, imex_cmd=const.DC_IMEX_IMPORT_SELF_KEYS) def import_all(self, path): """import delta chat state from the specified backup `path` (a file). @@ -446,13 +446,16 @@ class Account(object): The account must be in unconfigured state for import to attempted. """ assert not self.is_configured(), "cannot import into configured account" - self._import(path, imex_cmd=12) + self._import(path, imex_cmd=const.DC_IMEX_IMPORT_BACKUP) def _import(self, path, imex_cmd): with self.temp_plugin(ImexTracker()) as imex_tracker: - lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL) + self.imex(path, imex_cmd) imex_tracker.wait_finish() + def imex(self, path, imex_cmd): + lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL) + def initiate_key_transfer(self): """return setup code after a Autocrypt setup message has been successfully sent to our own e-mail address ("self-sent message"). @@ -577,6 +580,28 @@ class Account(object): raise ValueError("account not configured, cannot start io") lib.dc_start_io(self._dc_context) + def maybe_network(self): + """This function should be called when there is a hint + that the network is available again, + e.g. as a response to system event reporting network availability. + The library will try to send pending messages out immediately. + + Moreover, to have a reliable state + when the app comes to foreground with network available, + it may be reasonable to call the function also at that moment. + + It is okay to call the function unconditionally when there is + network available, however, calling the function + _without_ having network may interfere with the backoff algorithm + and will led to let the jobs fail faster, with fewer retries + and may avoid messages being sent out. + + Finally, if the context was created by the dc_accounts_t account manager + (currently not implemented in the Python bindings), + use dc_accounts_maybe_network() instead of this function + """ + lib.dc_maybe_network(self._dc_context) + def configure(self, reconfigure=False): """ Start configuration process and return a Configtracker instance on which you can block with wait_finish() to get a True/False success diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 30b6f12ce..107237630 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -476,6 +476,7 @@ class TestOfflineChat: contact = msg.get_sender_contact() assert contact == ac1.get_self_contact() assert not backupdir.listdir() + ac1.stop_io() path = ac1.export_all(backupdir.strpath) assert os.path.exists(path) ac2 = acfactory.get_unconfigured_account() @@ -1497,21 +1498,42 @@ class TestOnlineAccount: original_image_path = data.get_path("d.png") chat1.send_image(original_image_path) + def assert_account_is_proper(ac): + contacts = ac.get_contacts(query="some1") + assert len(contacts) == 1 + contact2 = contacts[0] + assert contact2.addr == "some1@example.org" + chat2 = contact2.create_chat() + messages = chat2.get_messages() + assert len(messages) == 2 + assert messages[0].text == "msg1" + lp.sec("dbg file"+messages[1].filename) + assert messages[1].filemime == "image/png" + assert os.stat(messages[1].filename).st_size == os.stat(original_image_path).st_size + ac.set_config("displayname", "new displayname") + assert ac.get_config("displayname") == "new displayname" + + assert_account_is_proper(ac1) + backupdir = tmpdir.mkdir("backup") lp.sec("export all to {}".format(backupdir)) with ac1.temp_plugin(ImexTracker()) as imex_tracker: - path = ac1.export_all(backupdir.strpath) - assert os.path.exists(path) + + ac1.stop_io() + ac1.imex(backupdir.strpath, const.DC_IMEX_EXPORT_BACKUP) # check progress events for export assert imex_tracker.wait_progress(1, progress_upper_limit=249) assert imex_tracker.wait_progress(250, progress_upper_limit=499) assert imex_tracker.wait_progress(500, progress_upper_limit=749) assert imex_tracker.wait_progress(750, progress_upper_limit=999) - assert imex_tracker.wait_progress(1000) - t = time.time() + paths = imex_tracker.wait_finish() + assert len(paths) == 1 + path = paths[0] + assert os.path.exists(path) + ac1.start_io() lp.sec("get fresh empty account") ac2 = acfactory.get_unconfigured_account() @@ -1530,23 +1552,11 @@ class TestOnlineAccount: assert imex_tracker.wait_progress(750, progress_upper_limit=999) assert imex_tracker.wait_progress(1000) - contacts = ac2.get_contacts(query="some1") - assert len(contacts) == 1 - contact2 = contacts[0] - assert contact2.addr == "some1@example.org" - chat2 = contact2.create_chat() - messages = chat2.get_messages() - assert len(messages) == 2 - assert messages[0].text == "msg1" - assert messages[1].filemime == "image/png" - assert os.stat(messages[1].filename).st_size == os.stat(original_image_path).st_size + assert_account_is_proper(ac1) + assert_account_is_proper(ac2) - # wait until a second passed since last backup - # because get_latest_backupfile() shall return the latest backup - # from a UI it's unlikely anyone manages to export two - # backups in one second. - time.sleep(max(0, 1 - (time.time() - t))) lp.sec("Second-time export all to {}".format(backupdir)) + ac1.stop_io() path2 = ac1.export_all(backupdir.strpath) assert os.path.exists(path2) assert path2 != path diff --git a/src/ephemeral.rs b/src/ephemeral.rs index 47163fc9f..47457ba94 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -501,7 +501,7 @@ mod tests { #[async_std::test] async fn test_stock_ephemeral_messages() { - let context = TestContext::new().await.ctx; + let context = TestContext::new().await; assert_eq!( stock_ephemeral_timer_changed(&context, Timer::Disabled, DC_CONTACT_ID_SELF).await, diff --git a/src/imap/mod.rs b/src/imap/mod.rs index f9706faa1..1c086a93c 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -564,8 +564,8 @@ impl Imap { .uid_validity .with_context(|| format!("No UIDVALIDITY for folder {}", folder))?; - let old_uid_validity = get_uidvalidity(context, folder).await; - let old_uid_next = get_uid_next(context, folder).await; + let old_uid_validity = get_uidvalidity(context, folder).await?; + let old_uid_next = get_uid_next(context, folder).await?; if new_uid_validity == old_uid_validity { let new_emails = if newly_selected == NewlySelected::No { @@ -658,7 +658,7 @@ impl Imap { return Ok(false); } - let old_uid_next = get_uid_next(context, folder.as_ref()).await; + let old_uid_next = get_uid_next(context, folder.as_ref()).await?; let msgs = if fetch_existing_msgs { self.prefetch_existing_msgs().await? @@ -1733,16 +1733,15 @@ pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32) /// This method returns the uid_next from the last time we fetched messages. /// We can compare this to the current uid_next to find out whether there are new messages /// and fetch from this value on to get all new messages. -async fn get_uid_next(context: &Context, folder: &str) -> u32 { - context +async fn get_uid_next(context: &Context, folder: &str) -> Result { + Ok(context .sql - .query_get_value( - context, + .query_get_value_result( "SELECT uid_next FROM imap_sync WHERE folder=?;", paramsv![folder], ) - .await - .unwrap_or(0) + .await? + .unwrap_or(0)) } pub(crate) async fn set_uidvalidity( @@ -1761,16 +1760,15 @@ pub(crate) async fn set_uidvalidity( Ok(()) } -async fn get_uidvalidity(context: &Context, folder: &str) -> u32 { - context +async fn get_uidvalidity(context: &Context, folder: &str) -> Result { + Ok(context .sql - .query_get_value( - context, + .query_get_value_result( "SELECT uidvalidity FROM imap_sync WHERE folder=?;", paramsv![folder], ) - .await - .unwrap_or(0) + .await? + .unwrap_or(0)) } /// Deprecated, use get_uid_next() and get_uidvalidity() @@ -1879,17 +1877,17 @@ mod tests { #[async_std::test] async fn test_set_uid_next_validity() { let t = TestContext::new_alice().await; - assert_eq!(get_uid_next(&t.ctx, "Inbox").await, 0); - assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await, 0); + assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 0); + assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 0); set_uidvalidity(&t.ctx, "Inbox", 7).await.unwrap(); - assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await, 7); - assert_eq!(get_uid_next(&t.ctx, "Inbox").await, 0); + assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 7); + assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 0); set_uid_next(&t.ctx, "Inbox", 5).await.unwrap(); set_uidvalidity(&t.ctx, "Inbox", 6).await.unwrap(); - assert_eq!(get_uid_next(&t.ctx, "Inbox").await, 5); - assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await, 6); + assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 5); + assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 6); } #[test] diff --git a/src/imex.rs b/src/imex.rs index b061d143e..aa42b729c 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -11,7 +11,6 @@ use async_std::{ }; use rand::{thread_rng, Rng}; -use crate::blob::BlobObject; use crate::chat; use crate::chat::delete_and_reset_all_device_msgs; use crate::config::Config; @@ -30,6 +29,7 @@ use crate::param::Param; use crate::pgp; use crate::sql::{self, Sql}; use crate::stock_str; +use crate::{blob::BlobObject, log::LogExt}; use ::pgp::types::KeyTrait; use async_tar::Archive; @@ -496,6 +496,10 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef) -> !context.is_configured().await, "Cannot import backups to accounts in use." ); + ensure!( + !context.scheduler.read().await.is_running(), + "cannot import backup, IO already running" + ); context.sql.close().await; dc_delete_file(context, context.get_dbfile()).await; ensure!( @@ -563,6 +567,10 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef !context.is_configured().await, "Cannot import backups to accounts in use." ); + ensure!( + !context.scheduler.read().await.is_running(), + "cannot import backup, IO already running" + ); context.sql.close().await; dc_delete_file(context, context.get_dbfile()).await; ensure!( @@ -668,7 +676,7 @@ async fn export_backup(context: &Context, dir: impl AsRef) -> Result<()> { .sql .set_raw_config_int(context, "backup_time", now as i32) .await?; - sql::housekeeping(context).await; + sql::housekeeping(context).await.log(context); context .sql @@ -676,6 +684,11 @@ async fn export_backup(context: &Context, dir: impl AsRef) -> Result<()> { .await .map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e)); + ensure!( + !context.scheduler.read().await.is_running(), + "cannot export backup, IO already running" + ); + // we close the database during the export context.sql.close().await; diff --git a/src/job.rs b/src/job.rs index 3efaef58e..35626bf56 100644 --- a/src/job.rs +++ b/src/job.rs @@ -2,16 +2,16 @@ //! //! This module implements a job queue maintained in the SQLite database //! and job types. -use std::fmt; use std::future::Future; +use std::{fmt, time::Duration}; use anyhow::{bail, ensure, format_err, Context as _, Error, Result}; use async_smtp::smtp::response::{Category, Code, Detail}; +use async_std::task::sleep; use deltachat_derive::{FromSql, ToSql}; use itertools::Itertools; use rand::{thread_rng, Rng}; -use crate::context::Context; use crate::dc_tools::{dc_delete_file, dc_read_file, time}; use crate::ephemeral::load_imap_deletion_msgid; use crate::events::EventType; @@ -29,6 +29,7 @@ use crate::{ }; use crate::{config::Config, constants::Blocked}; use crate::{constants::Chattype, contact::Contact}; +use crate::{context::Context, log::LogExt}; use crate::{scheduler::InterruptInfo, sql}; // results in ~3 weeks for the last backoff timespan @@ -1156,7 +1157,7 @@ async fn perform_job_action( Action::MoveMsg => job.move_msg(context, connection.inbox()).await, Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await, Action::Housekeeping => { - sql::housekeeping(context).await; + sql::housekeeping(context).await.log(context); Status::Finished(Ok(())) } }; @@ -1265,6 +1266,17 @@ pub(crate) async fn load_next( ) -> Option { info!(context, "loading job for {}-thread", thread); + while !context.sql.is_open().await { + // The db is closed, which means that this thread should not be running. + // Wait until the db is re-opened (if we returned None, this thread might do further damage) + warn!( + context, + "{}: load_next() was called but the db was not opened, THIS SHOULD NOT HAPPEN. Waiting...", + thread + ); + sleep(Duration::from_millis(500)).await; + } + let query; let params; let t = time(); diff --git a/src/log.rs b/src/log.rs index bcc9d1d43..cb5304738 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,4 +1,6 @@ -//! # Logging macros +//! # Logging + +use crate::context::Context; #[macro_export] macro_rules! info { @@ -58,3 +60,39 @@ macro_rules! emit_event { $ctx.emit_event($event); }; } + +pub trait LogExt { + /// Emits a warning if the receiver contains an Err value. + /// + /// Returns an [`Option`] with the `Ok(_)` value, if any: + /// - You won't get any warnings about unused results but can still use the value if you need it + /// - This prevents the same warning from being printed to the log multiple times + /// + /// Thanks to the [track_caller](https://blog.rust-lang.org/2020/08/27/Rust-1.46.0.html#track_caller) + /// feature, the location of the caller is printed to the log, just like with the warn!() macro. + #[track_caller] + fn log(self, context: &Context) -> Option; +} + +impl LogExt for anyhow::Result { + #[track_caller] + fn log(self, context: &Context) -> Option { + match self { + Err(e) => { + let location = std::panic::Location::caller(); + // We are using Anyhow's .context() and to show the inner error, too, we need the {:#}: + let full = format!( + "{file}:{line}: {e:#}", + file = location.file(), + line = location.line(), + e = e + ); + // We can't use the warn!() macro here as the file!() and line!() macros + // don't work well with #[track_caller] + emit_event!(context, crate::EventType::Warning(full)); + None + } + Ok(v) => Some(v), + } + } +} diff --git a/src/sql.rs b/src/sql.rs index 86bd3d0f1..e0306e1a8 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -8,6 +8,7 @@ use std::path::Path; use std::time::Duration; use anyhow::format_err; +use anyhow::Context as _; use rusqlite::{Connection, Error as SqlError, OpenFlags}; use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon}; @@ -495,7 +496,7 @@ pub fn get_rowid2( ) } -pub async fn housekeeping(context: &Context) { +pub async fn housekeeping(context: &Context) -> anyhow::Result<()> { if let Err(err) = crate::ephemeral::delete_expired_messages(context).await { warn!(context, "Failed to delete expired messages: {}", err); } @@ -510,28 +511,28 @@ pub async fn housekeeping(context: &Context) { "SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;", Param::File, ) - .await; + .await?; maybe_add_from_param( context, &mut files_in_use, "SELECT param FROM jobs;", Param::File, ) - .await; + .await?; maybe_add_from_param( context, &mut files_in_use, "SELECT param FROM chats;", Param::ProfileImage, ) - .await; + .await?; maybe_add_from_param( context, &mut files_in_use, "SELECT param FROM contacts;", Param::ProfileImage, ) - .await; + .await?; context .sql @@ -547,9 +548,7 @@ pub async fn housekeeping(context: &Context) { }, ) .await - .unwrap_or_else(|err| { - warn!(context, "sql: failed query: {}", err); - }); + .context("housekeeping: failed to SELECT value FROM config")?; info!(context, "{} files in use.", files_in_use.len(),); /* go through directory and delete unused files */ @@ -637,6 +636,7 @@ pub async fn housekeeping(context: &Context) { warn!(context, "Can't set config: {}", e); } info!(context, "Housekeeping done."); + Ok(()) } #[allow(clippy::indexing_slicing)] @@ -665,7 +665,7 @@ async fn maybe_add_from_param( files_in_use: &mut HashSet, query: &str, param_id: Param, -) { +) -> anyhow::Result<()> { context .sql .query_map( @@ -683,9 +683,7 @@ async fn maybe_add_from_param( }, ) .await - .unwrap_or_else(|err| { - warn!(context, "sql: failed to add_from_param: {}", err); - }); + .context(format!("housekeeping: failed to add_from_param {}", query)) } #[allow(clippy::cognitive_complexity)] @@ -1570,8 +1568,10 @@ async fn prune_tombstones(context: &Context) -> Result<()> { #[cfg(test)] mod test { + use async_std::fs::File; + use super::*; - use crate::test_utils::TestContext; + use crate::{test_utils::TestContext, Event, EventType}; #[test] fn test_maybe_add_file() { @@ -1613,4 +1613,44 @@ mod test { assert!(!t.ctx.sql.col_exists("msgs", "foobar").await.unwrap()); assert!(!t.ctx.sql.col_exists("foobar", "foobar").await.unwrap()); } + + #[async_std::test] + async fn test_housekeeping_db_closed() { + let t = TestContext::new().await; + + let avatar_src = t.dir.path().join("avatar.png"); + let avatar_bytes = include_bytes!("../test-data/image/avatar64x64.png"); + File::create(&avatar_src) + .await + .unwrap() + .write_all(avatar_bytes) + .await + .unwrap(); + t.set_config(Config::Selfavatar, Some(avatar_src.to_str().unwrap())) + .await + .unwrap(); + + t.add_event_sink(move |event: Event| async move { + match event.typ { + EventType::Info(s) => assert!( + !s.contains("Keeping new unreferenced file"), + "File {} was almost deleted, only reason it was kept is that it was created recently (as the tests don't run for a long time)", + s + ), + EventType::Error(s) => panic!(s), + _ => {} + } + }) + .await; + + let a = t.get_config(Config::Selfavatar).await.unwrap(); + assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]); + + t.sql.close().await; + housekeeping(&t).await.unwrap_err(); // housekeeping should fail as the db is closed + t.sql.open(&t, &t.get_dbfile(), false).await.unwrap(); + + let a = t.get_config(Config::Selfavatar).await.unwrap(); + assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]); + } } diff --git a/src/test_utils.rs b/src/test_utils.rs index c8b1fe5f0..3957a39dd 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -2,17 +2,17 @@ //! //! This private module is only compiled for test runs. -use std::collections::BTreeMap; use std::fmt; use std::ops::Deref; use std::str::FromStr; use std::time::{Duration, Instant}; +use std::{collections::BTreeMap, panic}; use ansi_term::Color; use async_std::future::Future; use async_std::path::PathBuf; -use async_std::pin::Pin; use async_std::sync::{Arc, RwLock}; +use async_std::{channel, pin::Pin}; use chat::ChatItem; use once_cell::sync::Lazy; use tempfile::{tempdir, TempDir}; @@ -50,6 +50,8 @@ pub(crate) struct TestContext { recv_idx: RwLock, /// Functions to call for events received. event_sinks: Arc>>>, + /// Receives panics from sinks ("sink" means "event handler" here) + poison_receiver: channel::Receiver, } impl fmt::Debug for TestContext { @@ -97,7 +99,15 @@ impl TestContext { let events = ctx.get_event_emitter(); let event_sinks: Arc>>> = Arc::new(RwLock::new(Vec::new())); let sinks = Arc::clone(&event_sinks); + let (poison_sender, poison_receiver) = channel::bounded(1); async_std::task::spawn(async move { + // Make sure that the test fails if there is a panic on this thread here: + let orig_hook = panic::take_hook(); + panic::set_hook(Box::new(move |panic_info| { + poison_sender.try_send(panic_info.to_string()).ok(); + orig_hook(panic_info); + })); + while let Some(event) = events.recv().await { { let sinks = sinks.read().await; @@ -114,6 +124,7 @@ impl TestContext { dir, recv_idx: RwLock::new(0), event_sinks, + poison_receiver, } } @@ -410,6 +421,14 @@ impl Deref for TestContext { } } +impl Drop for TestContext { + fn drop(&mut self) { + if let Ok(p) = self.poison_receiver.try_recv() { + panic!(p); + } + } +} + /// A raw message as it was scheduled to be sent. /// /// This is a raw message, probably in the shape DC was planning to send it but not having