fix(imex): Clone provider in dc_backup_provider_wait()

This is a long running process and there has been at lease one crash
in this function.  By owning both the context and the provider when
waiting we can avoid them being deallocated while we are still using
them.

To make the BackupProvider clonable this transforms all the errors
from it into Strings.  These are clonable and how we report most our
errors anyway.  The Future impl of BackupProvider then turns this into
an anyhow::Error so all other code can keep using anyhow as usual.
This commit is contained in:
Floris Bruynooghe
2023-03-29 12:54:26 +02:00
parent fc25bba514
commit 6a34cb7ad8
2 changed files with 22 additions and 7 deletions

View File

@@ -4221,10 +4221,17 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid
let ffi_provider = &mut *provider;
let ctx = &*ffi_provider.context;
let provider = &mut ffi_provider.provider;
backup_provider_wait(ctx.clone(), provider.clone());
}
// Because this is a long-running operation make sure we own the Context and BackupProvider.
// This stops a FFI user from deallocating it by calling unref on the object while we are
// using it.
fn backup_provider_wait(context: Context, provider: BackupProvider) {
block_on(provider)
.log_err(ctx, "Failed to await BackupProvider")
.log_err(&context, "Failed to await BackupProvider")
.context("Failed to await BackupProvider")
.set_last_error(ctx)
.set_last_error(&context)
.ok();
}

View File

@@ -32,6 +32,8 @@ use std::task::Poll;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use futures_lite::StreamExt;
use iroh::get::{DataStream, Options};
use iroh::progress::ProgressEmitter;
@@ -42,7 +44,7 @@ use tokio::fs::{self, File};
use tokio::io::{self, AsyncWriteExt, BufWriter};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, Mutex};
use tokio::task::{JoinHandle, JoinSet};
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReadDirStream;
use crate::blob::BlobDirContents;
@@ -66,10 +68,10 @@ use super::{export_database, DBFILE_BACKUP_NAME};
///
/// The task implements [`Future`] and awaiting it will complete once a transfer has been
/// either completed or aborted.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BackupProvider {
/// The supervisor task, run by [`BackupProvider::watch_provider`].
handle: JoinHandle<Result<()>>,
handle: Shared<BoxFuture<'static, Result<(), String>>>,
/// The ticket to retrieve the backup collection.
ticket: Ticket,
}
@@ -132,8 +134,12 @@ impl BackupProvider {
// Explicit drop to move the guards into this future
drop(paused_guard);
drop(dbfile);
res
res.map_err(|err| format!("{err:#}"))
})
.map_err(|err| format!("{err}"))
.and_then(futures::future::ready)
.boxed()
.shared()
};
Ok(Self { handle, ticket })
}
@@ -278,7 +284,9 @@ impl Future for BackupProvider {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.handle).poll(cx)?
Pin::new(&mut self.handle)
.poll(cx)
.map_err(anyhow::Error::msg)
}
}