mirror of
https://github.com/chatmail/core.git
synced 2026-04-21 15:36:30 +03:00
feat(imex): Cancel BackupProvider when dropped (#4242)
This ensures that the BackupProvider will be stopped as soon as the struct is dropped and the imex progress error event is emitted. This makes it easier to use and also makes sure that the ffi call dc_backup_provider_unref() does not lead to dangling resources.
This commit is contained in:
committed by
GitHub
parent
5be558ea68
commit
943c8a1ab3
@@ -44,6 +44,7 @@ use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::blob::BlobDirContents;
|
||||
use crate::chat::delete_and_reset_all_device_msgs;
|
||||
@@ -74,6 +75,8 @@ pub struct BackupProvider {
|
||||
handle: JoinHandle<Result<()>>,
|
||||
/// The ticket to retrieve the backup collection.
|
||||
ticket: Ticket,
|
||||
/// Guard to cancel the provider on drop.
|
||||
_drop_guard: tokio_util::sync::DropGuard,
|
||||
}
|
||||
|
||||
impl BackupProvider {
|
||||
@@ -125,10 +128,12 @@ impl BackupProvider {
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
let drop_token = CancellationToken::new();
|
||||
let handle = {
|
||||
let context = context.clone();
|
||||
let drop_token = drop_token.clone();
|
||||
tokio::spawn(async move {
|
||||
let res = Self::watch_provider(&context, provider, cancel_token).await;
|
||||
let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await;
|
||||
context.free_ongoing().await;
|
||||
|
||||
// Explicit drop to move the guards into this future
|
||||
@@ -137,7 +142,11 @@ impl BackupProvider {
|
||||
res
|
||||
})
|
||||
};
|
||||
Ok(Self { handle, ticket })
|
||||
Ok(Self {
|
||||
handle,
|
||||
ticket,
|
||||
_drop_guard: drop_token.drop_guard(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates the provider task.
|
||||
@@ -191,8 +200,8 @@ impl BackupProvider {
|
||||
context: &Context,
|
||||
mut provider: Provider,
|
||||
cancel_token: Receiver<()>,
|
||||
drop_token: CancellationToken,
|
||||
) -> Result<()> {
|
||||
// _dbfile exists so we can clean up the file once it is no longer needed
|
||||
let mut events = provider.subscribe();
|
||||
let mut total_size = 0;
|
||||
let mut current_size = 0;
|
||||
@@ -252,8 +261,12 @@ impl BackupProvider {
|
||||
},
|
||||
_ = cancel_token.recv() => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupSender cancelled"));
|
||||
break Err(anyhow!("BackupProvider cancelled"));
|
||||
},
|
||||
_ = drop_token.cancelled() => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider dropped"));
|
||||
}
|
||||
}
|
||||
};
|
||||
match &res {
|
||||
@@ -660,4 +673,16 @@ mod tests {
|
||||
assert_eq!(out, EventType::ImexProgress(progress));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_drop_provider() {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let ctx = tcm.alice().await;
|
||||
|
||||
let provider = BackupProvider::prepare(&ctx).await.unwrap();
|
||||
drop(provider);
|
||||
ctx.evtracker
|
||||
.get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user