fix: replace FuturesUnordered from futures with JoinSet from tokio

FuturesUnordered is likely buggy and iroh previously switched
to JoinSet in <https://github.com/n0-computer/iroh/pull/1647>.
We also have reports with logs of background_fetch getting
stuck so apparently task cancellation after timeout does not work
as intended with FuturesUnordered.
This commit is contained in:
link2xt
2025-05-10 16:30:08 +00:00
committed by l
parent 846c8e7f1b
commit 8fb3a7514e

View File

@@ -5,12 +5,10 @@ use std::future::Future;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use anyhow::{bail, ensure, Context as _, Result}; use anyhow::{bail, ensure, Context as _, Result};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle; use tokio::task::{JoinHandle, JoinSet};
use uuid::Uuid; use uuid::Uuid;
#[cfg(not(target_os = "ios"))] #[cfg(not(target_os = "ios"))]
@@ -304,12 +302,6 @@ impl Accounts {
/// This is an auxiliary function and not part of public API. /// This is an auxiliary function and not part of public API.
/// Use [Accounts::background_fetch] instead. /// Use [Accounts::background_fetch] instead.
async fn background_fetch_no_timeout(accounts: Vec<Context>, events: Events) { async fn background_fetch_no_timeout(accounts: Vec<Context>, events: Events) {
async fn background_fetch_and_log_error(account: Context) {
if let Err(error) = account.background_fetch().await {
warn!(account, "{error:#}");
}
}
events.emit(Event { events.emit(Event {
id: 0, id: 0,
typ: EventType::Info(format!( typ: EventType::Info(format!(
@@ -317,11 +309,15 @@ impl Accounts {
accounts.len() accounts.len()
)), )),
}); });
let mut futures_unordered: FuturesUnordered<_> = accounts let mut set = JoinSet::new();
.into_iter() for account in accounts {
.map(background_fetch_and_log_error) set.spawn(async move {
.collect(); if let Err(error) = account.background_fetch().await {
while futures_unordered.next().await.is_some() {} warn!(account, "{error:#}");
}
});
}
set.join_all().await;
} }
/// Auxiliary function for [Accounts::background_fetch]. /// Auxiliary function for [Accounts::background_fetch].