diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 857dda136..47951a2e1 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4882,12 +4882,12 @@ pub unsafe extern "C" fn dc_accounts_background_fetch( } let accounts = &*accounts; - block_on(async move { - let accounts = accounts.read().await; - accounts - .background_fetch(Duration::from_secs(timeout_in_seconds)) - .await; - }); + let background_fetch_future = { + let lock = block_on(accounts.read()); + lock.background_fetch(Duration::from_secs(timeout_in_seconds)) + }; + // At this point account manager is not locked anymore. + block_on(background_fetch_future); 1 } diff --git a/deltachat-jsonrpc/src/api.rs b/deltachat-jsonrpc/src/api.rs index 9499b9206..3afe60fe7 100644 --- a/deltachat-jsonrpc/src/api.rs +++ b/deltachat-jsonrpc/src/api.rs @@ -254,11 +254,12 @@ impl CommandApi { /// Process all events until you get this one and you can safely return to the background /// without forgetting to create notifications caused by timing race conditions. async fn accounts_background_fetch(&self, timeout_in_seconds: f64) -> Result<()> { - self.accounts - .write() - .await - .background_fetch(std::time::Duration::from_secs_f64(timeout_in_seconds)) - .await; + let future = { + let lock = self.accounts.read().await; + lock.background_fetch(std::time::Duration::from_secs_f64(timeout_in_seconds)) + }; + // At this point account manager is not locked anymore. + future.await; Ok(()) } diff --git a/src/accounts.rs b/src/accounts.rs index e1cf7448f..425139d71 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -5,7 +5,8 @@ use std::future::Future; use std::path::{Path, PathBuf}; use anyhow::{ensure, Context as _, Result}; -use futures::future::join_all; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::AsyncWriteExt; @@ -301,20 +302,48 @@ impl Accounts { /// /// This is an auxiliary function and not part of public API. /// Use [Accounts::background_fetch] instead. - async fn background_fetch_without_timeout(&self) { + async fn background_fetch_no_timeout(accounts: Vec, events: Events) { async fn background_fetch_and_log_error(account: Context) { if let Err(error) = account.background_fetch().await { warn!(account, "{error:#}"); } } - join_all( - self.accounts - .values() - .cloned() - .map(background_fetch_and_log_error), + events.emit(Event { + id: 0, + typ: EventType::Info(format!( + "Starting background fetch for {} accounts.", + accounts.len() + )), + }); + let mut futures_unordered: FuturesUnordered<_> = accounts + .into_iter() + .map(background_fetch_and_log_error) + .collect(); + while futures_unordered.next().await.is_some() {} + } + + /// Auxiliary function for [Accounts::background_fetch]. + async fn background_fetch_with_timeout( + accounts: Vec, + events: Events, + timeout: std::time::Duration, + ) { + if let Err(_err) = tokio::time::timeout( + timeout, + Self::background_fetch_no_timeout(accounts, events.clone()), ) - .await; + .await + { + events.emit(Event { + id: 0, + typ: EventType::Warning("Background fetch timed out.".to_string()), + }); + } + events.emit(Event { + id: 0, + typ: EventType::AccountsBackgroundFetchDone, + }); } /// Performs a background fetch for all accounts in parallel with a timeout. @@ -322,15 +351,13 @@ impl Accounts { /// The `AccountsBackgroundFetchDone` event is emitted at the end, /// process all events until you get this one and you can safely return to the background /// without forgetting to create notifications caused by timing race conditions. - pub async fn background_fetch(&self, timeout: std::time::Duration) { - if let Err(_err) = - tokio::time::timeout(timeout, self.background_fetch_without_timeout()).await - { - self.emit_event(EventType::Warning( - "Background fetch timed out.".to_string(), - )); - } - self.emit_event(EventType::AccountsBackgroundFetchDone); + /// + /// Returns a future that resolves when background fetch is done, + /// but does not capture `&self`. + pub fn background_fetch(&self, timeout: std::time::Duration) -> impl Future { + let accounts: Vec = self.accounts.values().cloned().collect(); + let events = self.events.clone(); + Self::background_fetch_with_timeout(accounts, events, timeout) } /// Emits a single event.