From 943c8a1ab345aae7695ccbfdbe5799e29008270a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 29 Mar 2023 14:51:08 +0200 Subject: [PATCH] feat(imex): Cancel BackupProvider when dropped (#4242) This ensures that the BackupProvider will be stopped as soon as the struct is dropped and the imex progress error event is emitted. This makes it easier to use and also makes sure that the ffi call dc_backup_provider_unref() does not lead to dangling resources. --- CHANGELOG.md | 1 + Cargo.lock | 1 + Cargo.toml | 3 ++- deltachat-ffi/deltachat.h | 6 ++++++ src/imex/transfer.rs | 33 +++++++++++++++++++++++++++++---- 5 files changed, 39 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ecec5f9d..45af4ed94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Changes - Update iroh, remove `default-net` from `[patch.crates-io]` section. - transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240 +- Make sure BackupProvider is cancelled on drop (or dc_backup_provider_unref). The BackupProvider will now alaway finish with an IMEX event of 1000 or 0, previoulsy it would sometimes finishe with 1000 (success) when it really was 0 (failure). #4242 ## [1.112.1] - 2023-03-27 diff --git a/Cargo.lock b/Cargo.lock index 832ffd88c..ae6ef62cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1202,6 +1202,7 @@ dependencies = [ "tokio-io-timeout", "tokio-stream", "tokio-tar", + "tokio-util", "toml", "trust-dns-resolver", "url", diff --git a/Cargo.toml b/Cargo.toml index ba4ea5fc5..e153ecf55 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,10 +84,11 @@ strum_macros = "0.24" tagger = "4.3.4" textwrap = "0.16.0" thiserror = "1" +tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } tokio-io-timeout = "1.2.0" tokio-stream = { version = "0.1.11", features = ["fs"] } tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar -tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } +tokio-util = "0.7.7" toml = "0.7" trust-dns-resolver = "0.22" url = "2" diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 8cb694c4f..3203d865d 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -2775,6 +2775,12 @@ void dc_backup_provider_wait (dc_backup_provider_t* backup_provider); /** * Frees a dc_backup_provider_t object. * + * If the provider has not yet finished, as indicated by + * dc_backup_provider_wait() or the #DC_EVENT_IMEX_PROGRESS event with value + * of 0 (failed) or 1000 (succeeded), this will also abort any in-progress + * transfer. If this aborts the provider a #DC_EVENT_IMEX_PROGRESS event with + * value 0 (failed) will be emitted. + * * @memberof dc_backup_provider_t * @param backup_provider The backup provider object as created by * dc_backup_provider_new(). diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 1837bcca8..441d5e531 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -44,6 +44,7 @@ use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, Mutex}; use tokio::task::{JoinHandle, JoinSet}; use tokio_stream::wrappers::ReadDirStream; +use tokio_util::sync::CancellationToken; use crate::blob::BlobDirContents; use crate::chat::delete_and_reset_all_device_msgs; @@ -74,6 +75,8 @@ pub struct BackupProvider { handle: JoinHandle>, /// The ticket to retrieve the backup collection. ticket: Ticket, + /// Guard to cancel the provider on drop. + _drop_guard: tokio_util::sync::DropGuard, } impl BackupProvider { @@ -125,10 +128,12 @@ impl BackupProvider { return Err(err); } }; + let drop_token = CancellationToken::new(); let handle = { let context = context.clone(); + let drop_token = drop_token.clone(); tokio::spawn(async move { - let res = Self::watch_provider(&context, provider, cancel_token).await; + let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await; context.free_ongoing().await; // Explicit drop to move the guards into this future @@ -137,7 +142,11 @@ impl BackupProvider { res }) }; - Ok(Self { handle, ticket }) + Ok(Self { + handle, + ticket, + _drop_guard: drop_token.drop_guard(), + }) } /// Creates the provider task. @@ -191,8 +200,8 @@ impl BackupProvider { context: &Context, mut provider: Provider, cancel_token: Receiver<()>, + drop_token: CancellationToken, ) -> Result<()> { - // _dbfile exists so we can clean up the file once it is no longer needed let mut events = provider.subscribe(); let mut total_size = 0; let mut current_size = 0; @@ -252,8 +261,12 @@ impl BackupProvider { }, _ = cancel_token.recv() => { provider.shutdown(); - break Err(anyhow!("BackupSender cancelled")); + break Err(anyhow!("BackupProvider cancelled")); }, + _ = drop_token.cancelled() => { + provider.shutdown(); + break Err(anyhow!("BackupProvider dropped")); + } } }; match &res { @@ -660,4 +673,16 @@ mod tests { assert_eq!(out, EventType::ImexProgress(progress)); } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_drop_provider() { + let mut tcm = TestContextManager::new(); + let ctx = tcm.alice().await; + + let provider = BackupProvider::prepare(&ctx).await.unwrap(); + drop(provider); + ctx.evtracker + .get_matching(|ev| matches!(ev, EventType::ImexProgress(0))) + .await; + } }