From 6a34cb7ad890b9450f494ff30b0f8fe870e1bbb1 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 29 Mar 2023 12:54:26 +0200 Subject: [PATCH] fix(imex): Clone provider in dc_backup_provider_wait() This is a long running process and there has been at lease one crash in this function. By owning both the context and the provider when waiting we can avoid them being deallocated while we are still using them. To make the BackupProvider clonable this transforms all the errors from it into Strings. These are clonable and how we report most our errors anyway. The Future impl of BackupProvider then turns this into an anyhow::Error so all other code can keep using anyhow as usual. --- deltachat-ffi/src/lib.rs | 11 +++++++++-- src/imex/transfer.rs | 18 +++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 968616a5d..34be295a2 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4221,10 +4221,17 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid let ffi_provider = &mut *provider; let ctx = &*ffi_provider.context; let provider = &mut ffi_provider.provider; + backup_provider_wait(ctx.clone(), provider.clone()); +} + +// Because this is a long-running operation make sure we own the Context and BackupProvider. +// This stops a FFI user from deallocating it by calling unref on the object while we are +// using it. +fn backup_provider_wait(context: Context, provider: BackupProvider) { block_on(provider) - .log_err(ctx, "Failed to await BackupProvider") + .log_err(&context, "Failed to await BackupProvider") .context("Failed to await BackupProvider") - .set_last_error(ctx) + .set_last_error(&context) .ok(); } diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index d93cf4f6d..3baf6d1f4 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -32,6 +32,8 @@ use std::task::Poll; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; use async_channel::Receiver; +use futures::future::{BoxFuture, Shared}; +use futures::{FutureExt, TryFutureExt}; use futures_lite::StreamExt; use iroh::get::{DataStream, Options}; use iroh::progress::ProgressEmitter; @@ -42,7 +44,7 @@ use tokio::fs::{self, File}; use tokio::io::{self, AsyncWriteExt, BufWriter}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, Mutex}; -use tokio::task::{JoinHandle, JoinSet}; +use tokio::task::JoinSet; use tokio_stream::wrappers::ReadDirStream; use crate::blob::BlobDirContents; @@ -66,10 +68,10 @@ use super::{export_database, DBFILE_BACKUP_NAME}; /// /// The task implements [`Future`] and awaiting it will complete once a transfer has been /// either completed or aborted. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BackupProvider { /// The supervisor task, run by [`BackupProvider::watch_provider`]. - handle: JoinHandle>, + handle: Shared>>, /// The ticket to retrieve the backup collection. ticket: Ticket, } @@ -132,8 +134,12 @@ impl BackupProvider { // Explicit drop to move the guards into this future drop(paused_guard); drop(dbfile); - res + res.map_err(|err| format!("{err:#}")) }) + .map_err(|err| format!("{err}")) + .and_then(futures::future::ready) + .boxed() + .shared() }; Ok(Self { handle, ticket }) } @@ -278,7 +284,9 @@ impl Future for BackupProvider { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - Pin::new(&mut self.handle).poll(cx)? + Pin::new(&mut self.handle) + .poll(cx) + .map_err(anyhow::Error::msg) } }