From 8fb3a7514e978b123cba6505a7679b26989ba648 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 10 May 2025 16:30:08 +0000 Subject: [PATCH] fix: replace FuturesUnordered from futures with JoinSet from tokio FuturesUnordered is likely buggy and iroh previously switched to JoinSet in . We also have reports with logs of background_fetch getting stuck so apparently task cancellation after timeout does not work as intended with FuturesUnordered. --- src/accounts.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index b3a72599b..3745cfca1 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -5,12 +5,10 @@ use std::future::Future; use std::path::{Path, PathBuf}; use anyhow::{bail, ensure, Context as _, Result}; -use futures::stream::FuturesUnordered; -use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::AsyncWriteExt; -use tokio::task::JoinHandle; +use tokio::task::{JoinHandle, JoinSet}; use uuid::Uuid; #[cfg(not(target_os = "ios"))] @@ -304,12 +302,6 @@ impl Accounts { /// This is an auxiliary function and not part of public API. /// Use [Accounts::background_fetch] instead. async fn background_fetch_no_timeout(accounts: Vec, events: Events) { - async fn background_fetch_and_log_error(account: Context) { - if let Err(error) = account.background_fetch().await { - warn!(account, "{error:#}"); - } - } - events.emit(Event { id: 0, typ: EventType::Info(format!( @@ -317,11 +309,15 @@ impl Accounts { accounts.len() )), }); - let mut futures_unordered: FuturesUnordered<_> = accounts - .into_iter() - .map(background_fetch_and_log_error) - .collect(); - while futures_unordered.next().await.is_some() {} + let mut set = JoinSet::new(); + for account in accounts { + set.spawn(async move { + if let Err(error) = account.background_fetch().await { + warn!(account, "{error:#}"); + } + }); + } + set.join_all().await; } /// Auxiliary function for [Accounts::background_fetch].