From 81ada2c696d179804189706ad28bfdd04159f16d Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Mon, 24 Jul 2023 18:18:41 +0200 Subject: [PATCH] try to fix lifetime error --- Cargo.lock | 4 ++-- src/imex/transfer.rs | 27 ++++++++++++++++++++------- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d22efa596..c5ebb5d4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -871,9 +871,9 @@ dependencies = [ [[package]] name = "core-foundation-sys" -version = "0.8.5" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a385f5d34e5eff161df2369056a3fd194fcabd8a64ce0eed02de09fcb3203434" +checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" [[package]] name = "cpufeatures" diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index f391d2ada..ed1b106e0 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -474,10 +474,10 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<() let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress()); spawn_progress_proxy(context.clone(), progress.subscribe()); - let jobs = Mutex::new(JoinSet::default()); + let jobs = Arc::new(Mutex::new(JoinSet::default())); // Perform the transfer. - let stats = run_get_request(context, &progress, &jobs, ticket.clone()).await?; + let stats = run_get_request(context, &progress, jobs.clone(), ticket.clone()).await?; let mut jobs = jobs.lock().await; while let Some(job) = jobs.join_next().await { @@ -496,7 +496,7 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<() async fn run_get_request( context: &Context, progress: &ProgressEmitter, - jobs: &Mutex>, + jobs: Arc>>, ticket: Ticket, ) -> anyhow::Result { // DERP usage for NAT traversal and relay are currently disabled. @@ -511,6 +511,8 @@ async fn run_get_request( let connected = initial.next().await?; context.emit_event(ReceiveProgress::Connected.into()); + let rt = runtime::Handle::from_currrent(1)?; + // we assume that the request includes the entire collection let (mut next, _root, collection) = { let fsm::ConnectedNext::StartRoot(sc) = connected.next().await? else { @@ -541,7 +543,18 @@ async fn run_get_request( }; let start = start.next(blob.hash); - let done = on_blob(context, jobs, &ticket, start, &blob.name).await?; + + // `iroh_io` io needs to be done on a local spawn + let ticket = ticket.clone(); + let context = context.clone(); + let name = blob.name.clone(); + let jobs = jobs.clone(); + let done = rt + .local_pool() + .spawn_pinned(move || { + Box::pin(async move { on_blob(context, jobs, ticket, start, &name).await }) + }) + .await??; next = done.next(); }; @@ -555,9 +568,9 @@ async fn run_get_request( /// This writes the blobs to the blobdir. If the blob is the database it will import it to /// the database of the current [`Context`]. async fn on_blob( - context: &Context, - jobs: &Mutex>, - ticket: &Ticket, + context: Context, + jobs: Arc>>, + ticket: Ticket, state: fsm::AtBlobHeader, name: &str, ) -> Result {