Compare commits

...

4 Commits

Author SHA1 Message Date
Floris Bruynooghe
0c3613dbf4 Fixup from merge: make BackupProvider clone again 2023-03-29 17:25:41 +02:00
Floris Bruynooghe
19f8dc95dc Merge branch 'stable-1.112' into flub/clone-provider-in-wait 2023-03-29 17:23:16 +02:00
Floris Bruynooghe
bc0f65014e Add changelog 2023-03-29 13:04:30 +02:00
Floris Bruynooghe
6a34cb7ad8 fix(imex): Clone provider in dc_backup_provider_wait()
This is a long running process and there has been at lease one crash
in this function.  By owning both the context and the provider when
waiting we can avoid them being deallocated while we are still using
them.

To make the BackupProvider clonable this transforms all the errors
from it into Strings.  These are clonable and how we report most our
errors anyway.  The Future impl of BackupProvider then turns this into
an anyhow::Error so all other code can keep using anyhow as usual.
2023-03-29 12:54:26 +02:00
3 changed files with 26 additions and 9 deletions

View File

@@ -6,6 +6,7 @@
- Update iroh, remove `default-net` from `[patch.crates-io]` section.
- transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240
- Make sure BackupProvider is cancelled on drop (or dc_backup_provider_unref). The BackupProvider will now alaway finish with an IMEX event of 1000 or 0, previoulsy it would sometimes finishe with 1000 (success) when it really was 0 (failure). #4242
- Fix crash when dc_backup_provider_t is unrefed while dc_backup_provider_wait() is still using it. #4244
## [1.112.1] - 2023-03-27

View File

@@ -4221,10 +4221,17 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid
let ffi_provider = &mut *provider;
let ctx = &*ffi_provider.context;
let provider = &mut ffi_provider.provider;
backup_provider_wait(ctx.clone(), provider.clone());
}
// Because this is a long-running operation make sure we own the Context and BackupProvider.
// This stops a FFI user from deallocating it by calling unref on the object while we are
// using it.
fn backup_provider_wait(context: Context, provider: BackupProvider) {
block_on(provider)
.log_err(ctx, "Failed to await BackupProvider")
.log_err(&context, "Failed to await BackupProvider")
.context("Failed to await BackupProvider")
.set_last_error(ctx)
.set_last_error(&context)
.ok();
}

View File

@@ -27,10 +27,13 @@ use std::net::Ipv4Addr;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use futures_lite::StreamExt;
use iroh::blobs::Collection;
use iroh::get::DataStream;
@@ -42,7 +45,7 @@ use tokio::fs::{self, File};
use tokio::io::{self, AsyncWriteExt, BufWriter};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, Mutex};
use tokio::task::{JoinHandle, JoinSet};
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::sync::CancellationToken;
@@ -69,14 +72,14 @@ const MAX_CONCURRENT_DIALS: u8 = 16;
///
/// The task implements [`Future`] and awaiting it will complete once a transfer has been
/// either completed or aborted.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BackupProvider {
/// The supervisor task, run by [`BackupProvider::watch_provider`].
handle: JoinHandle<Result<()>>,
handle: Shared<BoxFuture<'static, Result<(), String>>>,
/// The ticket to retrieve the backup collection.
ticket: Ticket,
/// Guard to cancel the provider on drop.
_drop_guard: tokio_util::sync::DropGuard,
_drop_guard: Arc<tokio_util::sync::DropGuard>,
}
impl BackupProvider {
@@ -139,13 +142,17 @@ impl BackupProvider {
// Explicit drop to move the guards into this future
drop(paused_guard);
drop(dbfile);
res
res.map_err(|err| format!("{err:#}"))
})
.map_err(|err| format!("{err}"))
.and_then(futures::future::ready)
.boxed()
.shared()
};
Ok(Self {
handle,
ticket,
_drop_guard: drop_token.drop_guard(),
_drop_guard: Arc::new(drop_token.drop_guard()),
})
}
@@ -293,7 +300,9 @@ impl Future for BackupProvider {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.handle).poll(cx)?
Pin::new(&mut self.handle)
.poll(cx)
.map_err(anyhow::Error::msg)
}
}