refactor: Make ConnectivityStore use a non-async lock (#7129)

Follow-up to https://github.com/chatmail/core/pull/7125: We now have a
mix of non-async (parking_lot) and async (tokio) Mutexes used for the
connectivity. We can just use non-async Mutexes, because we don't
attempt to hold them over an await point. I also tested that we get a
compiler error if we do try to hold one over an await point (rather than
just deadlocking/blocking the executor on runtime).

Not 100% sure about using the parking_lot rather than std Mutex, because
since https://github.com/rust-lang/rust/issues/93740, parking_lot
doesn't have a lot of advantages anymore. But as long as iroh depends on
it, we might as well use it ourselves.
This commit is contained in:
Hocuri
2025-08-23 21:08:17 +02:00
committed by GitHub
parent c34ccafb2e
commit 2cd54b72b0
6 changed files with 47 additions and 49 deletions

View File

@@ -375,7 +375,7 @@ pub unsafe extern "C" fn dc_get_connectivity(context: *const dc_context_t) -> li
return 0; return 0;
} }
let ctx = &*context; let ctx = &*context;
block_on(ctx.get_connectivity()) as u32 as libc::c_int ctx.get_connectivity() as u32 as libc::c_int
} }
#[no_mangle] #[no_mangle]

View File

@@ -1908,7 +1908,7 @@ impl CommandApi {
/// If the connectivity changes, a #DC_EVENT_CONNECTIVITY_CHANGED will be emitted. /// If the connectivity changes, a #DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
async fn get_connectivity(&self, account_id: u32) -> Result<u32> { async fn get_connectivity(&self, account_id: u32) -> Result<u32> {
let ctx = self.get_context(account_id).await?; let ctx = self.get_context(account_id).await?;
Ok(ctx.get_connectivity().await as u32) Ok(ctx.get_connectivity() as u32)
} }
/// Get an overview of the current connectivity, and possibly more statistics. /// Get an overview of the current connectivity, and possibly more statistics.

View File

@@ -325,7 +325,7 @@ impl Imap {
} }
info!(context, "Connecting to IMAP server."); info!(context, "Connecting to IMAP server.");
self.connectivity.set_connecting(context).await; self.connectivity.set_connecting(context);
self.conn_last_try = tools::Time::now(); self.conn_last_try = tools::Time::now();
const BACKOFF_MIN_MS: u64 = 2000; const BACKOFF_MIN_MS: u64 = 2000;
@@ -408,7 +408,7 @@ impl Imap {
"IMAP-LOGIN as {}", "IMAP-LOGIN as {}",
lp.user lp.user
))); )));
self.connectivity.set_preparing(context).await; self.connectivity.set_preparing(context);
info!(context, "Successfully logged into IMAP server."); info!(context, "Successfully logged into IMAP server.");
return Ok(session); return Ok(session);
} }
@@ -466,7 +466,7 @@ impl Imap {
let mut session = match self.connect(context, configuring).await { let mut session = match self.connect(context, configuring).await {
Ok(session) => session, Ok(session) => session,
Err(err) => { Err(err) => {
self.connectivity.set_err(context, &err).await; self.connectivity.set_err(context, &err);
return Err(err); return Err(err);
} }
}; };
@@ -692,7 +692,7 @@ impl Imap {
} }
if !uids_fetch.is_empty() { if !uids_fetch.is_empty() {
self.connectivity.set_working(context).await; self.connectivity.set_working(context);
} }
let (sender, receiver) = async_channel::unbounded(); let (sender, receiver) = async_channel::unbounded();

View File

