diff --git a/src/config.rs b/src/config.rs index 6cd1705b6..7bfebd868 100644 --- a/src/config.rs +++ b/src/config.rs @@ -734,7 +734,7 @@ impl Context { Self::check_config(key, value)?; let _pause = match key.needs_io_restart() { - true => self.scheduler.pause(self.clone()).await?, + true => self.scheduler.pause(self).await?, _ => Default::default(), }; self.set_config_internal(key, value).await?; diff --git a/src/context.rs b/src/context.rs index ca964deb1..0272a74c0 100644 --- a/src/context.rs +++ b/src/context.rs @@ -34,7 +34,7 @@ use crate::param::{Param, Params}; use crate::peer_channels::Iroh; use crate::push::PushSubscriber; use crate::quota::QuotaInfo; -use crate::scheduler::{SchedulerState, convert_folder_meaning}; +use crate::scheduler::{ConnectivityStore, SchedulerState, convert_folder_meaning}; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -304,6 +304,10 @@ pub struct InnerContext { /// tokio::sync::OnceCell would be possible to use, but overkill for our usecase; /// the standard library's OnceLock is enough, and it's a lot smaller in memory. pub(crate) self_fingerprint: OnceLock, + + /// `Connectivity` values for mailboxes, unordered. Used to compute the aggregate connectivity, + /// see [`Context::get_connectivity()`]. + pub(crate) connectivities: parking_lot::Mutex>, } /// The state of ongoing process. @@ -473,6 +477,7 @@ impl Context { push_subscribed: AtomicBool::new(false), iroh: Arc::new(RwLock::new(None)), self_fingerprint: OnceLock::new(), + connectivities: parking_lot::Mutex::new(Vec::new()), }; let ctx = Context { @@ -502,7 +507,7 @@ impl Context { // Now, some configs may have changed, so, we need to invalidate the cache. self.sql.config_cache.write().await.clear(); - self.scheduler.start(self.clone()).await; + self.scheduler.start(self).await; } /// Stops the IO scheduler. @@ -579,7 +584,7 @@ impl Context { } else { // Pause the scheduler to ensure another connection does not start // while we are fetching on a dedicated connection. - let _pause_guard = self.scheduler.pause(self.clone()).await?; + let _pause_guard = self.scheduler.pause(self).await?; // Start a new dedicated connection. let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?; diff --git a/src/imex.rs b/src/imex.rs index 179db071e..3ccdc4615 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -90,7 +90,7 @@ pub async fn imex( let cancel = context.alloc_ongoing().await?; let res = { - let _guard = context.scheduler.pause(context.clone()).await?; + let _guard = context.scheduler.pause(context).await?; imex_inner(context, what, path, passphrase) .race(async { cancel.recv().await.ok(); diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 6d1cb0a72..bbc4b1de0 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -105,7 +105,7 @@ impl BackupProvider { // Acquire global "ongoing" mutex. let cancel_token = context.alloc_ongoing().await?; - let paused_guard = context.scheduler.pause(context.clone()).await?; + let paused_guard = context.scheduler.pause(context).await?; let context_dir = context .get_blobdir() .parent() diff --git a/src/scheduler.rs b/src/scheduler.rs index 491840daf..36c0d8448 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -8,12 +8,12 @@ use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; use rand::Rng; -use tokio::sync::{RwLock, RwLockWriteGuard, oneshot}; +use tokio::sync::{RwLock, oneshot}; use tokio::task; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use self::connectivity::ConnectivityStore; +pub(crate) use self::connectivity::ConnectivityStore; use crate::config::{self, Config}; use crate::constants; use crate::contact::{ContactId, RecentlySeenLoop}; @@ -53,32 +53,32 @@ impl SchedulerState { } /// Starts the scheduler if it is not yet started. - pub(crate) async fn start(&self, context: Context) { + pub(crate) async fn start(&self, context: &Context) { let mut inner = self.inner.write().await; match *inner { InnerSchedulerState::Started(_) => (), - InnerSchedulerState::Stopped => Self::do_start(inner, context).await, + InnerSchedulerState::Stopped => Self::do_start(&mut inner, context).await, InnerSchedulerState::Paused { ref mut started, .. } => *started = true, } + context.update_connectivities(&inner); } /// Starts the scheduler if it is not yet started. - async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { + async fn do_start(inner: &mut InnerSchedulerState, context: &Context) { info!(context, "starting IO"); // Notify message processing loop // to allow processing old messages after restart. context.new_msgs_notify.notify_one(); - let ctx = context.clone(); - match Scheduler::start(&context).await { + match Scheduler::start(context).await { Ok(scheduler) => { *inner = InnerSchedulerState::Started(scheduler); context.emit_event(EventType::ConnectivityChanged); } - Err(err) => error!(&ctx, "Failed to start IO: {:#}", err), + Err(err) => error!(context, "Failed to start IO: {:#}", err), } } @@ -87,18 +87,19 @@ impl SchedulerState { let mut inner = self.inner.write().await; match *inner { InnerSchedulerState::Started(_) => { - Self::do_stop(inner, context, InnerSchedulerState::Stopped).await + Self::do_stop(&mut inner, context, InnerSchedulerState::Stopped).await } InnerSchedulerState::Stopped => (), InnerSchedulerState::Paused { ref mut started, .. } => *started = false, } + context.update_connectivities(&inner); } /// Stops the scheduler if it is currently running. async fn do_stop( - mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, + inner: &mut InnerSchedulerState, context: &Context, new_state: InnerSchedulerState, ) { @@ -122,7 +123,7 @@ impl SchedulerState { debug_logging.loop_handle.abort(); debug_logging.loop_handle.await.ok(); } - let prev_state = std::mem::replace(&mut *inner, new_state); + let prev_state = std::mem::replace(inner, new_state); context.emit_event(EventType::ConnectivityChanged); match prev_state { InnerSchedulerState::Started(scheduler) => scheduler.stop(context).await, @@ -138,7 +139,7 @@ impl SchedulerState { /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called /// resume will do the right thing and restore the scheduler to the state requested by /// the last call. - pub(crate) async fn pause(&'_ self, context: Context) -> Result { + pub(crate) async fn pause(&'_ self, context: &Context) -> Result { { let mut inner = self.inner.write().await; match *inner { @@ -147,7 +148,7 @@ impl SchedulerState { started: true, pause_guards_count: NonZeroUsize::new(1).unwrap(), }; - Self::do_stop(inner, &context, new_state).await; + Self::do_stop(&mut inner, context, new_state).await; } InnerSchedulerState::Stopped => { *inner = InnerSchedulerState::Paused { @@ -164,9 +165,11 @@ impl SchedulerState { .ok_or_else(|| Error::msg("Too many pause guards active"))? } } + context.update_connectivities(&inner); } let (tx, rx) = oneshot::channel(); + let context = context.clone(); tokio::spawn(async move { rx.await.ok(); let mut inner = context.scheduler.inner.write().await; @@ -183,7 +186,7 @@ impl SchedulerState { } => { if *pause_guards_count == NonZeroUsize::new(1).unwrap() { match *started { - true => SchedulerState::do_start(inner, context.clone()).await, + true => SchedulerState::do_start(&mut inner, &context).await, false => *inner = InnerSchedulerState::Stopped, } } else { @@ -193,6 +196,7 @@ impl SchedulerState { } } } + context.update_connectivities(&inner); }); Ok(IoPausedGuard { sender: Some(tx) }) } @@ -202,7 +206,7 @@ impl SchedulerState { info!(context, "restarting IO"); if self.is_running().await { self.stop(context).await; - self.start(context.clone()).await; + self.start(context).await; } } @@ -288,7 +292,7 @@ impl SchedulerState { } #[derive(Debug, Default)] -enum InnerSchedulerState { +pub(crate) enum InnerSchedulerState { Started(Scheduler), #[default] Stopped, diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 8f34258ba..9fdeadd8d 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -272,16 +272,7 @@ impl Context { /// /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted. pub async fn get_connectivity(&self) -> Connectivity { - let lock = self.scheduler.inner.read().await; - let stores: Vec<_> = match *lock { - InnerSchedulerState::Started(ref sched) => sched - .boxes() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect(), - _ => return Connectivity::NotConnected, - }; - drop(lock); - + let stores = self.connectivities.lock().clone(); let mut connectivities = Vec::new(); for s in stores { if let Some(connectivity) = s.get_basic().await { @@ -291,7 +282,18 @@ impl Context { connectivities .into_iter() .min() - .unwrap_or(Connectivity::Connected) + .unwrap_or(Connectivity::NotConnected) + } + + pub(crate) fn update_connectivities(&self, sched: &InnerSchedulerState) { + let stores: Vec<_> = match sched { + InnerSchedulerState::Started(sched) => sched + .boxes() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect(), + _ => Vec::new(), + }; + *self.connectivities.lock() = stores; } /// Get an overview of the current connectivity, and possibly more statistics.