mirror of
https://github.com/chatmail/core.git
synced 2026-05-19 23:06:32 +03:00
fix: get_connectivity(): Get rid of locking SchedulerState::inner (#7124)
`get_connectivity()` is expected to return immediately, not when the scheduler finishes updating its state in `start_io()/stop_io()/pause_io()`, otherwise it causes app non-responsiveness. Instead of read-locking `SchedulerState::inner`, store the `ConnectivityStore` collection in `Context` and fetch it from there in `get_connectivity()`. Update it every time we release a write lock on `SchedulerState::inner`.
This commit is contained in:
@@ -734,7 +734,7 @@ impl Context {
|
|||||||
Self::check_config(key, value)?;
|
Self::check_config(key, value)?;
|
||||||
|
|
||||||
let _pause = match key.needs_io_restart() {
|
let _pause = match key.needs_io_restart() {
|
||||||
true => self.scheduler.pause(self.clone()).await?,
|
true => self.scheduler.pause(self).await?,
|
||||||
_ => Default::default(),
|
_ => Default::default(),
|
||||||
};
|
};
|
||||||
self.set_config_internal(key, value).await?;
|
self.set_config_internal(key, value).await?;
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ use crate::param::{Param, Params};
|
|||||||
use crate::peer_channels::Iroh;
|
use crate::peer_channels::Iroh;
|
||||||
use crate::push::PushSubscriber;
|
use crate::push::PushSubscriber;
|
||||||
use crate::quota::QuotaInfo;
|
use crate::quota::QuotaInfo;
|
||||||
use crate::scheduler::{SchedulerState, convert_folder_meaning};
|
use crate::scheduler::{ConnectivityStore, SchedulerState, convert_folder_meaning};
|
||||||
use crate::sql::Sql;
|
use crate::sql::Sql;
|
||||||
use crate::stock_str::StockStrings;
|
use crate::stock_str::StockStrings;
|
||||||
use crate::timesmearing::SmearedTimestamp;
|
use crate::timesmearing::SmearedTimestamp;
|
||||||
@@ -304,6 +304,10 @@ pub struct InnerContext {
|
|||||||
/// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
|
/// tokio::sync::OnceCell would be possible to use, but overkill for our usecase;
|
||||||
/// the standard library's OnceLock is enough, and it's a lot smaller in memory.
|
/// the standard library's OnceLock is enough, and it's a lot smaller in memory.
|
||||||
pub(crate) self_fingerprint: OnceLock<String>,
|
pub(crate) self_fingerprint: OnceLock<String>,
|
||||||
|
|
||||||
|
/// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity,
|
||||||
|
/// see [`Context::get_connectivity()`].
|
||||||
|
pub(crate) connectivities: parking_lot::Mutex<Vec<ConnectivityStore>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The state of ongoing process.
|
/// The state of ongoing process.
|
||||||
@@ -473,6 +477,7 @@ impl Context {
|
|||||||
push_subscribed: AtomicBool::new(false),
|
push_subscribed: AtomicBool::new(false),
|
||||||
iroh: Arc::new(RwLock::new(None)),
|
iroh: Arc::new(RwLock::new(None)),
|
||||||
self_fingerprint: OnceLock::new(),
|
self_fingerprint: OnceLock::new(),
|
||||||
|
connectivities: parking_lot::Mutex::new(Vec::new()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let ctx = Context {
|
let ctx = Context {
|
||||||
@@ -502,7 +507,7 @@ impl Context {
|
|||||||
// Now, some configs may have changed, so, we need to invalidate the cache.
|
// Now, some configs may have changed, so, we need to invalidate the cache.
|
||||||
self.sql.config_cache.write().await.clear();
|
self.sql.config_cache.write().await.clear();
|
||||||
|
|
||||||
self.scheduler.start(self.clone()).await;
|
self.scheduler.start(self).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stops the IO scheduler.
|
/// Stops the IO scheduler.
|
||||||
@@ -579,7 +584,7 @@ impl Context {
|
|||||||
} else {
|
} else {
|
||||||
// Pause the scheduler to ensure another connection does not start
|
// Pause the scheduler to ensure another connection does not start
|
||||||
// while we are fetching on a dedicated connection.
|
// while we are fetching on a dedicated connection.
|
||||||
let _pause_guard = self.scheduler.pause(self.clone()).await?;
|
let _pause_guard = self.scheduler.pause(self).await?;
|
||||||
|
|
||||||
// Start a new dedicated connection.
|
// Start a new dedicated connection.
|
||||||
let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
|
let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?;
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ pub async fn imex(
|
|||||||
let cancel = context.alloc_ongoing().await?;
|
let cancel = context.alloc_ongoing().await?;
|
||||||
|
|
||||||
let res = {
|
let res = {
|
||||||
let _guard = context.scheduler.pause(context.clone()).await?;
|
let _guard = context.scheduler.pause(context).await?;
|
||||||
imex_inner(context, what, path, passphrase)
|
imex_inner(context, what, path, passphrase)
|
||||||
.race(async {
|
.race(async {
|
||||||
cancel.recv().await.ok();
|
cancel.recv().await.ok();
|
||||||
|
|||||||
@@ -105,7 +105,7 @@ impl BackupProvider {
|
|||||||
|
|
||||||
// Acquire global "ongoing" mutex.
|
// Acquire global "ongoing" mutex.
|
||||||
let cancel_token = context.alloc_ongoing().await?;
|
let cancel_token = context.alloc_ongoing().await?;
|
||||||
let paused_guard = context.scheduler.pause(context.clone()).await?;
|
let paused_guard = context.scheduler.pause(context).await?;
|
||||||
let context_dir = context
|
let context_dir = context
|
||||||
.get_blobdir()
|
.get_blobdir()
|
||||||
.parent()
|
.parent()
|
||||||
|
|||||||
@@ -8,12 +8,12 @@ use async_channel::{self as channel, Receiver, Sender};
|
|||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
use futures_lite::FutureExt;
|
use futures_lite::FutureExt;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use tokio::sync::{RwLock, RwLockWriteGuard, oneshot};
|
use tokio::sync::{RwLock, oneshot};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tokio_util::task::TaskTracker;
|
use tokio_util::task::TaskTracker;
|
||||||
|
|
||||||
use self::connectivity::ConnectivityStore;
|
pub(crate) use self::connectivity::ConnectivityStore;
|
||||||
use crate::config::{self, Config};
|
use crate::config::{self, Config};
|
||||||
use crate::constants;
|
use crate::constants;
|
||||||
use crate::contact::{ContactId, RecentlySeenLoop};
|
use crate::contact::{ContactId, RecentlySeenLoop};
|
||||||
@@ -53,32 +53,32 @@ impl SchedulerState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Starts the scheduler if it is not yet started.
|
/// Starts the scheduler if it is not yet started.
|
||||||
pub(crate) async fn start(&self, context: Context) {
|
pub(crate) async fn start(&self, context: &Context) {
|
||||||
let mut inner = self.inner.write().await;
|
let mut inner = self.inner.write().await;
|
||||||
match *inner {
|
match *inner {
|
||||||
InnerSchedulerState::Started(_) => (),
|
InnerSchedulerState::Started(_) => (),
|
||||||
InnerSchedulerState::Stopped => Self::do_start(inner, context).await,
|
InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await,
|
||||||
InnerSchedulerState::Paused {
|
InnerSchedulerState::Paused {
|
||||||
ref mut started, ..
|
ref mut started, ..
|
||||||
} => *started = true,
|
} => *started = true,
|
||||||
}
|
}
|
||||||
|
context.update_connectivities(&inner);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Starts the scheduler if it is not yet started.
|
/// Starts the scheduler if it is not yet started.
|
||||||
async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) {
|
async fn do_start(inner: &mut InnerSchedulerState, context: &Context) {
|
||||||
info!(context, "starting IO");
|
info!(context, "starting IO");
|
||||||
|
|
||||||
// Notify message processing loop
|
// Notify message processing loop
|
||||||
// to allow processing old messages after restart.
|
// to allow processing old messages after restart.
|
||||||
context.new_msgs_notify.notify_one();
|
context.new_msgs_notify.notify_one();
|
||||||
|
|
||||||
let ctx = context.clone();
|
match Scheduler::start(context).await {
|
||||||
match Scheduler::start(&context).await {
|
|
||||||
Ok(scheduler) => {
|
Ok(scheduler) => {
|
||||||
*inner = InnerSchedulerState::Started(scheduler);
|
*inner = InnerSchedulerState::Started(scheduler);
|
||||||
context.emit_event(EventType::ConnectivityChanged);
|
context.emit_event(EventType::ConnectivityChanged);
|
||||||
}
|
}
|
||||||
Err(err) => error!(&ctx, "Failed to start IO: {:#}", err),
|
Err(err) => error!(context, "Failed to start IO: {:#}", err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,18 +87,19 @@ impl SchedulerState {
|
|||||||
let mut inner = self.inner.write().await;
|
let mut inner = self.inner.write().await;
|
||||||
match *inner {
|
match *inner {
|
||||||
InnerSchedulerState::Started(_) => {
|
InnerSchedulerState::Started(_) => {
|
||||||
Self::do_stop(inner, context, InnerSchedulerState::Stopped).await
|
Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await
|
||||||
}
|
}
|
||||||
InnerSchedulerState::Stopped => (),
|
InnerSchedulerState::Stopped => (),
|
||||||
InnerSchedulerState::Paused {
|
InnerSchedulerState::Paused {
|
||||||
ref mut started, ..
|
ref mut started, ..
|
||||||
} => *started = false,
|
} => *started = false,
|
||||||
}
|
}
|
||||||
|
context.update_connectivities(&inner);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stops the scheduler if it is currently running.
|
/// Stops the scheduler if it is currently running.
|
||||||
async fn do_stop(
|
async fn do_stop(
|
||||||
mut inner: RwLockWriteGuard<'_, InnerSchedulerState>,
|
inner: &mut InnerSchedulerState,
|
||||||
context: &Context,
|
context: &Context,
|
||||||
new_state: InnerSchedulerState,
|
new_state: InnerSchedulerState,
|
||||||
) {
|
) {
|
||||||
@@ -122,7 +123,7 @@ impl SchedulerState {
|
|||||||
debug_logging.loop_handle.abort();
|
debug_logging.loop_handle.abort();
|
||||||
debug_logging.loop_handle.await.ok();
|
debug_logging.loop_handle.await.ok();
|
||||||
}
|
}
|
||||||
let prev_state = std::mem::replace(&mut *inner, new_state);
|
let prev_state = std::mem::replace(inner, new_state);
|
||||||
context.emit_event(EventType::ConnectivityChanged);
|
context.emit_event(EventType::ConnectivityChanged);
|
||||||
match prev_state {
|
match prev_state {
|
||||||
InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
|
InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await,
|
||||||
@@ -138,7 +139,7 @@ impl SchedulerState {
|
|||||||
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
||||||
/// resume will do the right thing and restore the scheduler to the state requested by
|
/// resume will do the right thing and restore the scheduler to the state requested by
|
||||||
/// the last call.
|
/// the last call.
|
||||||
pub(crate) async fn pause(&'_ self, context: Context) -> Result<IoPausedGuard> {
|
pub(crate) async fn pause(&'_ self, context: &Context) -> Result<IoPausedGuard> {
|
||||||
{
|
{
|
||||||
let mut inner = self.inner.write().await;
|
let mut inner = self.inner.write().await;
|
||||||
match *inner {
|
match *inner {
|
||||||
@@ -147,7 +148,7 @@ impl SchedulerState {
|
|||||||
started: true,
|
started: true,
|
||||||
pause_guards_count: NonZeroUsize::new(1).unwrap(),
|
pause_guards_count: NonZeroUsize::new(1).unwrap(),
|
||||||
};
|
};
|
||||||
Self::do_stop(inner, &context, new_state).await;
|
Self::do_stop(&mut inner, context, new_state).await;
|
||||||
}
|
}
|
||||||
InnerSchedulerState::Stopped => {
|
InnerSchedulerState::Stopped => {
|
||||||
*inner = InnerSchedulerState::Paused {
|
*inner = InnerSchedulerState::Paused {
|
||||||
@@ -164,9 +165,11 @@ impl SchedulerState {
|
|||||||
.ok_or_else(|| Error::msg("Too many pause guards active"))?
|
.ok_or_else(|| Error::msg("Too many pause guards active"))?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
context.update_connectivities(&inner);
|
||||||
}
|
}
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let context = context.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
rx.await.ok();
|
rx.await.ok();
|
||||||
let mut inner = context.scheduler.inner.write().await;
|
let mut inner = context.scheduler.inner.write().await;
|
||||||
@@ -183,7 +186,7 @@ impl SchedulerState {
|
|||||||
} => {
|
} => {
|
||||||
if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
|
if *pause_guards_count == NonZeroUsize::new(1).unwrap() {
|
||||||
match *started {
|
match *started {
|
||||||
true => SchedulerState::do_start(inner, context.clone()).await,
|
true => SchedulerState::do_start(&mut inner, &context).await,
|
||||||
false => *inner = InnerSchedulerState::Stopped,
|
false => *inner = InnerSchedulerState::Stopped,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -193,6 +196,7 @@ impl SchedulerState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
context.update_connectivities(&inner);
|
||||||
});
|
});
|
||||||
Ok(IoPausedGuard { sender: Some(tx) })
|
Ok(IoPausedGuard { sender: Some(tx) })
|
||||||
}
|
}
|
||||||
@@ -202,7 +206,7 @@ impl SchedulerState {
|
|||||||
info!(context, "restarting IO");
|
info!(context, "restarting IO");
|
||||||
if self.is_running().await {
|
if self.is_running().await {
|
||||||
self.stop(context).await;
|
self.stop(context).await;
|
||||||
self.start(context.clone()).await;
|
self.start(context).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -288,7 +292,7 @@ impl SchedulerState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
enum InnerSchedulerState {
|
pub(crate) enum InnerSchedulerState {
|
||||||
Started(Scheduler),
|
Started(Scheduler),
|
||||||
#[default]
|
#[default]
|
||||||
Stopped,
|
Stopped,
|
||||||
|
|||||||
@@ -272,16 +272,7 @@ impl Context {
|
|||||||
///
|
///
|
||||||
/// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
|
/// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted.
|
||||||
pub async fn get_connectivity(&self) -> Connectivity {
|
pub async fn get_connectivity(&self) -> Connectivity {
|
||||||
let lock = self.scheduler.inner.read().await;
|
let stores = self.connectivities.lock().clone();
|
||||||
let stores: Vec<_> = match *lock {
|
|
||||||
InnerSchedulerState::Started(ref sched) => sched
|
|
||||||
.boxes()
|
|
||||||
.map(|b| b.conn_state.state.connectivity.clone())
|
|
||||||
.collect(),
|
|
||||||
_ => return Connectivity::NotConnected,
|
|
||||||
};
|
|
||||||
drop(lock);
|
|
||||||
|
|
||||||
let mut connectivities = Vec::new();
|
let mut connectivities = Vec::new();
|
||||||
for s in stores {
|
for s in stores {
|
||||||
if let Some(connectivity) = s.get_basic().await {
|
if let Some(connectivity) = s.get_basic().await {
|
||||||
@@ -291,7 +282,18 @@ impl Context {
|
|||||||
connectivities
|
connectivities
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.min()
|
.min()
|
||||||
.unwrap_or(Connectivity::Connected)
|
.unwrap_or(Connectivity::NotConnected)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn update_connectivities(&self, sched: &InnerSchedulerState) {
|
||||||
|
let stores: Vec<_> = match sched {
|
||||||
|
InnerSchedulerState::Started(sched) => sched
|
||||||
|
.boxes()
|
||||||
|
.map(|b| b.conn_state.state.connectivity.clone())
|
||||||
|
.collect(),
|
||||||
|
_ => Vec::new(),
|
||||||
|
};
|
||||||
|
*self.connectivities.lock() = stores;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get an overview of the current connectivity, and possibly more statistics.
|
/// Get an overview of the current connectivity, and possibly more statistics.
|
||||||
|
|||||||
Reference in New Issue
Block a user