Merge branch 'stable-1.112' into flub/clone-provider-in-wait

This commit is contained in:
Floris Bruynooghe
2023-03-29 17:23:16 +02:00
6 changed files with 90 additions and 229 deletions

View File

@@ -22,7 +22,6 @@
//! getter can not connect to an impersonated provider and the provider does not offer the
//! download to an impersonated getter.
use std::cmp::Ordering;
use std::future::Future;
use std::net::Ipv4Addr;
use std::ops::Deref;
@@ -35,7 +34,8 @@ use async_channel::Receiver;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, TryFutureExt};
use futures_lite::StreamExt;
use iroh::get::{DataStream, Options};
use iroh::blobs::Collection;
use iroh::get::DataStream;
use iroh::progress::ProgressEmitter;
use iroh::protocol::AuthToken;
use iroh::provider::{DataSource, Event, Provider, Ticket};
@@ -46,6 +46,7 @@ use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinSet;
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::sync::CancellationToken;
use crate::blob::BlobDirContents;
use crate::chat::delete_and_reset_all_device_msgs;
@@ -55,6 +56,8 @@ use crate::{e2ee, EventType};
use super::{export_database, DBFILE_BACKUP_NAME};
const MAX_CONCURRENT_DIALS: u8 = 16;
/// Provide or send a backup of this device.
///
/// This creates a backup of the current device and starts a service which offers another
@@ -74,6 +77,8 @@ pub struct BackupProvider {
handle: Shared<BoxFuture<'static, Result<(), String>>>,
/// The ticket to retrieve the backup collection.
ticket: Ticket,
/// Guard to cancel the provider on drop.
_drop_guard: tokio_util::sync::DropGuard,
}
impl BackupProvider {
@@ -125,10 +130,12 @@ impl BackupProvider {
return Err(err);
}
};
let drop_token = CancellationToken::new();
let handle = {
let context = context.clone();
let drop_token = drop_token.clone();
tokio::spawn(async move {
let res = Self::watch_provider(&context, provider, cancel_token).await;
let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await;
context.free_ongoing().await;
// Explicit drop to move the guards into this future
@@ -141,7 +148,11 @@ impl BackupProvider {
.boxed()
.shared()
};
Ok(Self { handle, ticket })
Ok(Self {
handle,
ticket,
_drop_guard: drop_token.drop_guard(),
})
}
/// Creates the provider task.
@@ -177,7 +188,7 @@ impl BackupProvider {
.spawn()?;
context.emit_event(SendProgress::ProviderListening.into());
info!(context, "Waiting for remote to connect");
let ticket = provider.ticket(hash);
let ticket = provider.ticket(hash)?;
Ok((provider, ticket))
}
@@ -195,8 +206,8 @@ impl BackupProvider {
context: &Context,
mut provider: Provider,
cancel_token: Receiver<()>,
drop_token: CancellationToken,
) -> Result<()> {
// _dbfile exists so we can clean up the file once it is no longer needed
let mut events = provider.subscribe();
let mut total_size = 0;
let mut current_size = 0;
@@ -256,8 +267,12 @@ impl BackupProvider {
},
_ = cancel_token.recv() => {
provider.shutdown();
break Err(anyhow!("BackupSender cancelled"));
break Err(anyhow!("BackupProvider cancelled"));
},
_ = drop_token.cancelled() => {
provider.shutdown();
break Err(anyhow!("BackupProvider dropped"));
}
}
};
match &res {
@@ -395,122 +410,71 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
}
async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
let mut ticket = match qr {
let ticket = match qr {
Qr::Backup { ticket } => ticket,
_ => bail!("QR code for backup must be of type DCBACKUP"),
};
if ticket.addrs.is_empty() {
bail!("ticket is missing addresses to dial");
}
// Crude sorting, most local wifi's are in the 192.168.0.0/24 range so this will try
// them first.
ticket.addrs.sort_by(|a, b| {
let a = a.to_string();
let b = b.to_string();
if a.starts_with("192.168.") && !b.starts_with("192.168.") {
Ordering::Less
} else if b.starts_with("192.168.") && !a.starts_with("192.168.") {
Ordering::Greater
} else {
Ordering::Equal
match transfer_from_provider(context, &ticket).await {
Ok(()) => {
delete_and_reset_all_device_msgs(context).await?;
context.emit_event(ReceiveProgress::Completed.into());
Ok(())
}
});
for addr in &ticket.addrs {
let opts = Options {
addr: *addr,
peer_id: Some(ticket.peer),
keylog: false,
};
info!(context, "attempting to contact {}", addr);
match transfer_from_provider(context, &ticket, opts).await {
Ok(_) => {
delete_and_reset_all_device_msgs(context).await?;
context.emit_event(ReceiveProgress::Completed.into());
return Ok(());
}
Err(TransferError::ConnectionError(err)) => {
warn!(context, "Connection error: {err:#}.");
continue;
}
Err(TransferError::Other(err)) => {
// Clean up any blobs we already wrote.
let readdir = fs::read_dir(context.get_blobdir()).await?;
let mut readdir = ReadDirStream::new(readdir);
while let Some(dirent) = readdir.next().await {
if let Ok(dirent) = dirent {
fs::remove_file(dirent.path()).await.ok();
}
Err(err) => {
// Clean up any blobs we already wrote.
let readdir = fs::read_dir(context.get_blobdir()).await?;
let mut readdir = ReadDirStream::new(readdir);
while let Some(dirent) = readdir.next().await {
if let Ok(dirent) = dirent {
fs::remove_file(dirent.path()).await.ok();
}
context.emit_event(ReceiveProgress::Failed.into());
return Err(err);
}
context.emit_event(ReceiveProgress::Failed.into());
Err(err)
}
}
Err(anyhow!("failed to contact provider"))
}
/// Error during a single transfer attempt.
///
/// Mostly exists to distinguish between `ConnectionError` and any other errors.
#[derive(Debug, thiserror::Error)]
enum TransferError {
#[error("connection error")]
ConnectionError(#[source] anyhow::Error),
#[error("other")]
Other(#[source] anyhow::Error),
}
async fn transfer_from_provider(
context: &Context,
ticket: &Ticket,
opts: Options,
) -> Result<(), TransferError> {
async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> {
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
spawn_progress_proxy(context.clone(), progress.subscribe());
let mut connected = false;
let on_connected = || {
context.emit_event(ReceiveProgress::Connected.into());
connected = true;
async { Ok(()) }
};
let on_collection = |collection: &Collection| {
context.emit_event(ReceiveProgress::CollectionReceived.into());
progress.set_total(collection.total_blobs_size());
async { Ok(()) }
};
let jobs = Mutex::new(JoinSet::default());
let on_blob =
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
let res = iroh::get::run(
ticket.hash,
ticket.token,
opts,
// Perform the transfer.
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
let stats = iroh::get::run_ticket(
ticket,
keylog,
MAX_CONCURRENT_DIALS,
on_connected,
|collection| {
context.emit_event(ReceiveProgress::CollectionReceived.into());
progress.set_total(collection.total_blobs_size());
async { Ok(()) }
},
on_collection,
on_blob,
)
.await;
.await?;
let mut jobs = jobs.lock().await;
while let Some(job) = jobs.join_next().await {
job.context("job failed").map_err(TransferError::Other)?;
job.context("job failed")?;
}
drop(progress);
match res {
Ok(stats) => {
info!(
context,
"Backup transfer finished, transfer rate is {} Mbps.",
stats.mbits()
);
Ok(())
}
Err(err) => match connected {
true => Err(TransferError::Other(err)),
false => Err(TransferError::ConnectionError(err)),
},
}
info!(
context,
"Backup transfer finished, transfer rate was {} Mbps.",
stats.mbits()
);
Ok(())
}
/// Get callback when a blob is received from the provider.
@@ -552,7 +516,7 @@ async fn on_blob(
if name.starts_with("db/") {
let context = context.clone();
let token = ticket.token.to_string();
let token = ticket.token().to_string();
jobs.lock().await.spawn(async move {
if let Err(err) = context.sql.import(&path, token).await {
error!(context, "cannot import database: {:#?}", err);
@@ -714,4 +678,16 @@ mod tests {
assert_eq!(out, EventType::ImexProgress(progress));
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_drop_provider() {
let mut tcm = TestContextManager::new();
let ctx = tcm.alice().await;
let provider = BackupProvider::prepare(&ctx).await.unwrap();
drop(provider);
ctx.evtracker
.get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
.await;
}
}