From 5be558ea68d90b8e961d23e042e80ae1d0a6e0b4 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 29 Mar 2023 09:47:00 +0200 Subject: [PATCH] feat(imex) Connect to all provider addresses concurrently (#4240) This uses the new iroh API to connect to all provider addresses concurrently. It simplifies the implementation as well as we no longer need to try the addresses manually. --- CHANGELOG.md | 2 +- Cargo.lock | 2 +- src/imex/transfer.rs | 128 ++++++++++++++----------------------------- 3 files changed, 43 insertions(+), 89 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df61dbf04..1ecec5f9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### Changes - Update iroh, remove `default-net` from `[patch.crates-io]` section. - +- transfer backup: Connect to mutliple provider addresses concurrently. This should speed up connection time significantly on the getter side. #4240 ## [1.112.1] - 2023-03-27 diff --git a/Cargo.lock b/Cargo.lock index 6c74382da..832ffd88c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2420,7 +2420,7 @@ checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146" [[package]] name = "iroh" version = "0.3.0" -source = "git+https://github.com/n0-computer/iroh?branch=main#59babe14aa481e90dd09d16bd91fa9b4e12c9c54" +source = "git+https://github.com/n0-computer/iroh?branch=main#91c7e2aee1f7f4059f3d391725fb49af4410a3eb" dependencies = [ "abao", "anyhow", diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index d93cf4f6d..1837bcca8 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -22,7 +22,6 @@ //! getter can not connect to an impersonated provider and the provider does not offer the //! download to an impersonated getter. -use std::cmp::Ordering; use std::future::Future; use std::net::Ipv4Addr; use std::ops::Deref; @@ -33,7 +32,8 @@ use std::task::Poll; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; use async_channel::Receiver; use futures_lite::StreamExt; -use iroh::get::{DataStream, Options}; +use iroh::blobs::Collection; +use iroh::get::DataStream; use iroh::progress::ProgressEmitter; use iroh::protocol::AuthToken; use iroh::provider::{DataSource, Event, Provider, Ticket}; @@ -53,6 +53,8 @@ use crate::{e2ee, EventType}; use super::{export_database, DBFILE_BACKUP_NAME}; +const MAX_CONCURRENT_DIALS: u8 = 16; + /// Provide or send a backup of this device. /// /// This creates a backup of the current device and starts a service which offers another @@ -387,7 +389,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { } async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> { - let mut ticket = match qr { + let ticket = match qr { Qr::Backup { ticket } => ticket, _ => bail!("QR code for backup must be of type DCBACKUP"), }; @@ -395,114 +397,66 @@ async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> { bail!("ticket is missing addresses to dial"); } - // Crude sorting, most local wifi's are in the 192.168.0.0/24 range so this will try - // them first. - ticket.addrs.sort_by(|a, b| { - let a = a.to_string(); - let b = b.to_string(); - if a.starts_with("192.168.") && !b.starts_with("192.168.") { - Ordering::Less - } else if b.starts_with("192.168.") && !a.starts_with("192.168.") { - Ordering::Greater - } else { - Ordering::Equal + match transfer_from_provider(context, &ticket).await { + Ok(()) => { + delete_and_reset_all_device_msgs(context).await?; + context.emit_event(ReceiveProgress::Completed.into()); + Ok(()) } - }); - for addr in &ticket.addrs { - let opts = Options { - addr: *addr, - peer_id: Some(ticket.peer), - keylog: false, - }; - info!(context, "attempting to contact {}", addr); - match transfer_from_provider(context, &ticket, opts).await { - Ok(_) => { - delete_and_reset_all_device_msgs(context).await?; - context.emit_event(ReceiveProgress::Completed.into()); - return Ok(()); - } - Err(TransferError::ConnectionError(err)) => { - warn!(context, "Connection error: {err:#}."); - continue; - } - Err(TransferError::Other(err)) => { - // Clean up any blobs we already wrote. - let readdir = fs::read_dir(context.get_blobdir()).await?; - let mut readdir = ReadDirStream::new(readdir); - while let Some(dirent) = readdir.next().await { - if let Ok(dirent) = dirent { - fs::remove_file(dirent.path()).await.ok(); - } + Err(err) => { + // Clean up any blobs we already wrote. + let readdir = fs::read_dir(context.get_blobdir()).await?; + let mut readdir = ReadDirStream::new(readdir); + while let Some(dirent) = readdir.next().await { + if let Ok(dirent) = dirent { + fs::remove_file(dirent.path()).await.ok(); } - context.emit_event(ReceiveProgress::Failed.into()); - return Err(err); } + context.emit_event(ReceiveProgress::Failed.into()); + Err(err) } } - Err(anyhow!("failed to contact provider")) } -/// Error during a single transfer attempt. -/// -/// Mostly exists to distinguish between `ConnectionError` and any other errors. -#[derive(Debug, thiserror::Error)] -enum TransferError { - #[error("connection error")] - ConnectionError(#[source] anyhow::Error), - #[error("other")] - Other(#[source] anyhow::Error), -} - -async fn transfer_from_provider( - context: &Context, - ticket: &Ticket, - opts: Options, -) -> Result<(), TransferError> { +async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> { let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress()); spawn_progress_proxy(context.clone(), progress.subscribe()); - let mut connected = false; let on_connected = || { context.emit_event(ReceiveProgress::Connected.into()); - connected = true; + async { Ok(()) } + }; + let on_collection = |collection: &Collection| { + context.emit_event(ReceiveProgress::CollectionReceived.into()); + progress.set_total(collection.total_blobs_size()); async { Ok(()) } }; let jobs = Mutex::new(JoinSet::default()); let on_blob = |hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name); - let res = iroh::get::run( - ticket.hash, - ticket.token, - opts, + + // Perform the transfer. + let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality + let stats = iroh::get::run_ticket( + ticket, + keylog, + MAX_CONCURRENT_DIALS, on_connected, - |collection| { - context.emit_event(ReceiveProgress::CollectionReceived.into()); - progress.set_total(collection.total_blobs_size()); - async { Ok(()) } - }, + on_collection, on_blob, ) - .await; + .await?; let mut jobs = jobs.lock().await; while let Some(job) = jobs.join_next().await { - job.context("job failed").map_err(TransferError::Other)?; + job.context("job failed")?; } - drop(progress); - match res { - Ok(stats) => { - info!( - context, - "Backup transfer finished, transfer rate is {} Mbps.", - stats.mbits() - ); - Ok(()) - } - Err(err) => match connected { - true => Err(TransferError::Other(err)), - false => Err(TransferError::ConnectionError(err)), - }, - } + info!( + context, + "Backup transfer finished, transfer rate was {} Mbps.", + stats.mbits() + ); + Ok(()) } /// Get callback when a blob is received from the provider.