diff --git a/CHANGELOG.md b/CHANGELOG.md index 249f9866a..c55d86191 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Changes - send normal messages with higher priority than MDNs #3243 +- make Scheduler stateless #3302 ## 1.80.0 diff --git a/src/configure.rs b/src/configure.rs index 99f008bae..b887a77c5 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -60,7 +60,7 @@ impl Context { use futures::future::FutureExt; ensure!( - !self.scheduler.read().await.is_running(), + self.scheduler.read().await.is_none(), "cannot configure, already running" ); ensure!( diff --git a/src/context.rs b/src/context.rs index f37341b5e..152095aa8 100644 --- a/src/context.rs +++ b/src/context.rs @@ -54,7 +54,7 @@ pub struct InnerContext { pub(crate) translated_stockstrings: RwLock>, pub(crate) events: Events, - pub(crate) scheduler: RwLock, + pub(crate) scheduler: RwLock>, /// Recently loaded quota information, if any. /// Set to `None` if quota was never tried to load. @@ -173,7 +173,7 @@ impl Context { wrong_pw_warning_mutex: Mutex::new(()), translated_stockstrings: RwLock::new(HashMap::new()), events: Events::default(), - scheduler: RwLock::new(Scheduler::Stopped), + scheduler: RwLock::new(None), quota: RwLock::new(None), creation_time: std::time::SystemTime::now(), last_full_folder_scan: Mutex::new(None), @@ -195,8 +195,12 @@ impl Context { } info!(self, "starting IO"); - if let Err(err) = self.inner.scheduler.write().await.start(self.clone()).await { - error!(self, "Failed to start IO: {}", err) + let mut lock = self.inner.scheduler.write().await; + if lock.is_none() { + match Scheduler::start(self.clone()).await { + Err(err) => error!(self, "Failed to start IO: {}", err), + Ok(scheduler) => *lock = Some(scheduler), + } } } @@ -209,8 +213,8 @@ impl Context { // which will emit the below event(s) info!(self, "stopping IO"); - if let Err(err) = self.inner.stop_io().await { - warn!(self, "failed to stop IO: {}", err); + if let Some(scheduler) = self.inner.scheduler.write().await.take() { + scheduler.stop(self).await; } } @@ -642,13 +646,6 @@ impl Context { } } -impl InnerContext { - async fn stop_io(&self) -> Result<()> { - self.scheduler.write().await.stop().await?; - Ok(()) - } -} - impl Default for RunningState { fn default() -> Self { RunningState { diff --git a/src/imex.rs b/src/imex.rs index e973dbef4..117924a17 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -445,8 +445,8 @@ async fn import_backup( "Cannot import backups to accounts in use." ); ensure!( - !context.scheduler.read().await.is_running(), - "cannot import backup, IO already running" + context.scheduler.read().await.is_none(), + "cannot import backup, IO is running" ); let backup_file = File::open(backup_to_import).await?; @@ -563,8 +563,8 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res .ok(); ensure!( - !context.scheduler.read().await.is_running(), - "cannot export backup, IO already running" + context.scheduler.read().await.is_none(), + "cannot export backup, IO is running" ); info!( diff --git a/src/scheduler.rs b/src/scheduler.rs index d0195c828..655f867da 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -2,7 +2,7 @@ use anyhow::{bail, Context as _, Result}; use async_std::prelude::*; use async_std::{ channel::{self, Receiver, Sender}, - task, + future, task, }; use crate::config::Config; @@ -23,54 +23,62 @@ pub(crate) mod connectivity; /// Job and connection scheduler. #[derive(Debug)] -#[allow(clippy::large_enum_variant)] -pub(crate) enum Scheduler { - Stopped, - Running { - inbox: ImapConnectionState, - inbox_handle: Option>, - mvbox: ImapConnectionState, - mvbox_handle: Option>, - sentbox: ImapConnectionState, - sentbox_handle: Option>, - smtp: SmtpConnectionState, - smtp_handle: Option>, - ephemeral_handle: Option>, - ephemeral_interrupt_send: Sender<()>, - location_handle: Option>, - location_interrupt_send: Sender<()>, - }, +pub(crate) struct Scheduler { + inbox: ImapConnectionState, + inbox_handle: task::JoinHandle<()>, + mvbox: ImapConnectionState, + mvbox_handle: Option>, + sentbox: ImapConnectionState, + sentbox_handle: Option>, + smtp: SmtpConnectionState, + smtp_handle: task::JoinHandle<()>, + ephemeral_handle: task::JoinHandle<()>, + ephemeral_interrupt_send: Sender<()>, + location_handle: task::JoinHandle<()>, + location_interrupt_send: Sender<()>, } impl Context { /// Indicate that the network likely has come back. pub async fn maybe_network(&self) { let lock = self.scheduler.read().await; - lock.maybe_network().await; + if let Some(scheduler) = &*lock { + scheduler.maybe_network().await; + } connectivity::idle_interrupted(lock).await; } /// Indicate that the network likely is lost. pub async fn maybe_network_lost(&self) { let lock = self.scheduler.read().await; - lock.maybe_network_lost().await; + if let Some(scheduler) = &*lock { + scheduler.maybe_network_lost().await; + } connectivity::maybe_network_lost(self, lock).await; } pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { - self.scheduler.read().await.interrupt_inbox(info).await; + if let Some(scheduler) = &*self.scheduler.read().await { + scheduler.interrupt_inbox(info).await; + } } pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { - self.scheduler.read().await.interrupt_smtp(info).await; + if let Some(scheduler) = &*self.scheduler.read().await { + scheduler.interrupt_smtp(info).await; + } } pub(crate) async fn interrupt_ephemeral_task(&self) { - self.scheduler.read().await.interrupt_ephemeral_task().await; + if let Some(scheduler) = &*self.scheduler.read().await { + scheduler.interrupt_ephemeral_task().await; + } } pub(crate) async fn interrupt_location(&self) { - self.scheduler.read().await.interrupt_location().await; + if let Some(scheduler) = &*self.scheduler.read().await { + scheduler.interrupt_location().await; + } } } @@ -356,12 +364,8 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect } impl Scheduler { - /// Start the scheduler, returns error if it is already running. - pub async fn start(&mut self, ctx: Context) -> Result<()> { - if self.is_running() { - bail!("scheduler is already started"); - } - + /// Start the scheduler. + pub async fn start(ctx: Context) -> Result { let (mvbox, mvbox_handlers) = ImapConnectionState::new(&ctx).await?; let (sentbox, sentbox_handlers) = ImapConnectionState::new(&ctx).await?; let (smtp, smtp_handlers) = SmtpConnectionState::new(); @@ -378,9 +382,7 @@ impl Scheduler { let inbox_handle = { let ctx = ctx.clone(); - Some(task::spawn(async move { - inbox_loop(ctx, inbox_start_send, inbox_handlers).await - })) + task::spawn(async move { inbox_loop(ctx, inbox_start_send, inbox_handlers).await }) }; if ctx.should_watch_mvbox().await? { @@ -431,26 +433,24 @@ impl Scheduler { let smtp_handle = { let ctx = ctx.clone(); - Some(task::spawn(async move { - smtp_loop(ctx, smtp_start_send, smtp_handlers).await - })) + task::spawn(async move { smtp_loop(ctx, smtp_start_send, smtp_handlers).await }) }; let ephemeral_handle = { let ctx = ctx.clone(); - Some(task::spawn(async move { + task::spawn(async move { ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await; - })) + }) }; let location_handle = { let ctx = ctx.clone(); - Some(task::spawn(async move { + task::spawn(async move { location::location_loop(&ctx, location_interrupt_recv).await; - })) + }) }; - *self = Scheduler::Running { + let res = Self { inbox, mvbox, sentbox, @@ -477,14 +477,10 @@ impl Scheduler { } info!(ctx, "scheduler is running"); - Ok(()) + Ok(res) } async fn maybe_network(&self) { - if !self.is_running() { - return; - } - self.interrupt_inbox(InterruptInfo::new(true)) .join(self.interrupt_mvbox(InterruptInfo::new(true))) .join(self.interrupt_sentbox(InterruptInfo::new(true))) @@ -493,10 +489,6 @@ impl Scheduler { } async fn maybe_network_lost(&self) { - if !self.is_running() { - return; - } - self.interrupt_inbox(InterruptInfo::new(false)) .join(self.interrupt_mvbox(InterruptInfo::new(false))) .join(self.interrupt_sentbox(InterruptInfo::new(false))) @@ -505,109 +497,64 @@ impl Scheduler { } async fn interrupt_inbox(&self, info: InterruptInfo) { - if let Scheduler::Running { ref inbox, .. } = self { - inbox.interrupt(info).await; - } + self.inbox.interrupt(info).await; } async fn interrupt_mvbox(&self, info: InterruptInfo) { - if let Scheduler::Running { ref mvbox, .. } = self { - mvbox.interrupt(info).await; - } + self.mvbox.interrupt(info).await; } async fn interrupt_sentbox(&self, info: InterruptInfo) { - if let Scheduler::Running { ref sentbox, .. } = self { - sentbox.interrupt(info).await; - } + self.sentbox.interrupt(info).await; } async fn interrupt_smtp(&self, info: InterruptInfo) { - if let Scheduler::Running { ref smtp, .. } = self { - smtp.interrupt(info).await; - } + self.smtp.interrupt(info).await; } async fn interrupt_ephemeral_task(&self) { - if let Scheduler::Running { - ref ephemeral_interrupt_send, - .. - } = self - { - ephemeral_interrupt_send.try_send(()).ok(); - } + self.ephemeral_interrupt_send.try_send(()).ok(); } async fn interrupt_location(&self) { - if let Scheduler::Running { - ref location_interrupt_send, - .. - } = self - { - location_interrupt_send.try_send(()).ok(); - } + self.location_interrupt_send.try_send(()).ok(); } /// Halt the scheduler. - pub(crate) async fn stop(&mut self) -> Result<()> { - match self { - Scheduler::Stopped => { - bail!("scheduler is already stopped"); - } - Scheduler::Running { - inbox, - inbox_handle, - mvbox, - mvbox_handle, - sentbox, - sentbox_handle, - smtp, - smtp_handle, - ephemeral_handle, - location_handle, - .. - } => { - if inbox_handle.is_some() { - inbox.stop().await?; - } - if mvbox_handle.is_some() { - mvbox.stop().await?; - } - if sentbox_handle.is_some() { - sentbox.stop().await?; - } - if smtp_handle.is_some() { - smtp.stop().await?; - } - - if let Some(handle) = inbox_handle.take() { - handle.await; - } - if let Some(handle) = mvbox_handle.take() { - handle.await; - } - if let Some(handle) = sentbox_handle.take() { - handle.await; - } - if let Some(handle) = smtp_handle.take() { - handle.await; - } - if let Some(handle) = ephemeral_handle.take() { - handle.cancel().await; - } - if let Some(handle) = location_handle.take() { - handle.cancel().await; - } - - *self = Scheduler::Stopped; - Ok(()) - } + /// + /// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks + /// are forcefully terminated if they cannot shutdown within the timeout. + pub(crate) async fn stop(mut self, context: &Context) { + // Send stop signals to tasks so they can shutdown cleanly. + self.inbox.stop().await.ok_or_log(context); + if self.mvbox_handle.is_some() { + self.mvbox.stop().await.ok_or_log(context); } - } + if self.sentbox_handle.is_some() { + self.sentbox.stop().await.ok_or_log(context); + } + self.smtp.stop().await.ok_or_log(context); - /// Check if the scheduler is running. - pub fn is_running(&self) -> bool { - matches!(self, Scheduler::Running { .. }) + // Actually shutdown tasks. + let timeout_duration = std::time::Duration::from_secs(30); + future::timeout(timeout_duration, self.inbox_handle) + .await + .ok_or_log(context); + if let Some(mvbox_handle) = self.mvbox_handle.take() { + future::timeout(timeout_duration, mvbox_handle) + .await + .ok_or_log(context); + } + if let Some(sentbox_handle) = self.sentbox_handle.take() { + future::timeout(timeout_duration, sentbox_handle) + .await + .ok_or_log(context); + } + future::timeout(timeout_duration, self.smtp_handle) + .await + .ok_or_log(context); + self.ephemeral_handle.cancel().await; + self.location_handle.cancel().await; } } diff --git a/src/scheduler/connectivity.rs b/src/scheduler/connectivity.rs index 5ea221a62..8512c1fe8 100644 --- a/src/scheduler/connectivity.rs +++ b/src/scheduler/connectivity.rs @@ -161,19 +161,19 @@ impl ConnectivityStore { /// Set all folder states to InterruptingIdle in case they were `Connected` before. /// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()` /// returns false immediately after `dc_maybe_network()`. -pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Scheduler>) { +pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option>) { let [inbox, mvbox, sentbox] = match &*scheduler { - Scheduler::Running { + Some(Scheduler { inbox, mvbox, sentbox, .. - } => [ + }) => [ inbox.state.connectivity.clone(), mvbox.state.connectivity.clone(), sentbox.state.connectivity.clone(), ], - Scheduler::Stopped => return, + None => return, }; drop(scheduler); @@ -205,20 +205,20 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Scheduler>) /// after `maybe_network_lost()` was called. pub(crate) async fn maybe_network_lost( context: &Context, - scheduler: RwLockReadGuard<'_, Scheduler>, + scheduler: RwLockReadGuard<'_, Option>, ) { let stores = match &*scheduler { - Scheduler::Running { + Some(Scheduler { inbox, mvbox, sentbox, .. - } => [ + }) => [ inbox.state.connectivity.clone(), mvbox.state.connectivity.clone(), sentbox.state.connectivity.clone(), ], - Scheduler::Stopped => return, + None => return, }; drop(scheduler); @@ -265,16 +265,16 @@ impl Context { pub async fn get_connectivity(&self) -> Connectivity { let lock = self.scheduler.read().await; let stores: Vec<_> = match &*lock { - Scheduler::Running { + Some(Scheduler { inbox, mvbox, sentbox, .. - } => [&inbox.state, &mvbox.state, &sentbox.state] + }) => [&inbox.state, &mvbox.state, &sentbox.state] .iter() .map(|state| state.connectivity.clone()) .collect(), - Scheduler::Stopped => return Connectivity::NotConnected, + None => return Connectivity::NotConnected, }; drop(lock); @@ -353,13 +353,13 @@ impl Context { let lock = self.scheduler.read().await; let (folders_states, smtp) = match &*lock { - Scheduler::Running { + Some(Scheduler { inbox, mvbox, sentbox, smtp, .. - } => ( + }) => ( [ ( Config::ConfiguredInboxFolder, @@ -376,7 +376,7 @@ impl Context { ], smtp.state.connectivity.clone(), ), - Scheduler::Stopped => { + None => { return Err(anyhow!("Not started")); } }; @@ -552,17 +552,17 @@ impl Context { pub async fn all_work_done(&self) -> bool { let lock = self.scheduler.read().await; let stores: Vec<_> = match &*lock { - Scheduler::Running { + Some(Scheduler { inbox, mvbox, sentbox, smtp, .. - } => [&inbox.state, &mvbox.state, &sentbox.state, &smtp.state] + }) => [&inbox.state, &mvbox.state, &sentbox.state, &smtp.state] .iter() .map(|state| state.connectivity.clone()) .collect(), - Scheduler::Stopped => return false, + None => return false, }; drop(lock);