fix: do not lock the account manager for the whole duration of background_fetch

This commit is contained in:
link2xt
2024-10-25 21:23:43 +00:00
parent 51a1762228
commit f396ff4297
3 changed files with 56 additions and 28 deletions

View File

@@ -4882,12 +4882,12 @@ pub unsafe extern "C" fn dc_accounts_background_fetch(
} }
let accounts = &*accounts; let accounts = &*accounts;
block_on(async move { let background_fetch_future = {
let accounts = accounts.read().await; let lock = block_on(accounts.read());
accounts lock.background_fetch(Duration::from_secs(timeout_in_seconds))
.background_fetch(Duration::from_secs(timeout_in_seconds)) };
.await; // At this point account manager is not locked anymore.
}); block_on(background_fetch_future);
1 1
} }

View File

@@ -254,11 +254,12 @@ impl CommandApi {
/// Process all events until you get this one and you can safely return to the background /// Process all events until you get this one and you can safely return to the background
/// without forgetting to create notifications caused by timing race conditions. /// without forgetting to create notifications caused by timing race conditions.
async fn accounts_background_fetch(&self, timeout_in_seconds: f64) -> Result<()> { async fn accounts_background_fetch(&self, timeout_in_seconds: f64) -> Result<()> {
self.accounts let future = {
.write() let lock = self.accounts.read().await;
.await lock.background_fetch(std::time::Duration::from_secs_f64(timeout_in_seconds))
.background_fetch(std::time::Duration::from_secs_f64(timeout_in_seconds)) };
.await; // At this point account manager is not locked anymore.
future.await;
Ok(()) Ok(())
} }

View File

@@ -5,7 +5,8 @@ use std::future::Future;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use anyhow::{ensure, Context as _, Result}; use anyhow::{ensure, Context as _, Result};
use futures::future::join_all; use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
@@ -301,20 +302,48 @@ impl Accounts {
/// ///
/// This is an auxiliary function and not part of public API. /// This is an auxiliary function and not part of public API.
/// Use [Accounts::background_fetch] instead. /// Use [Accounts::background_fetch] instead.
async fn background_fetch_without_timeout(&self) { async fn background_fetch_no_timeout(accounts: Vec<Context>, events: Events) {
async fn background_fetch_and_log_error(account: Context) { async fn background_fetch_and_log_error(account: Context) {
if let Err(error) = account.background_fetch().await { if let Err(error) = account.background_fetch().await {
warn!(account, "{error:#}"); warn!(account, "{error:#}");
} }
} }
join_all( events.emit(Event {
self.accounts id: 0,
.values() typ: EventType::Info(format!(
.cloned() "Starting background fetch for {} accounts.",
.map(background_fetch_and_log_error), accounts.len()
)),
});
let mut futures_unordered: FuturesUnordered<_> = accounts
.into_iter()
.map(background_fetch_and_log_error)
.collect();
while futures_unordered.next().await.is_some() {}
}
/// Auxiliary function for [Accounts::background_fetch].
async fn background_fetch_with_timeout(
accounts: Vec<Context>,
events: Events,
timeout: std::time::Duration,
) {
if let Err(_err) = tokio::time::timeout(
timeout,
Self::background_fetch_no_timeout(accounts, events.clone()),
) )
.await; .await
{
events.emit(Event {
id: 0,
typ: EventType::Warning("Background fetch timed out.".to_string()),
});
}
events.emit(Event {
id: 0,
typ: EventType::AccountsBackgroundFetchDone,
});
} }
/// Performs a background fetch for all accounts in parallel with a timeout. /// Performs a background fetch for all accounts in parallel with a timeout.
@@ -322,15 +351,13 @@ impl Accounts {
/// The `AccountsBackgroundFetchDone` event is emitted at the end, /// The `AccountsBackgroundFetchDone` event is emitted at the end,
/// process all events until you get this one and you can safely return to the background /// process all events until you get this one and you can safely return to the background
/// without forgetting to create notifications caused by timing race conditions. /// without forgetting to create notifications caused by timing race conditions.
pub async fn background_fetch(&self, timeout: std::time::Duration) { ///
if let Err(_err) = /// Returns a future that resolves when background fetch is done,
tokio::time::timeout(timeout, self.background_fetch_without_timeout()).await /// but does not capture `&self`.
{ pub fn background_fetch(&self, timeout: std::time::Duration) -> impl Future<Output = ()> {
self.emit_event(EventType::Warning( let accounts: Vec<Context> = self.accounts.values().cloned().collect();
"Background fetch timed out.".to_string(), let events = self.events.clone();
)); Self::background_fetch_with_timeout(accounts, events, timeout)
}
self.emit_event(EventType::AccountsBackgroundFetchDone);
} }
/// Emits a single event. /// Emits a single event.