diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ecec5f9d..45af4ed94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Changes - Update iroh, remove `default-net` from `[patch.crates-io]` section. - transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240 +- Make sure BackupProvider is cancelled on drop (or dc_backup_provider_unref). The BackupProvider will now alaway finish with an IMEX event of 1000 or 0, previoulsy it would sometimes finishe with 1000 (success) when it really was 0 (failure). #4242 ## [1.112.1] - 2023-03-27 diff --git a/Cargo.lock b/Cargo.lock index 832ffd88c..ae6ef62cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1202,6 +1202,7 @@ dependencies = [ "tokio-io-timeout", "tokio-stream", "tokio-tar", + "tokio-util", "toml", "trust-dns-resolver", "url", diff --git a/Cargo.toml b/Cargo.toml index ba4ea5fc5..e153ecf55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,10 +84,11 @@ strum_macros = "0.24" tagger = "4.3.4" textwrap = "0.16.0" thiserror = "1" +tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } tokio-io-timeout = "1.2.0" tokio-stream = { version = "0.1.11", features = ["fs"] } tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar -tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } +tokio-util = "0.7.7" toml = "0.7" trust-dns-resolver = "0.22" url = "2" diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 8cb694c4f..3203d865d 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -2775,6 +2775,12 @@ void dc_backup_provider_wait (dc_backup_provider_t* backup_provider); /** * Frees a dc_backup_provider_t object. * + * If the provider has not yet finished, as indicated by + * dc_backup_provider_wait() or the #DC_EVENT_IMEX_PROGRESS event with value + * of 0 (failed) or 1000 (succeeded), this will also abort any in-progress + * transfer. If this aborts the provider a #DC_EVENT_IMEX_PROGRESS event with + * value 0 (failed) will be emitted. + * * @memberof dc_backup_provider_t * @param backup_provider The backup provider object as created by * dc_backup_provider_new(). diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 1837bcca8..441d5e531 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -44,6 +44,7 @@ use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, Mutex}; use tokio::task::{JoinHandle, JoinSet}; use tokio_stream::wrappers::ReadDirStream; +use tokio_util::sync::CancellationToken; use crate::blob::BlobDirContents; use crate::chat::delete_and_reset_all_device_msgs; @@ -74,6 +75,8 @@ pub struct BackupProvider { handle: JoinHandle>, /// The ticket to retrieve the backup collection. ticket: Ticket, + /// Guard to cancel the provider on drop. + _drop_guard: tokio_util::sync::DropGuard, } impl BackupProvider { @@ -125,10 +128,12 @@ impl BackupProvider { return Err(err); } }; + let drop_token = CancellationToken::new(); let handle = { let context = context.clone(); + let drop_token = drop_token.clone(); tokio::spawn(async move { - let res = Self::watch_provider(&context, provider, cancel_token).await; + let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await; context.free_ongoing().await; // Explicit drop to move the guards into this future @@ -137,7 +142,11 @@ impl BackupProvider { res }) }; - Ok(Self { handle, ticket }) + Ok(Self { + handle, + ticket, + _drop_guard: drop_token.drop_guard(), + }) } /// Creates the provider task. @@ -191,8 +200,8 @@ impl BackupProvider { context: &Context, mut provider: Provider, cancel_token: Receiver<()>, + drop_token: CancellationToken, ) -> Result<()> { - // _dbfile exists so we can clean up the file once it is no longer needed let mut events = provider.subscribe(); let mut total_size = 0; let mut current_size = 0; @@ -252,8 +261,12 @@ impl BackupProvider { }, _ = cancel_token.recv() => { provider.shutdown(); - break Err(anyhow!("BackupSender cancelled")); + break Err(anyhow!("BackupProvider cancelled")); }, + _ = drop_token.cancelled() => { + provider.shutdown(); + break Err(anyhow!("BackupProvider dropped")); + } } }; match &res { @@ -660,4 +673,16 @@ mod tests { assert_eq!(out, EventType::ImexProgress(progress)); } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_drop_provider() { + let mut tcm = TestContextManager::new(); + let ctx = tcm.alice().await; + + let provider = BackupProvider::prepare(&ctx).await.unwrap(); + drop(provider); + ctx.evtracker + .get_matching(|ev| matches!(ev, EventType::ImexProgress(0))) + .await; + } }