diff --git a/src/accounts.rs b/src/accounts.rs index e1620526d..a379b44e1 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -271,14 +271,14 @@ impl Accounts { /// Notifies all accounts that the network may have become available. pub async fn maybe_network(&self) { for account in self.accounts.values() { - account.maybe_network().await; + account.scheduler.maybe_network().await; } } /// Notifies all accounts that the network connection may have been lost. pub async fn maybe_network_lost(&self) { for account in self.accounts.values() { - account.maybe_network_lost().await; + account.scheduler.maybe_network_lost(account).await; } } diff --git a/src/chat.rs b/src/chat.rs index 0442dea3c..9d49ec552 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -639,7 +639,10 @@ impl ChatId { context.emit_msgs_changed_without_ids(); context.set_config(Config::LastHousekeeping, None).await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; if chat.is_self_talk() { let mut msg = Message::new(Viewtype::Text); @@ -1667,7 +1670,7 @@ impl Chat { maybe_set_logging_xdc(context, msg, self.id).await?; } - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; Ok(msg.id) } } @@ -2201,7 +2204,10 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - context.emit_event(EventType::LocationChanged(Some(ContactId::SELF))); } - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(msg.id) @@ -3433,7 +3439,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .await?; curr_timestamp += 1; if create_send_msg_job(context, new_msg_id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } created_chats.push(chat_id); @@ -3488,7 +3497,10 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { msg_id: msg.id, }); if create_send_msg_job(context, msg.id).await?.is_some() { - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } } diff --git a/src/config.rs b/src/config.rs index 76dca83a2..035caf314 100644 --- a/src/config.rs +++ b/src/config.rs @@ -443,7 +443,7 @@ impl Context { Config::DeleteDeviceAfter => { let ret = self.sql.set_raw_config(key.as_ref(), value).await; // Interrupt ephemeral loop to delete old messages immediately. - self.interrupt_ephemeral_task().await; + self.scheduler.interrupt_ephemeral_task().await; ret? } Config::Displayname => { diff --git a/src/configure.rs b/src/configure.rs index 48b91a3f7..eb69ad25c 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -59,7 +59,7 @@ impl Context { /// Configures this account with the currently set parameters. pub async fn configure(&self) -> Result<()> { ensure!( - self.scheduler.read().await.is_none(), + !self.scheduler.is_running().await, "cannot configure, already running" ); ensure!( @@ -469,7 +469,9 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { ctx.set_config_bool(Config::FetchedExistingMsgs, false) .await?; - ctx.interrupt_inbox(InterruptInfo::new(false)).await; + ctx.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; progress!(ctx, 940); update_device_chats_handle.await??; diff --git a/src/contact.rs b/src/contact.rs index 1d00a80f0..07430d308 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1466,7 +1466,10 @@ pub(crate) async fn update_last_seen( > 0 && timestamp > time() - SEEN_RECENTLY_SECONDS { - context.interrupt_recently_seen(contact_id, timestamp).await; + context + .scheduler + .interrupt_recently_seen(contact_id, timestamp) + .await; } Ok(()) } diff --git a/src/context.rs b/src/context.rs index 13479a7ea..9c2eb33e2 100644 --- a/src/context.rs +++ b/src/context.rs @@ -24,7 +24,7 @@ use crate::key::{DcKey, SignedPublicKey}; use crate::login_param::LoginParam; use crate::message::{self, MessageState, MsgId}; use crate::quota::QuotaInfo; -use crate::scheduler::Scheduler; +use crate::scheduler::{IoPausedGuard, SchedulerState}; use crate::sql::Sql; use crate::stock_str::StockStrings; use crate::timesmearing::SmearedTimestamp; @@ -201,7 +201,7 @@ pub struct InnerContext { pub(crate) translated_stockstrings: StockStrings, pub(crate) events: Events, - pub(crate) scheduler: RwLock>, + pub(crate) scheduler: SchedulerState, pub(crate) ratelimit: RwLock, /// Recently loaded quota information, if any. @@ -370,7 +370,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: stockstrings, events, - scheduler: RwLock::new(None), + scheduler: SchedulerState::new(), ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow to send 6 messages immediately, no more than once every 10 seconds. quota: RwLock::new(None), quota_update_request: AtomicBool::new(false), @@ -395,42 +395,33 @@ impl Context { warn!(self, "can not start io on a context that is not configured"); return; } - - info!(self, "starting IO"); - let mut lock = self.inner.scheduler.write().await; - if lock.is_none() { - match Scheduler::start(self.clone()).await { - Err(err) => error!(self, "Failed to start IO: {:#}", err), - Ok(scheduler) => *lock = Some(scheduler), - } - } + self.scheduler.start(self.clone()).await; } /// Stops the IO scheduler. pub async fn stop_io(&self) { - // Sending an event wakes up event pollers (get_next_event) - // so the caller of stop_io() can arrange for proper termination. - // For this, the caller needs to instruct the event poller - // to terminate on receiving the next event and then call stop_io() - // which will emit the below event(s) - info!(self, "stopping IO"); - if let Some(debug_logging) = self.debug_logging.read().await.as_ref() { - debug_logging.loop_handle.abort(); - } - if let Some(scheduler) = self.inner.scheduler.write().await.take() { - scheduler.stop(self).await; - } + self.scheduler.stop(self).await; } /// Restarts the IO scheduler if it was running before /// when it is not running this is an no-op pub async fn restart_io_if_running(&self) { - info!(self, "restarting IO"); - let is_running = { self.inner.scheduler.read().await.is_some() }; - if is_running { - self.stop_io().await; - self.start_io().await; - } + self.scheduler.restart(self).await; + } + + /// Pauses the IO scheduler. + /// + /// This temporarily pauses the IO scheduler and will make sure calls to + /// [`Context::start_io`] are no-ops while being paused. + /// + /// It is recommended to call [`IoPausedGuard::resume`] rather than simply dropping it. + pub(crate) async fn pause_io(&self) -> IoPausedGuard<'_> { + self.scheduler.pause(self).await + } + + /// Indicate that the network likely has come back. + pub async fn maybe_network(&self) { + self.scheduler.maybe_network().await; } /// Returns a reference to the underlying SQL instance. diff --git a/src/ephemeral.rs b/src/ephemeral.rs index f7dadf5d8..75fb1c127 100644 --- a/src/ephemeral.rs +++ b/src/ephemeral.rs @@ -317,7 +317,7 @@ impl MsgId { paramsv![ephemeral_timestamp, ephemeral_timestamp, self], ) .await?; - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } @@ -345,7 +345,7 @@ pub(crate) async fn start_ephemeral_timers_msgids( ) .await?; if count > 0 { - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } Ok(()) } diff --git a/src/imap.rs b/src/imap.rs index 91f6ae9cc..6e3437fc0 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -475,7 +475,7 @@ impl Imap { // Note that the `Config::DeleteDeviceAfter` timer starts as soon as the messages are // fetched while the per-chat ephemeral timers start as soon as the messages are marked // as noticed. - context.interrupt_ephemeral_task().await; + context.scheduler.interrupt_ephemeral_task().await; } let session = self @@ -2224,7 +2224,10 @@ pub(crate) async fn markseen_on_imap_table(context: &Context, message_id: &str) paramsv![message_id], ) .await?; - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/imex.rs b/src/imex.rs index 7ca3bb15c..af02ba1d8 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -86,13 +86,17 @@ pub async fn imex( ) -> Result<()> { let cancel = context.alloc_ongoing().await?; - let res = imex_inner(context, what, path, passphrase) - .race(async { - cancel.recv().await.ok(); - Err(format_err!("canceled")) - }) - .await; - + let res = { + let mut guard = context.pause_io().await; + let res = imex_inner(context, what, path, passphrase) + .race(async { + cancel.recv().await.ok(); + Err(format_err!("canceled")) + }) + .await; + guard.resume().await; + res + }; context.free_ongoing().await; if let Err(err) = res.as_ref() { @@ -413,7 +417,7 @@ async fn import_backup( "Cannot import backups to accounts in use." ); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot import backup, IO is running" ); @@ -523,7 +527,7 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res sql::housekeeping(context).await.ok_or_log(context); ensure!( - context.scheduler.read().await.is_none(), + !context.scheduler.is_running().await, "cannot export backup, IO is running" ); diff --git a/src/job.rs b/src/job.rs index 25f3814a2..029272d17 100644 --- a/src/job.rs +++ b/src/job.rs @@ -238,6 +238,7 @@ fn get_backoff_time_offset(tries: u32) -> i64 { pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { context.resync_request.store(true, Ordering::Relaxed); context + .scheduler .interrupt_inbox(InterruptInfo { probe_network: false, }) @@ -250,7 +251,10 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { job.save(context).await.context("failed to save job")?; info!(context, "interrupt: imap"); - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } diff --git a/src/location.rs b/src/location.rs index 52315585b..375c40662 100644 --- a/src/location.rs +++ b/src/location.rs @@ -267,7 +267,7 @@ pub async fn send_locations_to_chat( } context.emit_event(EventType::ChatModified(chat_id)); if 0 != seconds { - context.interrupt_location().await; + context.scheduler.interrupt_location().await; } Ok(()) } diff --git a/src/message.rs b/src/message.rs index b287e6c57..415618f08 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1419,7 +1419,10 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { } // Interrupt Inbox loop to start message deletion and run housekeeping. - context.interrupt_inbox(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; Ok(()) } @@ -1531,7 +1534,10 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> ) .await .context("failed to insert into smtp_mdns")?; - context.interrupt_smtp(InterruptInfo::new(false)).await; + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/quota.rs b/src/quota.rs index b4a2be225..f192fd1d7 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -115,7 +115,9 @@ impl Context { let requested = self.quota_update_request.swap(true, Ordering::Relaxed); if !requested { // Quota update was not requested before. - self.interrupt_inbox(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_inbox(InterruptInfo::new(false)) + .await; } Ok(()) } diff --git a/src/scheduler.rs b/src/scheduler.rs index daa542940..3970200b6 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; +use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task; use self::connectivity::ConnectivityStore; @@ -23,6 +24,208 @@ use crate::tools::{duration_to_str, maybe_add_time_based_warnings}; pub(crate) mod connectivity; +/// State of the IO scheduler, as stored on the [`Context`]. +/// +/// The IO scheduler can be stopped or started, but core can also pause it. After pausing +/// the IO scheduler will be restarted only if it was running before paused or +/// [`Context::start_io`] was called in the meantime while it was paused. +#[derive(Debug, Default)] +pub(crate) struct SchedulerState { + inner: RwLock, +} + +impl SchedulerState { + pub(crate) fn new() -> Self { + Default::default() + } + + /// Whether the scheduler is currently running. + pub(crate) async fn is_running(&self) -> bool { + let inner = self.inner.read().await; + inner.scheduler.is_some() + } + + /// Starts the scheduler if it is not yet started. + pub(crate) async fn start(&self, context: Context) { + let mut inner = self.inner.write().await; + inner.started = true; + if inner.scheduler.is_none() { + Self::do_start(inner, context).await; + } + } + + /// Starts the scheduler if it is not yet started. + async fn do_start(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: Context) { + info!(context, "starting IO"); + let ctx = context.clone(); + match Scheduler::start(context).await { + Ok(scheduler) => inner.scheduler = Some(scheduler), + Err(err) => error!(&ctx, "Failed to start IO: {:#}", err), + } + } + + /// Stops the scheduler if it is currently running. + pub(crate) async fn stop(&self, context: &Context) { + let mut inner = self.inner.write().await; + inner.started = false; + Self::do_stop(inner, context).await; + } + + /// Stops the scheduler if it is currently running. + async fn do_stop(mut inner: RwLockWriteGuard<'_, InnerSchedulerState>, context: &Context) { + // Sending an event wakes up event pollers (get_next_event) + // so the caller of stop_io() can arrange for proper termination. + // For this, the caller needs to instruct the event poller + // to terminate on receiving the next event and then call stop_io() + // which will emit the below event(s) + info!(context, "stopping IO"); + if let Some(debug_logging) = context.debug_logging.read().await.as_ref() { + debug_logging.loop_handle.abort(); + } + if let Some(scheduler) = inner.scheduler.take() { + scheduler.stop(context).await; + } + } + + /// Pauses the scheduler. + /// + /// If it is currently running the scheduler will be stopped. When + /// [`IoPausedGuard::resume`] is called the scheduler is started again. If in the + /// meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called resume will + /// do the right thing. + pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> { + let mut inner = self.inner.write().await; + inner.paused = true; + Self::do_stop(inner, context).await; + IoPausedGuard { + context, + done: false, + } + } + + /// Restarts the scheduler, only if it is running. + pub(crate) async fn restart(&self, context: &Context) { + info!(context, "restarting IO"); + if self.is_running().await { + self.stop(context).await; + self.start(context.clone()).await; + } + } + + /// Indicate that the network likely has come back. + pub(crate) async fn maybe_network(&self) { + let inner = self.inner.read().await; + let (inbox, oboxes) = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network(); + let inbox = scheduler.inbox.conn_state.state.connectivity.clone(); + let oboxes = scheduler + .oboxes + .iter() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect::>(); + (inbox, oboxes) + } + None => return, + }; + drop(inner); + // TODO: maybe this called code should move into scheduler.maybe_network() instead? + connectivity::idle_interrupted(inbox, oboxes).await; + } + + /// Indicate that the network likely is lost. + pub(crate) async fn maybe_network_lost(&self, context: &Context) { + let inner = self.inner.read().await; + let stores = match inner.scheduler { + Some(ref scheduler) => { + scheduler.maybe_network_lost(); + scheduler + .boxes() + .map(|b| b.conn_state.state.connectivity.clone()) + .collect() + } + None => return, + }; + drop(inner); + // TODO; maybe this called code should move into scheduler.maybe_network_lost() + // instead? + connectivity::maybe_network_lost(context, stores).await; + } + + pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_inbox(info); + } + } + + pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_smtp(info); + } + } + + pub(crate) async fn interrupt_ephemeral_task(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_ephemeral_task(); + } + } + + pub(crate) async fn interrupt_location(&self) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_location(); + } + } + + pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { + let inner = self.inner.read().await; + if let Some(ref scheduler) = inner.scheduler { + scheduler.interrupt_recently_seen(contact_id, timestamp); + } + } +} + +#[derive(Debug, Default)] +struct InnerSchedulerState { + scheduler: Option, + started: bool, + paused: bool, +} + +#[derive(Debug)] +pub(crate) struct IoPausedGuard<'a> { + context: &'a Context, + done: bool, +} + +impl<'a> IoPausedGuard<'a> { + pub(crate) async fn resume(&mut self) { + self.done = true; + let inner = self.context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, self.context.clone()).await; + } + } +} + +impl<'a> Drop for IoPausedGuard<'a> { + fn drop(&mut self) { + if self.done { + return; + } + let context = self.context.clone(); + tokio::spawn(async move { + let inner = context.scheduler.inner.write().await; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, context.clone()).await; + } + }); + } +} + #[derive(Debug)] struct SchedBox { meaning: FolderMeaning, @@ -46,56 +249,6 @@ pub(crate) struct Scheduler { recently_seen_loop: RecentlySeenLoop, } -impl Context { - /// Indicate that the network likely has come back. - pub async fn maybe_network(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network(); - } - connectivity::idle_interrupted(lock).await; - } - - /// Indicate that the network likely is lost. - pub async fn maybe_network_lost(&self) { - let lock = self.scheduler.read().await; - if let Some(scheduler) = &*lock { - scheduler.maybe_network_lost(); - } - connectivity::maybe_network_lost(self, lock).await; - } - - pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_inbox(info); - } - } - - pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_smtp(info); - } - } - - pub(crate) async fn interrupt_ephemeral_task(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_ephemeral_task(); - } - } - - pub(crate) async fn interrupt_location(&self) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_location(); - } - } - - pub(crate) async fn interrupt_recently_seen(&self, contact_id: ContactId, timestamp: i64) { - if let Some(scheduler) = &*self.scheduler.read().await { - scheduler.interrupt_recently_seen(contact_id, timestamp); - } - } -} - async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers) { use futures::future::FutureExt; diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 6ede2074f..91edcad61 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -3,7 +3,7 @@ use std::{iter::once, ops::Deref, sync::Arc}; use anyhow::{anyhow, Result}; use humansize::{format_size, BINARY}; -use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio::sync::Mutex; use crate::events::EventType; use crate::imap::{scan_folders::get_watched_folder_configs, FolderMeaning}; @@ -12,7 +12,7 @@ use crate::quota::{ }; use crate::tools::time; use crate::{context::Context, log::LogExt}; -use crate::{scheduler::Scheduler, stock_str, tools}; +use crate::{stock_str, tools}; #[derive(Debug, Clone, Copy, PartialEq, Eq, EnumProperty, PartialOrd, Ord)] pub enum Connectivity { @@ -156,19 +156,7 @@ impl ConnectivityStore { /// Set all folder states to InterruptingIdle in case they were `Connected` before. /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()` /// returns false immediately after `dc_maybe_network()`. -pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>) { - let (inbox, oboxes) = match &*scheduler { - Some(Scheduler { inbox, oboxes, .. }) => ( - inbox.conn_state.state.connectivity.clone(), - oboxes - .iter() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect::>(), - ), - None => return, - }; - drop(scheduler); - +pub(crate) async fn idle_interrupted(inbox: ConnectivityStore, oboxes: Vec) { let mut connectivity_lock = inbox.0.lock().await; // For the inbox, we also have to set the connectivity to InterruptingIdle if it was // NotConfigured before: If all folders are NotConfigured, dc_get_connectivity() @@ -195,19 +183,7 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>, -) { - let stores: Vec<_> = match &*scheduler { - Some(sched) => sched - .boxes() - .map(|b| b.conn_state.state.connectivity.clone()) - .collect(), - None => return, - }; - drop(scheduler); - +pub(crate) async fn maybe_network_lost(context: &Context, stores: Vec) { for store in &stores { let mut connectivity_lock = store.0.lock().await; if !matches!( @@ -249,9 +225,9 @@ impl Context { /// /// If the connectivity changes, a DC_EVENT_CONNECTIVITY_CHANGED will be emitted. pub async fn get_connectivity(&self) -> Connectivity { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| b.conn_state.state.connectivity.clone()) .collect(), @@ -332,9 +308,9 @@ impl Context { // Get the states from the RwLock // ============================================================================================= - let lock = self.scheduler.read().await; - let (folders_states, smtp) = match &*lock { - Some(sched) => ( + let lock = self.scheduler.inner.read().await; + let (folders_states, smtp) = match lock.scheduler { + Some(ref sched) => ( sched .boxes() .map(|b| (b.meaning, b.conn_state.state.connectivity.clone())) @@ -503,9 +479,9 @@ impl Context { /// Returns true if all background work is done. pub async fn all_work_done(&self) -> bool { - let lock = self.scheduler.read().await; - let stores: Vec<_> = match &*lock { - Some(sched) => sched + let lock = self.scheduler.inner.read().await; + let stores: Vec<_> = match lock.scheduler { + Some(ref sched) => sched .boxes() .map(|b| &b.conn_state.state) .chain(once(&sched.smtp.state)) diff --git a/src/webxdc.rs b/src/webxdc.rs index bfaaa8741..d5873df55 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -421,7 +421,9 @@ impl Context { DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", paramsv![instance.id, status_update_serial, status_update_serial, descr], ).await?; - self.interrupt_smtp(InterruptInfo::new(false)).await; + self.scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; } Ok(()) }