From 1d42e4743fac7f1e38ca3a2c6f601809c489840f Mon Sep 17 00:00:00 2001
From: Floris Bruynooghe
Date: Wed, 8 Mar 2023 14:20:42 +0100
Subject: [PATCH] Allow pausing IO scheduler from inside core

To handle backups the UIs have to make sure they stop the IO scheduler
and don't accidentally restart it while working on it. Since they have
to call start_io from a number of locations this can be difficult to
manage.

This introduces a mechanism for the core to pause IO for some time,
which is used by the imex function. It interacts well with other calls
to dc_start_io() and dc_stop_io(), making sure that once resumed the
scheduler is running or stopped according to the latest of those calls.

This was a little more invasive than hoped due to the scheduler. The
additional abstraction of the scheduler state on the context seems a
nice improvement though.
---
 src/accounts.rs | 4 +-
 src/chat.rs | 22 ++-
 src/config.rs | 2 +-
 src/configure.rs | 6 +-
 src/contact.rs | 5 +-
 src/context.rs | 51 +++----
 src/ephemeral.rs | 4 +-
 src/imap.rs | 7 +-
 src/imex.rs | 22 +--
 src/job.rs | 6 +-
 src/location.rs | 2 +-
 src/message.rs | 10 +-
 src/quota.rs | 4 +-
 src/scheduler.rs | 253 +++++++++++++++++++++++++++-------
 src/scheduler/connectivity.rs | 50 ++-----
 src/webxdc.rs | 4 +-
 16 files changed, 305 insertions(+), 147 deletions(-)

diff --git a/src/accounts.rs b/src/accounts.rs index e1620526d..a379b44e1 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -271,14 +271,14 @@ impl Accounts { /// Notifies all accounts that the network may have become available. pub async fn maybe_network(&self) { for account in self.accounts.values() { - account.maybe_network().await; + account.scheduler.maybe_network().await; } } /// Notifies all accounts that the network connection may have been lost.
pub async fn maybe_network_lost(&self) { for account in self.accounts.values() { - account.maybe_network_lost().await; + account.scheduler.maybe_network_lost(account).await; } } diff --git a/src/chat.rs b/src/chat.rs index 0442dea3c..9d49ec552 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -639,7 +639,10 @@ impl ChatId { context.emit_msgs_changed_without_ids(); context.set_config(Config::LastHousekeeping, None).await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; if chat.is_self_talk() { let mut msg = Message::new(Viewtype::Text); @@ -1667,7 +1670,7 @@ impl Chat { maybe_set_logging_xdc(context, msg, self.id).await?; } - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; Ok(msg.id) } } @@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); } - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(msg.id) @@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .await?; curr_timestamp += 1; if create_send_msg_job(context, new_msg_id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } created_chats.push(chat_id); @@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg_id: msg.id, }); if create_send_msg_job(context, msg.id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } } diff --git a/src/config.rs b/src/config.rs index 76dca83a2..035caf314 100644 --- a/src/config.rs +++ b/src/config.rs @@ -443,7 +443,7 @@ impl Context { Config::DeleteDeviceAfter => { let ret = self.sql.set_raw_config(key.as_ref(), value).await; // Interrupt ephemeral loop to delete old messages immediately. - self.interrupt_ephemeral_task().await; + self.scheduler.interrupt_ephemeral_task().await; ret? } Config::Displayname => { diff --git a/src/configure.rs b/src/configure.rs index 48b91a3f7..eb69ad25c 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -59,7 +59,7 @@ impl Context { /// Configures this account with the currently set parameters. 
pub async fn configure(&self) -> Result<()> { ensure!( - self.scheduler.read().await.is_none(), + !self.scheduler.is_running().await, "cannot configure, already running" ); ensure!( @@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { ctx.set_config_bool(Config::FetchedExistingMsgs, false) .await?; - ctx.interrupt_inbox(InterruptInfo::new(false)).await; + ctx.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; progress!(ctx, 940); update_device_chats_handle.await??; diff --git a/src/contact.rs b/src/contact.rs index 1d00a80f0..07430d308 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen( > 0 && timestamp > time() - SEEN_RECENTLY_SECONDS { - context.interrupt_recently_seen(contact_id, timestamp).await; + context + .scheduler + .interrupt_recently_seen(contact_id, timestamp) + .await; } Ok(()) } diff --git a/src/context.rs b/src/context.rs index 13479a7ea..9c2eb33e2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::Scheduler; +use crate::scheduler::{IoPausedGuard, SchedulerState}; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -201,7 +201,7 @@ pub struct InnerContext { pub(crate) translated_stockstrings: StockStrings, pub(crate) events: Events, - pub(crate) scheduler: RwLock>, + pub(crate) scheduler: SchedulerState, pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. @@ -370,7 +370,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: stockstrings, events, - scheduler: RwLock::new(None), + scheduler: SchedulerState::new(), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds. quota: RwLock::new(None), quota_update_request: AtomicBool::new(false), @@ -395,42 +395,33 @@ impl Context { warn!(self, "can not start io on a context that is not configured"); return; } - - info!(self, "starting IO"); - let mut lock = self.inner.scheduler.write().await; - if lock.is_none() { - match Scheduler::start(self.clone()).await { - Err(err) => error!(self, "Failed to start IO: {:#}", err), - Ok(scheduler) => *lock = Some(scheduler), - } - } + self.scheduler.start(self.clone()).await; } /// Stops the IO scheduler. pub async fn stop_io(&self) { - // Sending an event wakes up event pollers (get_next_event) - // so the caller of stop_io() can arrange for proper termination. - // For this, the caller needs to instruct the event poller - // to terminate on receiving the next event and then call stop_io() - // which will emit the below event(s) - info!(self, "stopping IO"); - if let Some(debug_logging) = self.debug_logging.read().await.as_ref() { - debug_logging.loop_handle.abort(); - } - if let Some(scheduler) = self.inner.scheduler.write().await.take() { - scheduler.stop(self).await; - } + self.scheduler.stop(self).await; } /// Restarts the IO scheduler if it was running before /// when it is not running this is an no-op pub async fn restart_io_if_running(&self) { - info!(self, "restarting IO"); - let is_running = { self.inner.scheduler.read().await.is_some() }; - if is_running { - self.stop_io().await; - self.start_io().await; - } + self.scheduler.restart(self).await; + } + + /// Pauses the IO scheduler. 
+ /// + /// This temporarily pauses the IO scheduler and will make sure calls to + /// [`Context::start_io`] are no-ops while being paused. + /// + /// It is recommended to call [`IoPausedGuard::resume`] rather than simply dropping it. + pub(crate) async fn pause_io(&self) -> IoPausedGuard<'_> { + self.scheduler.pause(self).await + } + + /// Indicate that the network likely has come back. + pub async fn maybe_network(&self) { + self.scheduler.maybe_network().await; } /// Returns a reference to the underlying SQL instance. diff --git a/src/ephemeral.rs b/src/ephemeral.rs index f7dadf5d8..75fb1c127 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -317,7 +317,7 @@ impl MsgId { paramsv![ephemeral_timestamp, ephemeral_timestamp, self], ) .await?; - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } @@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( ) .await?; if count > 0 { - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } diff --git a/src/imap.rs b/src/imap.rs index 91f6ae9cc..6e3437fc0 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -475,7 +475,7 @@ impl Imap { // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are // fetched while the per-chat ephemeral timers start as soon as the messages are marked // as noticed. - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } let session = self @@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) paramsv![message_id], ) .await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/imex.rs b/src/imex.rs index 7ca3bb15c..af02ba1d8 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -86,13 +86,17 @@ pub async fn imex( ) -> Result<()> { let cancel = context.alloc_ongoing().await?; - let res = imex_inner(context, what, path, passphrase) - .race(async { - cancel.recv().await.ok(); - Err(format_err!("canceled")) - }) - .await; - + let res = { + let mut guard = context.pause_io().await; + let res = imex_inner(context, what, path, passphrase) + .race(async { + cancel.recv().await.ok(); + Err(format_err!("canceled")) + }) + .await; + guard.resume().await; + res + }; context.free_ongoing().await; if let Err(err) = res.as_ref() { @@ -413,7 +417,7 @@ async fn import_backup( "Cannot import backups to accounts in use." 
); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot import backup, IO is running" ); @@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res sql::housekeeping(context).await.ok_or_log(context); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot export backup, IO is running" ); diff --git a/src/job.rs b/src/job.rs index 25f3814a2..029272d17 100644 --- a/src/job.rs +++ b/src/job.rs @@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 { pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { context.resync_request.store(true, Ordering::Relaxed); context + .scheduler .interrupt_inbox(InterruptInfo { probe_network: false, }) @@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { job.save(context).await.context("failed to save job")?; info!(context, "interrupt: imap"); - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/location.rs b/src/location.rs index 52315585b..375c40662 100644 --- a/src/location.rs +++ b/src/location.rs @@ -267,7 +267,7 @@ pub async fn send_locations_to_chat( } context.emit_event(EventType::ChatModified(chat_id)); if 0 != seconds { - context.interrupt_location().await; + context.scheduler.interrupt_location().await; } Ok(()) } diff --git a/src/message.rs b/src/message.rs index b287e6c57..415618f08 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { } // Interrupt Inbox loop to start message deletion and run housekeeping. - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } @@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await .context("failed to insert into smtp_mdns")?; - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/quota.rs b/src/quota.rs index b4a2be225..f192fd1d7 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -115,7 +115,9 @@ impl Context { let requested = self.quota_update_request.swap(true, Ordering::Relaxed); if !requested { // Quota update was not requested before. - self.interrupt_inbox(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; } Ok(()) } diff --git a/src/scheduler.rs b/src/scheduler.rs index daa542940..3970200b6 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; +use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task; use self::connectivity::ConnectivityStore; @@ -23,6 +24,208 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings}; pub(crate) mod connectivity; +/// State of the IO scheduler, as stored on the [`Context`]. +/// +/// The IO scheduler can be stopped or started, but core can also pause it. After pausing +/// the IO scheduler will be restarted only if it was running before paused or +/// [`Context::start_io`] was called in the meantime while it was paused. 
+#[derive(Debug, Default)] +pub(crate) struct SchedulerState { + inner: RwLock, +} + +impl SchedulerState { + pub(crate) fn new() -> Self { + Default::default() + } + + /// Whether the scheduler is currently running. + pub(crate) async fn is_running(&self) -> bool { + let inner = self.inner.read().await; + inner.scheduler.is_some() + } + + /// Starts the scheduler if it is not yet started. + pub(crate) async fn start(&self, context: Context) { + let mut inner = self.inner.write().await; + inner.started = true; + if inner.scheduler.is_none() { + Self::do_start(inner, context).await; + } + } + + /// Starts the scheduler if it is not yet started. + async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { + info!(context, "starting IO"); + let ctx = context.clone(); + match Scheduler::start(context).await { + Ok(scheduler) => inner.scheduler = Some(scheduler), + Err(err) => error!(&ctx, "Failed to start IO: {:#}", err), + } + } + + /// Stops the scheduler if it is currently running. + pub(crate) async fn stop(&self, context: &Context) { + let mut inner = self.inner.write().await; + inner.started = false; + Self::do_stop(inner, context).await; + } + + /// Stops the scheduler if it is currently running. + async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) { + // Sending an event wakes up event pollers (get_next_event) + // so the caller of stop_io() can arrange for proper termination. + // For this, the caller needs to instruct the event poller + // to terminate on receiving the next event and then call stop_io() + // which will emit the below event(s) + info!(context, "stopping IO"); + if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { + debug_logging.loop_handle.abort(); + } + if let Some(scheduler) = inner.scheduler.take() { + scheduler.stop(context).await; + } + } + + /// Pauses the scheduler. + /// + /// If it is currently running the scheduler will be stopped. When + /// [`IoPausedGuard::resume`] is called the scheduler is started again. If in the + /// meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called resume will + /// do the right thing. + pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { + let mut inner = self.inner.write().await; + inner.paused = true; + Self::do_stop(inner, context).await; + IoPausedGuard { + context, + done: false, + } + } + + /// Restarts the scheduler, only if it is running. + pub(crate) async fn restart(&self, context: &Context) { + info!(context, "restarting IO"); + if self.is_running().await { + self.stop(context).await; + self.start(context.clone()).await; + } + } + + /// Indicate that the network likely has come back. + pub(crate) async fn maybe_network(&self) { + let inner = self.inner.read().await; + let (inbox, oboxes) = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network(); + let inbox = scheduler.inbox.conn_state.state.connectivity.clone(); + let oboxes = scheduler + .oboxes + .iter() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect::>(); + (inbox, oboxes) + } + None => return, + }; + drop(inner); + // TODO: maybe this called code should move into scheduler.maybe_network() instead? + connectivity::idle_interrupted(inbox, oboxes).await; + } + + /// Indicate that the network likely is lost. 
+ pub(crate) async fn maybe_network_lost(&self, context: &Context) { + let inner = self.inner.read().await; + let stores = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network_lost(); + scheduler + .boxes() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect() + } + None => return, + }; + drop(inner); + // TODO; maybe this called code should move into scheduler.maybe_network_lost() + // instead? + connectivity::maybe_network_lost(context, stores).await; + } + + pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_inbox(info); + } + } + + pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_smtp(info); + } + } + + pub(crate) async fn interrupt_ephemeral_task(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_ephemeral_task(); + } + } + + pub(crate) async fn interrupt_location(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_location(); + } + } + + pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_recently_seen(contact_id, timestamp); + } + } +} + +#[derive(Debug, Default)] +struct InnerSchedulerState { + scheduler: Option, + started: bool, + paused: bool, +} + +#[derive(Debug)] +pub(crate) struct IoPausedGuard<'a> { + context: &'a Context, + done: bool, +} + +impl<'a> IoPausedGuard<'a> { + pub(crate) async fn resume(&mut self) { + self.done = true; + let inner = self.context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, self.context.clone()).await; + } + } +} + +impl<'a> Drop for IoPausedGuard<'a> { + fn drop(&mut self) { + if self.done { + return; + } + let context = self.context.clone(); + tokio::spawn(async move { + let inner = context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, context.clone()).await; + } + }); + } +} + #[derive(Debug)] struct SchedBox { meaning: FolderMeaning, @@ -46,56 +249,6 @@ pub(crate) struct Scheduler { recently_seen_loop: RecentlySeenLoop, } -impl Context { - /// Indicate that the network likely has come back. - pub async fn maybe_network(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network(); - } - connectivity::idle_interrupted(lock).await; - } - - /// Indicate that the network likely is lost. 
- pub async fn maybe_network_lost(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network_lost(); - } - connectivity::maybe_network_lost(self, lock).await; - } - - pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_inbox(info); - } - } - - pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_smtp(info); - } - } - - pub(crate) async fn interrupt_ephemeral_task(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_ephemeral_task(); - } - } - - pub(crate) async fn interrupt_location(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_location(); - } - } - - pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_recently_seen(contact_id, timestamp); - } - } -} - async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { use futures::future::FutureExt; diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 6ede2074f..91edcad61 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc}; use anyhow::{anyhow, Result}; use humansize::{format_size, BINARY}; -use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio::sync::Mutex; use crate::events::EventType; use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning}; @@ -12,7 +12,7 @@ use crate::quota::{ }; use crate::tools::time; use crate::{context::Context, log::LogExt}; -use crate::{scheduler::Scheduler, stock_str, tools}; +use crate::{stock_str, tools}; #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)] pub enum Connectivity { @@ -156,19 +156,7 @@ impl ConnectivityStore { /// Set all folder states to InterruptingIdle in case they were `Connected` before. /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()` /// returns false immediately after `dc_maybe_network()`. -pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>) { - let (inbox, oboxes) = match &*scheduler { - Some(Scheduler { inbox, oboxes, .. }) => ( - inbox.conn_state.state.connectivity.clone(), - oboxes - .iter() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect::>(), - ), - None => return, - }; - drop(scheduler); - +pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec) { let mut connectivity_lock = inbox.0.lock().await; // For the inbox, we also have to set the connectivity to InterruptingIdle if it was // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() @@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>, -) { - let stores: Vec<_> = match &*scheduler { - Some(sched) => sched - .boxes() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect(), - None => return, - }; - drop(scheduler); - +pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec) { for store in &stores { let mut connectivity_lock = store.0.lock().await; if !matches!( @@ -249,9 +225,9 @@ impl Context { /// /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted. 
pub async fn get_connectivity(&self) -> Connectivity { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| b.conn_state.state.connectivity.clone()) .collect(), @@ -332,9 +308,9 @@ impl Context { // Get the states from the RwLock // ============================================================================================= - let lock = self.scheduler.read().await; - let (folders_states, smtp) = match &*lock { - Some(sched) => ( + let lock = self.scheduler.inner.read().await; + let (folders_states, smtp) = match lock.scheduler { + Some(ref sched) => ( sched .boxes() .map(|b| (b.meaning, b.conn_state.state.connectivity.clone())) @@ -503,9 +479,9 @@ impl Context { /// Returns true if all background work is done. pub async fn all_work_done(&self) -> bool { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| &b.conn_state.state) .chain(once(&sched.smtp.state)) diff --git a/src/webxdc.rs b/src/webxdc.rs index bfaaa8741..d5873df55 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -421,7 +421,9 @@ impl Context { DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", paramsv![instance.id, status_update_serial, status_update_serial, descr], ).await?; - self.interrupt_smtp(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(()) }
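
As an illustration of the intended semantics only (not part of the patch), the following self-contained Rust sketch models how the paused/started state is meant to interact: while IO is paused, a start call only records the intent, and resuming restarts the scheduler only if the most recent start/stop call asked for it to run. All names here are simplified stand-ins for the real SchedulerState, IoPausedGuard and Context types above; the real code is async and spawns the scheduler task instead of flipping a bool.

// Simplified, synchronous model of the pause/start/stop interaction described
// in the commit message; compiles and runs standalone.
#[derive(Default)]
struct SchedulerState {
    running: bool, // whether the scheduler is currently running
    started: bool, // what the latest start()/stop() call asked for
    paused: bool,  // whether core has paused IO
}

impl SchedulerState {
    fn start(&mut self) {
        self.started = true;
        if !self.paused && !self.running {
            self.running = true; // the real code spawns the scheduler here
        }
    }

    fn stop(&mut self) {
        self.started = false;
        self.running = false; // the real code awaits scheduler shutdown here
    }

    fn pause(&mut self) {
        self.paused = true;
        self.running = false; // IO is stopped for the duration of the pause
    }

    fn resume(&mut self) {
        self.paused = false;
        // Restart only if the latest start()/stop() call wanted IO running.
        if self.started && !self.running {
            self.running = true;
        }
    }
}

fn main() {
    let mut s = SchedulerState::default();
    s.start();
    assert!(s.running);
    s.pause();                        // e.g. imex() pauses IO for a backup
    s.start();                        // a UI calling dc_start_io() meanwhile is a no-op...
    assert!(!s.running && s.started); // ...but the intent is remembered
    s.resume();                       // so resume restarts the scheduler
    assert!(s.running);
    println!("scheduler running after resume: {}", s.running);
}

In the actual patch the guard also restarts IO from its Drop implementation, so a caller that forgets to call resume() does not leave IO stopped permanently; calling resume() explicitly is still recommended so the restart happens before the guard goes out of scope.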