scheduler: make Scheduler stateless

Scheduler no longer has a Stopped state. If a Scheduler exists, it is
always started. The scheduler is stopped via Scheduler::stop(), which
consumes the Scheduler and cannot fail.
This commit is contained in:
link2xt
2022-05-07 12:52:42 +00:00
parent 7bcb03f1ec
commit 60d3960f3a
6 changed files with 113 additions and 168 deletions

View File

@@ -4,6 +4,7 @@
### Changes
- send normal messages with higher priority than MDNs #3243
- make Scheduler stateless #3302
## 1.80.0

View File

@@ -60,7 +60,7 @@ impl Context {
use futures::future::FutureExt;
ensure!(
!self.scheduler.read().await.is_running(),
self.scheduler.read().await.is_none(),
"cannot configure, already running"
);
ensure!(

View File

@@ -54,7 +54,7 @@ pub struct InnerContext {
pub(crate) translated_stockstrings: RwLock<HashMap<usize, String>>,
pub(crate) events: Events,
pub(crate) scheduler: RwLock<Scheduler>,
pub(crate) scheduler: RwLock<Option<Scheduler>>,
/// Recently loaded quota information, if any.
/// Set to `None` if loading the quota was never attempted.
@@ -173,7 +173,7 @@ impl Context {
wrong_pw_warning_mutex: Mutex::new(()),
translated_stockstrings: RwLock::new(HashMap::new()),
events: Events::default(),
scheduler: RwLock::new(Scheduler::Stopped),
scheduler: RwLock::new(None),
quota: RwLock::new(None),
creation_time: std::time::SystemTime::now(),
last_full_folder_scan: Mutex::new(None),
@@ -195,8 +195,12 @@ impl Context {
}
info!(self, "starting IO");
if let Err(err) = self.inner.scheduler.write().await.start(self.clone()).await {
error!(self, "Failed to start IO: {}", err)
let mut lock = self.inner.scheduler.write().await;
if lock.is_none() {
match Scheduler::start(self.clone()).await {
Err(err) => error!(self, "Failed to start IO: {}", err),
Ok(scheduler) => *lock = Some(scheduler),
}
}
}
@@ -209,8 +213,8 @@ impl Context {
// which will emit the below event(s)
info!(self, "stopping IO");
if let Err(err) = self.inner.stop_io().await {
warn!(self, "failed to stop IO: {}", err);
if let Some(scheduler) = self.inner.scheduler.write().await.take() {
scheduler.stop(self).await;
}
}
@@ -642,13 +646,6 @@ impl Context {
}
}
impl InnerContext {
async fn stop_io(&self) -> Result<()> {
self.scheduler.write().await.stop().await?;
Ok(())
}
}
impl Default for RunningState {
fn default() -> Self {
RunningState {

View File

@@ -445,8 +445,8 @@ async fn import_backup(
"Cannot import backups to accounts in use."
);
ensure!(
!context.scheduler.read().await.is_running(),
"cannot import backup, IO already running"
context.scheduler.read().await.is_none(),
"cannot import backup, IO is running"
);
let backup_file = File::open(backup_to_import).await?;
@@ -563,8 +563,8 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
.ok();
ensure!(
!context.scheduler.read().await.is_running(),
"cannot export backup, IO already running"
context.scheduler.read().await.is_none(),
"cannot export backup, IO is running"
);
info!(

View File

@@ -2,7 +2,7 @@ use anyhow::{bail, Context as _, Result};
use async_std::prelude::*;
use async_std::{
channel::{self, Receiver, Sender},
task,
future, task,
};
use crate::config::Config;
@@ -23,54 +23,62 @@ pub(crate) mod connectivity;
/// Job and connection scheduler.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Scheduler {
Stopped,
Running {
inbox: ImapConnectionState,
inbox_handle: Option<task::JoinHandle<()>>,
mvbox: ImapConnectionState,
mvbox_handle: Option<task::JoinHandle<()>>,
sentbox: ImapConnectionState,
sentbox_handle: Option<task::JoinHandle<()>>,
smtp: SmtpConnectionState,
smtp_handle: Option<task::JoinHandle<()>>,
ephemeral_handle: Option<task::JoinHandle<()>>,
ephemeral_interrupt_send: Sender<()>,
location_handle: Option<task::JoinHandle<()>>,
location_interrupt_send: Sender<()>,
},
pub(crate) struct Scheduler {
inbox: ImapConnectionState,
inbox_handle: task::JoinHandle<()>,
mvbox: ImapConnectionState,
mvbox_handle: Option<task::JoinHandle<()>>,
sentbox: ImapConnectionState,
sentbox_handle: Option<task::JoinHandle<()>>,
smtp: SmtpConnectionState,
smtp_handle: task::JoinHandle<()>,
ephemeral_handle: task::JoinHandle<()>,
ephemeral_interrupt_send: Sender<()>,
location_handle: task::JoinHandle<()>,
location_interrupt_send: Sender<()>,
}
impl Context {
/// Indicate that the network likely has come back.
pub async fn maybe_network(&self) {
let lock = self.scheduler.read().await;
lock.maybe_network().await;
if let Some(scheduler) = &*lock {
scheduler.maybe_network().await;
}
connectivity::idle_interrupted(lock).await;
}
/// Indicate that the network likely is lost.
pub async fn maybe_network_lost(&self) {
let lock = self.scheduler.read().await;
lock.maybe_network_lost().await;
if let Some(scheduler) = &*lock {
scheduler.maybe_network_lost().await;
}
connectivity::maybe_network_lost(self, lock).await;
}
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_inbox(info).await;
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_inbox(info).await;
}
}
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_smtp(info).await;
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_smtp(info).await;
}
}
pub(crate) async fn interrupt_ephemeral_task(&self) {
self.scheduler.read().await.interrupt_ephemeral_task().await;
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_ephemeral_task().await;
}
}
pub(crate) async fn interrupt_location(&self) {
self.scheduler.read().await.interrupt_location().await;
if let Some(scheduler) = &*self.scheduler.read().await {
scheduler.interrupt_location().await;
}
}
}
@@ -356,12 +364,8 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
}
impl Scheduler {
/// Start the scheduler, returns error if it is already running.
pub async fn start(&mut self, ctx: Context) -> Result<()> {
if self.is_running() {
bail!("scheduler is already started");
}
/// Start the scheduler.
pub async fn start(ctx: Context) -> Result<Self> {
let (mvbox, mvbox_handlers) = ImapConnectionState::new(&ctx).await?;
let (sentbox, sentbox_handlers) = ImapConnectionState::new(&ctx).await?;
let (smtp, smtp_handlers) = SmtpConnectionState::new();
@@ -378,9 +382,7 @@ impl Scheduler {
let inbox_handle = {
let ctx = ctx.clone();
Some(task::spawn(async move {
inbox_loop(ctx, inbox_start_send, inbox_handlers).await
}))
task::spawn(async move { inbox_loop(ctx, inbox_start_send, inbox_handlers).await })
};
if ctx.should_watch_mvbox().await? {
@@ -431,26 +433,24 @@ impl Scheduler {
let smtp_handle = {
let ctx = ctx.clone();
Some(task::spawn(async move {
smtp_loop(ctx, smtp_start_send, smtp_handlers).await
}))
task::spawn(async move { smtp_loop(ctx, smtp_start_send, smtp_handlers).await })
};
let ephemeral_handle = {
let ctx = ctx.clone();
Some(task::spawn(async move {
task::spawn(async move {
ephemeral::ephemeral_loop(&ctx, ephemeral_interrupt_recv).await;
}))
})
};
let location_handle = {
let ctx = ctx.clone();
Some(task::spawn(async move {
task::spawn(async move {
location::location_loop(&ctx, location_interrupt_recv).await;
}))
})
};
*self = Scheduler::Running {
let res = Self {
inbox,
mvbox,
sentbox,
@@ -477,14 +477,10 @@ impl Scheduler {
}
info!(ctx, "scheduler is running");
Ok(())
Ok(res)
}
async fn maybe_network(&self) {
if !self.is_running() {
return;
}
self.interrupt_inbox(InterruptInfo::new(true))
.join(self.interrupt_mvbox(InterruptInfo::new(true)))
.join(self.interrupt_sentbox(InterruptInfo::new(true)))
@@ -493,10 +489,6 @@ impl Scheduler {
}
async fn maybe_network_lost(&self) {
if !self.is_running() {
return;
}
self.interrupt_inbox(InterruptInfo::new(false))
.join(self.interrupt_mvbox(InterruptInfo::new(false)))
.join(self.interrupt_sentbox(InterruptInfo::new(false)))
@@ -505,109 +497,64 @@ impl Scheduler {
}
async fn interrupt_inbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref inbox, .. } = self {
inbox.interrupt(info).await;
}
self.inbox.interrupt(info).await;
}
async fn interrupt_mvbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref mvbox, .. } = self {
mvbox.interrupt(info).await;
}
self.mvbox.interrupt(info).await;
}
async fn interrupt_sentbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref sentbox, .. } = self {
sentbox.interrupt(info).await;
}
self.sentbox.interrupt(info).await;
}
async fn interrupt_smtp(&self, info: InterruptInfo) {
if let Scheduler::Running { ref smtp, .. } = self {
smtp.interrupt(info).await;
}
self.smtp.interrupt(info).await;
}
async fn interrupt_ephemeral_task(&self) {
if let Scheduler::Running {
ref ephemeral_interrupt_send,
..
} = self
{
ephemeral_interrupt_send.try_send(()).ok();
}
self.ephemeral_interrupt_send.try_send(()).ok();
}
async fn interrupt_location(&self) {
if let Scheduler::Running {
ref location_interrupt_send,
..
} = self
{
location_interrupt_send.try_send(()).ok();
}
self.location_interrupt_send.try_send(()).ok();
}
/// Halt the scheduler.
pub(crate) async fn stop(&mut self) -> Result<()> {
match self {
Scheduler::Stopped => {
bail!("scheduler is already stopped");
}
Scheduler::Running {
inbox,
inbox_handle,
mvbox,
mvbox_handle,
sentbox,
sentbox_handle,
smtp,
smtp_handle,
ephemeral_handle,
location_handle,
..
} => {
if inbox_handle.is_some() {
inbox.stop().await?;
}
if mvbox_handle.is_some() {
mvbox.stop().await?;
}
if sentbox_handle.is_some() {
sentbox.stop().await?;
}
if smtp_handle.is_some() {
smtp.stop().await?;
}
if let Some(handle) = inbox_handle.take() {
handle.await;
}
if let Some(handle) = mvbox_handle.take() {
handle.await;
}
if let Some(handle) = sentbox_handle.take() {
handle.await;
}
if let Some(handle) = smtp_handle.take() {
handle.await;
}
if let Some(handle) = ephemeral_handle.take() {
handle.cancel().await;
}
if let Some(handle) = location_handle.take() {
handle.cancel().await;
}
*self = Scheduler::Stopped;
Ok(())
}
///
/// It consumes the scheduler and never fails to stop it. In the worst case, long-running tasks
/// are forcefully terminated if they cannot shut down within the timeout.
pub(crate) async fn stop(mut self, context: &Context) {
// Send stop signals to tasks so they can shut down cleanly.
self.inbox.stop().await.ok_or_log(context);
if self.mvbox_handle.is_some() {
self.mvbox.stop().await.ok_or_log(context);
}
}
if self.sentbox_handle.is_some() {
self.sentbox.stop().await.ok_or_log(context);
}
self.smtp.stop().await.ok_or_log(context);
/// Check if the scheduler is running.
pub fn is_running(&self) -> bool {
matches!(self, Scheduler::Running { .. })
// Actually shutdown tasks.
let timeout_duration = std::time::Duration::from_secs(30);
future::timeout(timeout_duration, self.inbox_handle)
.await
.ok_or_log(context);
if let Some(mvbox_handle) = self.mvbox_handle.take() {
future::timeout(timeout_duration, mvbox_handle)
.await
.ok_or_log(context);
}
if let Some(sentbox_handle) = self.sentbox_handle.take() {
future::timeout(timeout_duration, sentbox_handle)
.await
.ok_or_log(context);
}
future::timeout(timeout_duration, self.smtp_handle)
.await
.ok_or_log(context);
self.ephemeral_handle.cancel().await;
self.location_handle.cancel().await;
}
}

View File

@@ -161,19 +161,19 @@ impl ConnectivityStore {
/// Set all folder states to InterruptingIdle in case they were `Connected` before.
/// Called during `dc_maybe_network()` to make sure that `dc_accounts_all_work_done()`
/// returns false immediately after `dc_maybe_network()`.
pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Scheduler>) {
pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Option<Scheduler>>) {
let [inbox, mvbox, sentbox] = match &*scheduler {
Scheduler::Running {
Some(Scheduler {
inbox,
mvbox,
sentbox,
..
} => [
}) => [
inbox.state.connectivity.clone(),
mvbox.state.connectivity.clone(),
sentbox.state.connectivity.clone(),
],
Scheduler::Stopped => return,
None => return,
};
drop(scheduler);
@@ -205,20 +205,20 @@ pub(crate) async fn idle_interrupted(scheduler: RwLockReadGuard<'_, Scheduler>)
/// after `maybe_network_lost()` was called.
pub(crate) async fn maybe_network_lost(
context: &Context,
scheduler: RwLockReadGuard<'_, Scheduler>,
scheduler: RwLockReadGuard<'_, Option<Scheduler>>,
) {
let stores = match &*scheduler {
Scheduler::Running {
Some(Scheduler {
inbox,
mvbox,
sentbox,
..
} => [
}) => [
inbox.state.connectivity.clone(),
mvbox.state.connectivity.clone(),
sentbox.state.connectivity.clone(),
],
Scheduler::Stopped => return,
None => return,
};
drop(scheduler);
@@ -265,16 +265,16 @@ impl Context {
pub async fn get_connectivity(&self) -> Connectivity {
let lock = self.scheduler.read().await;
let stores: Vec<_> = match &*lock {
Scheduler::Running {
Some(Scheduler {
inbox,
mvbox,
sentbox,
..
} => [&inbox.state, &mvbox.state, &sentbox.state]
}) => [&inbox.state, &mvbox.state, &sentbox.state]
.iter()
.map(|state| state.connectivity.clone())
.collect(),
Scheduler::Stopped => return Connectivity::NotConnected,
None => return Connectivity::NotConnected,
};
drop(lock);
@@ -353,13 +353,13 @@ impl Context {
let lock = self.scheduler.read().await;
let (folders_states, smtp) = match &*lock {
Scheduler::Running {
Some(Scheduler {
inbox,
mvbox,
sentbox,
smtp,
..
} => (
}) => (
[
(
Config::ConfiguredInboxFolder,
@@ -376,7 +376,7 @@ impl Context {
],
smtp.state.connectivity.clone(),
),
Scheduler::Stopped => {
None => {
return Err(anyhow!("Not started"));
}
};
@@ -552,17 +552,17 @@ impl Context {
pub async fn all_work_done(&self) -> bool {
let lock = self.scheduler.read().await;
let stores: Vec<_> = match &*lock {
Scheduler::Running {
Some(Scheduler {
inbox,
mvbox,
sentbox,
smtp,
..
} => [&inbox.state, &mvbox.state, &sentbox.state, &smtp.state]
}) => [&inbox.state, &mvbox.state, &sentbox.state, &smtp.state]
.iter()
.map(|state| state.connectivity.clone())
.collect(),
Scheduler::Stopped => return false,
None => return false,
};
drop(lock);