@@ -227,7 +227,7 @@ impl SchedulerState {
_ => return, _ => return,
}; };
drop(inner); drop(inner);
connectivity::idle_interrupted(inbox, oboxes).await; connectivity::idle_interrupted(inbox, oboxes);
} }
/// Indicate that the network likely is lost. /// Indicate that the network likely is lost.
@@ -244,7 +244,7 @@ impl SchedulerState {
_ => return, _ => return,
}; };
drop(inner); drop(inner);
connectivity::maybe_network_lost(context, stores).await; connectivity::maybe_network_lost(context, stores);
} }
pub(crate) async fn interrupt_inbox(&self) { pub(crate) async fn interrupt_inbox(&self) {
@@ -569,7 +569,7 @@ async fn fetch_idle(
// The folder is not configured. // The folder is not configured.
// For example, this happens if the server does not have Sent folder // For example, this happens if the server does not have Sent folder
// but watching Sent folder is enabled. // but watching Sent folder is enabled.
connection.connectivity.set_not_configured(ctx).await; connection.connectivity.set_not_configured(ctx);
connection.idle_interrupt_receiver.recv().await.ok(); connection.idle_interrupt_receiver.recv().await.ok();
bail!("Cannot fetch folder {folder_meaning} because it is not configured"); bail!("Cannot fetch folder {folder_meaning} because it is not configured");
}; };
@@ -659,7 +659,7 @@ async fn fetch_idle(
.log_err(ctx) .log_err(ctx)
.ok(); .ok();
connection.connectivity.set_idle(ctx).await; connection.connectivity.set_idle(ctx);
ctx.emit_event(EventType::ImapInboxIdle); ctx.emit_event(EventType::ImapInboxIdle);
@@ -810,8 +810,8 @@ async fn smtp_loop(
// Fake Idle // Fake Idle
info!(ctx, "SMTP fake idle started."); info!(ctx, "SMTP fake idle started.");
match &connection.last_send_error { match &connection.last_send_error {
None => connection.connectivity.set_idle(&ctx).await, None => connection.connectivity.set_idle(&ctx),
Some(err) => connection.connectivity.set_err(&ctx, err).await, Some(err) => connection.connectivity.set_err(&ctx, err),
} }
// If send_smtp_messages() failed, we set a timeout for the fake-idle so that // If send_smtp_messages() failed, we set a timeout for the fake-idle so that

View File

@@ -4,7 +4,6 @@ use std::{iter::once, ops::Deref, sync::Arc};
use anyhow::Result; use anyhow::Result;
use humansize::{BINARY, format_size}; use humansize::{BINARY, format_size};
use tokio::sync::Mutex;
use crate::events::EventType; use crate::events::EventType;
use crate::imap::{FolderMeaning, scan_folders::get_watched_folder_configs}; use crate::imap::{FolderMeaning, scan_folders::get_watched_folder_configs};
@@ -160,52 +159,51 @@ impl DetailedConnectivity {
} }
#[derive(Clone, Default)] #[derive(Clone, Default)]
pub(crate) struct ConnectivityStore(Arc<Mutex<DetailedConnectivity>>); pub(crate) struct ConnectivityStore(Arc<parking_lot::Mutex<DetailedConnectivity>>);
impl ConnectivityStore { impl ConnectivityStore {
async fn set(&self, context: &Context, v: DetailedConnectivity) { fn set(&self, context: &Context, v: DetailedConnectivity) {
{ {
*self.0.lock().await = v; *self.0.lock() = v;
} }
context.emit_event(EventType::ConnectivityChanged); context.emit_event(EventType::ConnectivityChanged);
} }
pub(crate) async fn set_err(&self, context: &Context, e: impl ToString) { pub(crate) fn set_err(&self, context: &Context, e: impl ToString) {
self.set(context, DetailedConnectivity::Error(e.to_string())) self.set(context, DetailedConnectivity::Error(e.to_string()));
.await;
} }
pub(crate) async fn set_connecting(&self, context: &Context) { pub(crate) fn set_connecting(&self, context: &Context) {
self.set(context, DetailedConnectivity::Connecting).await; self.set(context, DetailedConnectivity::Connecting);
} }
pub(crate) async fn set_working(&self, context: &Context) { pub(crate) fn set_working(&self, context: &Context) {
self.set(context, DetailedConnectivity::Working).await; self.set(context, DetailedConnectivity::Working);
} }
pub(crate) async fn set_preparing(&self, context: &Context) { pub(crate) fn set_preparing(&self, context: &Context) {
self.set(context, DetailedConnectivity::Preparing).await; self.set(context, DetailedConnectivity::Preparing);
} }
pub(crate) async fn set_not_configured(&self, context: &Context) { pub(crate) fn set_not_configured(&self, context: &Context) {
self.set(context, DetailedConnectivity::NotConfigured).await; self.set(context, DetailedConnectivity::NotConfigured);
} }
pub(crate) async fn set_idle(&self, context: &Context) { pub(crate) fn set_idle(&self, context: &Context) {
self.set(context, DetailedConnectivity::Idle).await; self.set(context, DetailedConnectivity::Idle);
} }
async fn get_detailed(&self) -> DetailedConnectivity { fn get_detailed(&self) -> DetailedConnectivity {
self.0.lock().await.deref().clone() self.0.lock().deref().clone()
} }
async fn get_basic(&self) -> Option<Connectivity> { fn get_basic(&self) -> Option<Connectivity> {
self.0.lock().await.to_basic() self.0.lock().to_basic()
} }
async fn get_all_work_done(&self) -> bool { fn get_all_work_done(&self) -> bool {
self.0.lock().await.all_work_done() self.0.lock().all_work_done()
} }
} }
/// Set all folder states to InterruptingIdle in case they were `Idle` before. /// Set all folder states to InterruptingIdle in case they were `Idle` before.
/// Called during `dc_maybe_network()` to make sure that `all_work_done()` /// Called during `dc_maybe_network()` to make sure that `all_work_done()`
/// returns false immediately after `dc_maybe_network()`. /// returns false immediately after `dc_maybe_network()`.
pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) { pub(crate) fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<ConnectivityStore>) {
let mut connectivity_lock = inbox.0.lock().await; let mut connectivity_lock = inbox.0.lock();
// For the inbox, we also have to set the connectivity to InterruptingIdle if it was // For the inbox, we also have to set the connectivity to InterruptingIdle if it was
// NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity()
// returns Connected. But after dc_maybe_network(), dc_get_connectivity() must not // returns Connected. But after dc_maybe_network(), dc_get_connectivity() must not
@@ -219,7 +217,7 @@ pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<Conne
drop(connectivity_lock); drop(connectivity_lock);
for state in oboxes { for state in oboxes {
let mut connectivity_lock = state.0.lock().await; let mut connectivity_lock = state.0.lock();
if *connectivity_lock == DetailedConnectivity::Idle { if *connectivity_lock == DetailedConnectivity::Idle {
*connectivity_lock = DetailedConnectivity::InterruptingIdle; *connectivity_lock = DetailedConnectivity::InterruptingIdle;
} }
@@ -231,9 +229,9 @@ pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec<Conne
/// Set the connectivity to "Not connected" after a call to dc_maybe_network_lost(). /// Set the connectivity to "Not connected" after a call to dc_maybe_network_lost().
/// If we did not do this, the connectivity would stay "Connected" for quite a long time /// If we did not do this, the connectivity would stay "Connected" for quite a long time
/// after `maybe_network_lost()` was called. /// after `maybe_network_lost()` was called.
pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) { pub(crate) fn maybe_network_lost(context: &Context, stores: Vec<ConnectivityStore>) {
for store in &stores { for store in &stores {
let mut connectivity_lock = store.0.lock().await; let mut connectivity_lock = store.0.lock();
if !matches!( if !matches!(
*connectivity_lock, *connectivity_lock,
DetailedConnectivity::Uninitialized DetailedConnectivity::Uninitialized
@@ -248,7 +246,7 @@ pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec<Connectivi
impl fmt::Debug for ConnectivityStore { impl fmt::Debug for ConnectivityStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Ok(guard) = self.0.try_lock() { if let Some(guard) = self.0.try_lock() {
write!(f, "ConnectivityStore {:?}", &*guard) write!(f, "ConnectivityStore {:?}", &*guard)
} else { } else {
write!(f, "ConnectivityStore [LOCKED]") write!(f, "ConnectivityStore [LOCKED]")
@@ -271,11 +269,11 @@ impl Context {
/// e.g. in the title of the main screen. /// e.g. in the title of the main screen.
/// ///
/// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted. /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
pub async fn get_connectivity(&self) -> Connectivity { pub fn get_connectivity(&self) -> Connectivity {
let stores = self.connectivities.lock().clone(); let stores = self.connectivities.lock().clone();
let mut connectivities = Vec::new(); let mut connectivities = Vec::new();
for s in stores { for s in stores {
if let Some(connectivity) = s.get_basic().await { if let Some(connectivity) = s.get_basic() {
connectivities.push(connectivity); connectivities.push(connectivity);
} }
} }
@@ -393,7 +391,7 @@ impl Context {
let f = self.get_config(config).await.log_err(self).ok().flatten(); let f = self.get_config(config).await.log_err(self).ok().flatten();
if let Some(foldername) = f { if let Some(foldername) = f {
let detailed = &state.get_detailed().await; let detailed = &state.get_detailed();
ret += "<li>"; ret += "<li>";
ret += &*detailed.to_icon(); ret += &*detailed.to_icon();
ret += " <b>"; ret += " <b>";
@@ -407,7 +405,7 @@ impl Context {
} }
if !folder_added && folder == &FolderMeaning::Inbox { if !folder_added && folder == &FolderMeaning::Inbox {
let detailed = &state.get_detailed().await; let detailed = &state.get_detailed();
if let DetailedConnectivity::Error(_) = detailed { if let DetailedConnectivity::Error(_) = detailed {
// On the inbox thread, we also do some other things like scan_folders and run jobs // On the inbox thread, we also do some other things like scan_folders and run jobs
// so, maybe, the inbox is not watched, but something else went wrong // so, maybe, the inbox is not watched, but something else went wrong
@@ -429,7 +427,7 @@ impl Context {
let outgoing_messages = stock_str::outgoing_messages(self).await; let outgoing_messages = stock_str::outgoing_messages(self).await;
ret += &format!("<h3>{outgoing_messages}</h3><ul><li>"); ret += &format!("<h3>{outgoing_messages}</h3><ul><li>");
let detailed = smtp.get_detailed().await; let detailed = smtp.get_detailed();
ret += &*detailed.to_icon(); ret += &*detailed.to_icon();
ret += " "; ret += " ";
ret += &*escaper::encode_minimal(&detailed.to_string_smtp(self).await); ret += &*escaper::encode_minimal(&detailed.to_string_smtp(self).await);
@@ -553,7 +551,7 @@ impl Context {
drop(lock); drop(lock);
for s in &stores { for s in &stores {
if !s.get_all_work_done().await { if !s.get_all_work_done() {
return false; return false;
} }
} }

View File

@@ -87,7 +87,7 @@ impl Smtp {
return Ok(()); return Ok(());
} }
self.connectivity.set_connecting(context).await; self.connectivity.set_connecting(context);
let lp = ConfiguredLoginParam::load(context) let lp = ConfiguredLoginParam::load(context)
.await? .await?
.context("Not configured")?; .context("Not configured")?;
@@ -187,7 +187,7 @@ pub(crate) async fn smtp_send(
info!(context, "SMTP-sending out mime message:\n{message}"); info!(context, "SMTP-sending out mime message:\n{message}");
} }
smtp.connectivity.set_working(context).await; smtp.connectivity.set_working(context);
if let Err(err) = smtp if let Err(err) = smtp
.connect_configured(context) .connect_configured(context)