mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 15:02:11 +03:00
Compare commits
4 Commits
v1.135.1
...
flub/clone
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c3613dbf4 | ||
|
|
19f8dc95dc | ||
|
|
bc0f65014e | ||
|
|
6a34cb7ad8 |
@@ -6,6 +6,7 @@
|
||||
- Update iroh, remove `default-net` from `[patch.crates-io]` section.
|
||||
- transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240
|
||||
- Make sure BackupProvider is cancelled on drop (or dc_backup_provider_unref). The BackupProvider will now alaway finish with an IMEX event of 1000 or 0, previoulsy it would sometimes finishe with 1000 (success) when it really was 0 (failure). #4242
|
||||
- Fix crash when dc_backup_provider_t is unrefed while dc_backup_provider_wait() is still using it. #4244
|
||||
|
||||
## [1.112.1] - 2023-03-27
|
||||
|
||||
|
||||
@@ -4221,10 +4221,17 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid
|
||||
let ffi_provider = &mut *provider;
|
||||
let ctx = &*ffi_provider.context;
|
||||
let provider = &mut ffi_provider.provider;
|
||||
backup_provider_wait(ctx.clone(), provider.clone());
|
||||
}
|
||||
|
||||
// Because this is a long-running operation make sure we own the Context and BackupProvider.
|
||||
// This stops a FFI user from deallocating it by calling unref on the object while we are
|
||||
// using it.
|
||||
fn backup_provider_wait(context: Context, provider: BackupProvider) {
|
||||
block_on(provider)
|
||||
.log_err(ctx, "Failed to await BackupProvider")
|
||||
.log_err(&context, "Failed to await BackupProvider")
|
||||
.context("Failed to await BackupProvider")
|
||||
.set_last_error(ctx)
|
||||
.set_last_error(&context)
|
||||
.ok();
|
||||
}
|
||||
|
||||
|
||||
@@ -27,10 +27,13 @@ use std::net::Ipv4Addr;
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use futures::future::{BoxFuture, Shared};
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use futures_lite::StreamExt;
|
||||
use iroh::blobs::Collection;
|
||||
use iroh::get::DataStream;
|
||||
@@ -42,7 +45,7 @@ use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -69,14 +72,14 @@ const MAX_CONCURRENT_DIALS: u8 = 16;
|
||||
///
|
||||
/// The task implements [`Future`] and awaiting it will complete once a transfer has been
|
||||
/// either completed or aborted.
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BackupProvider {
|
||||
/// The supervisor task, run by [`BackupProvider::watch_provider`].
|
||||
handle: JoinHandle<Result<()>>,
|
||||
handle: Shared<BoxFuture<'static, Result<(), String>>>,
|
||||
/// The ticket to retrieve the backup collection.
|
||||
ticket: Ticket,
|
||||
/// Guard to cancel the provider on drop.
|
||||
_drop_guard: tokio_util::sync::DropGuard,
|
||||
_drop_guard: Arc<tokio_util::sync::DropGuard>,
|
||||
}
|
||||
|
||||
impl BackupProvider {
|
||||
@@ -139,13 +142,17 @@ impl BackupProvider {
|
||||
// Explicit drop to move the guards into this future
|
||||
drop(paused_guard);
|
||||
drop(dbfile);
|
||||
res
|
||||
res.map_err(|err| format!("{err:#}"))
|
||||
})
|
||||
.map_err(|err| format!("{err}"))
|
||||
.and_then(futures::future::ready)
|
||||
.boxed()
|
||||
.shared()
|
||||
};
|
||||
Ok(Self {
|
||||
handle,
|
||||
ticket,
|
||||
_drop_guard: drop_token.drop_guard(),
|
||||
_drop_guard: Arc::new(drop_token.drop_guard()),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -293,7 +300,9 @@ impl Future for BackupProvider {
|
||||
type Output = Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut self.handle).poll(cx)?
|
||||
Pin::new(&mut self.handle)
|
||||
.poll(cx)
|
||||
.map_err(anyhow::Error::msg)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user