diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 968616a5d..34be295a2 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4221,10 +4221,17 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid let ffi_provider = &mut *provider; let ctx = &*ffi_provider.context; let provider = &mut ffi_provider.provider; + backup_provider_wait(ctx.clone(), provider.clone()); +} + +// Because this is a long-running operation make sure we own the Context and BackupProvider. +// This stops a FFI user from deallocating it by calling unref on the object while we are +// using it. +fn backup_provider_wait(context: Context, provider: BackupProvider) { block_on(provider) - .log_err(ctx, "Failed to await BackupProvider") + .log_err(&context, "Failed to await BackupProvider") .context("Failed to await BackupProvider") - .set_last_error(ctx) + .set_last_error(&context) .ok(); } diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index d93cf4f6d..3baf6d1f4 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -32,6 +32,8 @@ use std::task::Poll; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; use async_channel::Receiver; +use futures::future::{BoxFuture, Shared}; +use futures::{FutureExt, TryFutureExt}; use futures_lite::StreamExt; use iroh::get::{DataStream, Options}; use iroh::progress::ProgressEmitter; @@ -42,7 +44,7 @@ use tokio::fs::{self, File}; use tokio::io::{self, AsyncWriteExt, BufWriter}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, Mutex}; -use tokio::task::{JoinHandle, JoinSet}; +use tokio::task::JoinSet; use tokio_stream::wrappers::ReadDirStream; use crate::blob::BlobDirContents; @@ -66,10 +68,10 @@ use super::{export_database, DBFILE_BACKUP_NAME}; /// /// The task implements [`Future`] and awaiting it will complete once a transfer has been /// either completed or aborted. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BackupProvider { /// The supervisor task, run by [`BackupProvider::watch_provider`]. - handle: JoinHandle>, + handle: Shared>>, /// The ticket to retrieve the backup collection. ticket: Ticket, } @@ -132,8 +134,12 @@ impl BackupProvider { // Explicit drop to move the guards into this future drop(paused_guard); drop(dbfile); - res + res.map_err(|err| format!("{err:#}")) }) + .map_err(|err| format!("{err}")) + .and_then(futures::future::ready) + .boxed() + .shared() }; Ok(Self { handle, ticket }) } @@ -278,7 +284,9 @@ impl Future for BackupProvider { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - Pin::new(&mut self.handle).poll(cx)? + Pin::new(&mut self.handle) + .poll(cx) + .map_err(anyhow::Error::msg) } }