diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index e789048f4..664e8ca89 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -39,9 +39,9 @@ use iroh::provider::{DataSource, Event, Provider, Ticket}; use iroh::Hash; use tokio::fs::{self, File}; use tokio::io::{self, AsyncWriteExt, BufWriter}; -use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; -use tokio::task::JoinHandle; +use tokio::sync::{broadcast, Mutex}; +use tokio::task::{JoinHandle, JoinSet}; use tokio_stream::wrappers::ReadDirStream; use crate::blob::BlobDirContents; @@ -50,7 +50,7 @@ use crate::context::Context; use crate::qr::Qr; use crate::{e2ee, EventType}; -use super::{export_database, DeleteOnDrop, DBFILE_BACKUP_NAME}; +use super::{export_database, DBFILE_BACKUP_NAME}; /// Provide or send a backup of this device. /// @@ -423,7 +423,9 @@ async fn transfer_from_provider( connected = true; async { Ok(()) } }; - let on_blob = |hash, reader, name| on_blob(context, &progress, ticket, hash, reader, name); + let jobs = Mutex::new(JoinSet::default()); + let on_blob = + |hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name); let res = iroh::get::run( ticket.hash, ticket.token, @@ -437,6 +439,12 @@ async fn transfer_from_provider( on_blob, ) .await; + + let mut jobs = jobs.lock().await; + while let Some(job) = jobs.join_next().await { + job.context("job failed").map_err(TransferError::Other)?; + } + drop(progress); res.map(|_| ()).map_err(|err| match connected { true => TransferError::Other(err), @@ -451,6 +459,7 @@ async fn transfer_from_provider( async fn on_blob( context: &Context, progress: &ProgressEmitter, + jobs: &Mutex>, ticket: &Ticket, _hash: Hash, mut reader: DataStream, @@ -473,11 +482,6 @@ async fn on_blob( let blobname = name.rsplit('/').next().context("malformatted blob name")?; context.get_blobdir().join(blobname) }; - let _guard = if name.starts_with("db/") { - Some(DeleteOnDrop(path.clone())) - } else { - None - }; let mut wrapped_reader = progress.wrap_async_read(&mut reader); let file = File::create(&path).await?; @@ -486,14 +490,21 @@ async fn on_blob( file.flush().await?; if name.starts_with("db/") { - context - .sql - .import(&path, ticket.token.to_string()) - .await - .context("cannot import database")?; - fs::remove_file(&path) - .await - .with_context(|| format!("database import file: {}", path.display()))?; + let context = context.clone(); + let token = ticket.token.to_string(); + jobs.lock().await.spawn(async move { + if let Err(err) = context.sql.import(&path, token).await { + error!(context, "cannot import database: {:#?}", err); + } + if let Err(err) = fs::remove_file(&path).await { + error!( + context, + "failed to delete database import file '{}': {:#?}", + path.display(), + err, + ); + } + }); } Ok(reader) }