diff --git a/CHANGELOG.md b/CHANGELOG.md index 3740bb21d..63e201000 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### Changes - Update iroh, remove `default-net` from `[patch.crates-io]` section. +- transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240 +- Make sure BackupProvider is cancelled on drop (or dc_backup_provider_unref). The BackupProvider will now alaway finish with an IMEX event of 1000 or 0, previoulsy it would sometimes finishe with 1000 (success) when it really was 0 (failure). #4242 - Fix crash when dc_backup_provider_t is unrefed while dc_backup_provider_wait() is still using it. #4244 ## [1.112.1] - 2023-03-27 diff --git a/Cargo.lock b/Cargo.lock index 6c74382da..38e9ed35f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -368,12 +368,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base-x" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" - [[package]] name = "base16ct" version = "0.1.1" @@ -662,39 +656,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "bitflags", - "clap_lex 0.2.4", + "clap_lex", "indexmap", "textwrap", ] -[[package]] -name = "clap" -version = "4.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d7ae14b20b94cb02149ed21a86c423859cbe18dc7ed69845cace50e52b40a5" -dependencies = [ - "bitflags", - "clap_derive", - "clap_lex 0.3.2", - "is-terminal", - "once_cell", - "strsim", - "termcolor", -] - -[[package]] -name = "clap_derive" -version = "4.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44bec8e5c9d09e439c4335b1af0abaab56dcf3b94999a936e1bb47b9134288f0" -dependencies = [ - "heck", - "proc-macro-error", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "clap_lex" version = "0.2.4" @@ -704,15 +670,6 @@ dependencies = [ "os_str_bytes", ] -[[package]] -name = "clap_lex" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350b9cf31731f9957399229e9b2adc51eeabdfbe9d71d9a0552275fd12710d09" -dependencies = [ - "os_str_bytes", -] - [[package]] name = "clipboard-win" version = "4.5.0" @@ -755,19 +712,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "console" -version = "0.15.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d79fbe8970a77e3e34151cc13d3b3e248aa0faaecb9f6091fa07ebefe5ad60" -dependencies = [ - "encode_unicode", - "lazy_static", - "libc", - "unicode-width", - "windows-sys 0.42.0", -] - [[package]] name = "const-oid" version = "0.9.2" @@ -862,7 +806,7 @@ dependencies = [ "atty", "cast", "ciborium", - "clap 3.2.23", + "clap", "criterion-plot", "futures", "itertools", @@ -1098,26 +1042,6 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb" -[[package]] -name = "data-encoding-macro" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86927b7cd2fe88fa698b87404b287ab98d1a0063a34071d92e575b72d3029aca" -dependencies = [ - "data-encoding", - "data-encoding-macro-internal", -] - -[[package]] -name = "data-encoding-macro-internal" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5bbed42daaa95e780b60a50546aa345b8413a1e46f9a40a12907d3598f038db" -dependencies = [ - "data-encoding", - "syn", -] - [[package]] name = "default-net" version = "0.14.1" @@ -1202,6 +1126,7 @@ dependencies = [ "tokio-io-timeout", "tokio-stream", "tokio-tar", + "tokio-util", "toml", "trust-dns-resolver", "url", @@ -1563,12 +1488,6 @@ dependencies = [ "version_check 0.9.4", ] -[[package]] -name = "encode_unicode" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" - [[package]] name = "encoded-words" version = "0.2.0" @@ -2358,19 +2277,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "indicatif" -version = "0.17.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cef509aa9bc73864d6756f0d34d35504af3cf0844373afe9b8669a5b8005a729" -dependencies = [ - "console", - "number_prefix", - "portable-atomic 0.3.19", - "tokio", - "unicode-width", -] - [[package]] name = "inout" version = "0.1.3" @@ -2419,17 +2325,15 @@ checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" [[package]] name = "iroh" -version = "0.3.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#59babe14aa481e90dd09d16bd91fa9b4e12c9c54" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c019223f5af15f978ff44ae02b8b83d21d53df4c42d4475aa80670819c3ecdce" dependencies = [ "abao", "anyhow", "base64 0.21.0", "blake3", "bytes", - "clap 4.1.8", - "console", - "data-encoding", "default-net", "der", "derive_more", @@ -2437,10 +2341,8 @@ dependencies = [ "ed25519-dalek", "futures", "hex", - "indicatif", - "multibase", "num_cpus", - "portable-atomic 1.0.1", + "portable-atomic", "postcard", "quic-rpc", "quinn", @@ -2732,17 +2634,6 @@ dependencies = [ "windows-sys 0.45.0", ] -[[package]] -name = "multibase" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" -dependencies = [ - "base-x", - "data-encoding", - "data-encoding-macro", -] - [[package]] name = "mutate_once" version = "0.1.1" @@ -2977,12 +2868,6 @@ dependencies = [ "libc", ] -[[package]] -name = "number_prefix" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" - [[package]] name = "object" version = "0.30.3" @@ -3346,12 +3231,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "portable-atomic" -version = "0.3.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" - [[package]] name = "portable-atomic" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index ba4ea5fc5..f7b367abe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,8 +52,7 @@ futures-lite = "1.12.0" hex = "0.4.0" humansize = "2" image = { version = "0.24.5", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] } -# iroh = { version = "0.3.0", default-features = false } -iroh = { git = 'https://github.com/n0-computer/iroh', branch = "main" } +iroh = { version = "0.4.0", default-features = false } kamadak-exif = "0.5" lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } libc = "0.2" @@ -84,10 +83,11 @@ strum_macros = "0.24" tagger = "4.3.4" textwrap = "0.16.0" thiserror = "1" +tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } tokio-io-timeout = "1.2.0" tokio-stream = { version = "0.1.11", features = ["fs"] } tokio-tar = { version = "0.3" } # TODO: integrate tokio into async-tar -tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } +tokio-util = "0.7.7" toml = "0.7" trust-dns-resolver = "0.22" url = "2" diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 8cb694c4f..3203d865d 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -2775,6 +2775,12 @@ void dc_backup_provider_wait (dc_backup_provider_t* backup_provider); /** * Frees a dc_backup_provider_t object. * + * If the provider has not yet finished, as indicated by + * dc_backup_provider_wait() or the #DC_EVENT_IMEX_PROGRESS event with value + * of 0 (failed) or 1000 (succeeded), this will also abort any in-progress + * transfer. If this aborts the provider a #DC_EVENT_IMEX_PROGRESS event with + * value 0 (failed) will be emitted. + * * @memberof dc_backup_provider_t * @param backup_provider The backup provider object as created by * dc_backup_provider_new(). diff --git a/deny.toml b/deny.toml index a631480de..d5f70ed62 100644 --- a/deny.toml +++ b/deny.toml @@ -28,7 +28,6 @@ skip = [ { name = "humantime", version = "<2.1" }, { name = "idna", version = "<0.3" }, { name = "nom", version = "<7.1" }, - { name = "portable-atomic", version = "<1.0" }, { name = "quick-error", version = "<2.0" }, { name = "rand", version = "<0.8" }, { name = "rand_chacha", version = "<0.3" }, @@ -75,6 +74,5 @@ license-files = [ github = [ "async-email", "deltachat", - "n0-computer", "quinn-rs", ] diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 3baf6d1f4..b95755146 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -22,7 +22,6 @@ //! getter can not connect to an impersonated provider and the provider does not offer the //! download to an impersonated getter. -use std::cmp::Ordering; use std::future::Future; use std::net::Ipv4Addr; use std::ops::Deref; @@ -35,7 +34,8 @@ use async_channel::Receiver; use futures::future::{BoxFuture, Shared}; use futures::{FutureExt, TryFutureExt}; use futures_lite::StreamExt; -use iroh::get::{DataStream, Options}; +use iroh::blobs::Collection; +use iroh::get::DataStream; use iroh::progress::ProgressEmitter; use iroh::protocol::AuthToken; use iroh::provider::{DataSource, Event, Provider, Ticket}; @@ -46,6 +46,7 @@ use tokio::sync::broadcast::error::RecvError; use tokio::sync::{broadcast, Mutex}; use tokio::task::JoinSet; use tokio_stream::wrappers::ReadDirStream; +use tokio_util::sync::CancellationToken; use crate::blob::BlobDirContents; use crate::chat::delete_and_reset_all_device_msgs; @@ -55,6 +56,8 @@ use crate::{e2ee, EventType}; use super::{export_database, DBFILE_BACKUP_NAME}; +const MAX_CONCURRENT_DIALS: u8 = 16; + /// Provide or send a backup of this device. /// /// This creates a backup of the current device and starts a service which offers another @@ -74,6 +77,8 @@ pub struct BackupProvider { handle: Shared>>, /// The ticket to retrieve the backup collection. ticket: Ticket, + /// Guard to cancel the provider on drop. + _drop_guard: tokio_util::sync::DropGuard, } impl BackupProvider { @@ -125,10 +130,12 @@ impl BackupProvider { return Err(err); } }; + let drop_token = CancellationToken::new(); let handle = { let context = context.clone(); + let drop_token = drop_token.clone(); tokio::spawn(async move { - let res = Self::watch_provider(&context, provider, cancel_token).await; + let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await; context.free_ongoing().await; // Explicit drop to move the guards into this future @@ -141,7 +148,11 @@ impl BackupProvider { .boxed() .shared() }; - Ok(Self { handle, ticket }) + Ok(Self { + handle, + ticket, + _drop_guard: drop_token.drop_guard(), + }) } /// Creates the provider task. @@ -177,7 +188,7 @@ impl BackupProvider { .spawn()?; context.emit_event(SendProgress::ProviderListening.into()); info!(context, "Waiting for remote to connect"); - let ticket = provider.ticket(hash); + let ticket = provider.ticket(hash)?; Ok((provider, ticket)) } @@ -195,8 +206,8 @@ impl BackupProvider { context: &Context, mut provider: Provider, cancel_token: Receiver<()>, + drop_token: CancellationToken, ) -> Result<()> { - // _dbfile exists so we can clean up the file once it is no longer needed let mut events = provider.subscribe(); let mut total_size = 0; let mut current_size = 0; @@ -256,8 +267,12 @@ impl BackupProvider { }, _ = cancel_token.recv() => { provider.shutdown(); - break Err(anyhow!("BackupSender cancelled")); + break Err(anyhow!("BackupProvider cancelled")); }, + _ = drop_token.cancelled() => { + provider.shutdown(); + break Err(anyhow!("BackupProvider dropped")); + } } }; match &res { @@ -395,122 +410,71 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { } async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> { - let mut ticket = match qr { + let ticket = match qr { Qr::Backup { ticket } => ticket, _ => bail!("QR code for backup must be of type DCBACKUP"), }; - if ticket.addrs.is_empty() { - bail!("ticket is missing addresses to dial"); - } - // Crude sorting, most local wifi's are in the 192.168.0.0/24 range so this will try - // them first. - ticket.addrs.sort_by(|a, b| { - let a = a.to_string(); - let b = b.to_string(); - if a.starts_with("192.168.") && !b.starts_with("192.168.") { - Ordering::Less - } else if b.starts_with("192.168.") && !a.starts_with("192.168.") { - Ordering::Greater - } else { - Ordering::Equal + match transfer_from_provider(context, &ticket).await { + Ok(()) => { + delete_and_reset_all_device_msgs(context).await?; + context.emit_event(ReceiveProgress::Completed.into()); + Ok(()) } - }); - for addr in &ticket.addrs { - let opts = Options { - addr: *addr, - peer_id: Some(ticket.peer), - keylog: false, - }; - info!(context, "attempting to contact {}", addr); - match transfer_from_provider(context, &ticket, opts).await { - Ok(_) => { - delete_and_reset_all_device_msgs(context).await?; - context.emit_event(ReceiveProgress::Completed.into()); - return Ok(()); - } - Err(TransferError::ConnectionError(err)) => { - warn!(context, "Connection error: {err:#}."); - continue; - } - Err(TransferError::Other(err)) => { - // Clean up any blobs we already wrote. - let readdir = fs::read_dir(context.get_blobdir()).await?; - let mut readdir = ReadDirStream::new(readdir); - while let Some(dirent) = readdir.next().await { - if let Ok(dirent) = dirent { - fs::remove_file(dirent.path()).await.ok(); - } + Err(err) => { + // Clean up any blobs we already wrote. + let readdir = fs::read_dir(context.get_blobdir()).await?; + let mut readdir = ReadDirStream::new(readdir); + while let Some(dirent) = readdir.next().await { + if let Ok(dirent) = dirent { + fs::remove_file(dirent.path()).await.ok(); } - context.emit_event(ReceiveProgress::Failed.into()); - return Err(err); } + context.emit_event(ReceiveProgress::Failed.into()); + Err(err) } } - Err(anyhow!("failed to contact provider")) } -/// Error during a single transfer attempt. -/// -/// Mostly exists to distinguish between `ConnectionError` and any other errors. -#[derive(Debug, thiserror::Error)] -enum TransferError { - #[error("connection error")] - ConnectionError(#[source] anyhow::Error), - #[error("other")] - Other(#[source] anyhow::Error), -} - -async fn transfer_from_provider( - context: &Context, - ticket: &Ticket, - opts: Options, -) -> Result<(), TransferError> { +async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> { let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress()); spawn_progress_proxy(context.clone(), progress.subscribe()); - let mut connected = false; let on_connected = || { context.emit_event(ReceiveProgress::Connected.into()); - connected = true; + async { Ok(()) } + }; + let on_collection = |collection: &Collection| { + context.emit_event(ReceiveProgress::CollectionReceived.into()); + progress.set_total(collection.total_blobs_size()); async { Ok(()) } }; let jobs = Mutex::new(JoinSet::default()); let on_blob = |hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name); - let res = iroh::get::run( - ticket.hash, - ticket.token, - opts, + + // Perform the transfer. + let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality + let stats = iroh::get::run_ticket( + ticket, + keylog, + MAX_CONCURRENT_DIALS, on_connected, - |collection| { - context.emit_event(ReceiveProgress::CollectionReceived.into()); - progress.set_total(collection.total_blobs_size()); - async { Ok(()) } - }, + on_collection, on_blob, ) - .await; + .await?; let mut jobs = jobs.lock().await; while let Some(job) = jobs.join_next().await { - job.context("job failed").map_err(TransferError::Other)?; + job.context("job failed")?; } - drop(progress); - match res { - Ok(stats) => { - info!( - context, - "Backup transfer finished, transfer rate is {} Mbps.", - stats.mbits() - ); - Ok(()) - } - Err(err) => match connected { - true => Err(TransferError::Other(err)), - false => Err(TransferError::ConnectionError(err)), - }, - } + info!( + context, + "Backup transfer finished, transfer rate was {} Mbps.", + stats.mbits() + ); + Ok(()) } /// Get callback when a blob is received from the provider. @@ -552,7 +516,7 @@ async fn on_blob( if name.starts_with("db/") { let context = context.clone(); - let token = ticket.token.to_string(); + let token = ticket.token().to_string(); jobs.lock().await.spawn(async move { if let Err(err) = context.sql.import(&path, token).await { error!(context, "cannot import database: {:#?}", err); @@ -714,4 +678,16 @@ mod tests { assert_eq!(out, EventType::ImexProgress(progress)); } } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_drop_provider() { + let mut tcm = TestContextManager::new(); + let ctx = tcm.alice().await; + + let provider = BackupProvider::prepare(&ctx).await.unwrap(); + drop(provider); + ctx.evtracker + .get_matching(|ev| matches!(ev, EventType::ImexProgress(0))) + .await; + } }