Remove panics from the scheduler and simplify start/stop_io()

Hold scheduler lock during the whole procedure of scheduler starting
and stopping. This ensures that two processes can't get two read locks
in parallel and start loops or send the stop signal twice.

Also remove shutdown channels: it is enough to wait
for the loop handle without receiving a shutdown signal
from the end of the loop.
This commit is contained in:
link2xt
2022-04-30 16:53:17 +00:00
parent 4c9cc4f3d4
commit 3a10f0155f
3 changed files with 60 additions and 111 deletions

View File

@@ -1,5 +1,11 @@
# Changelog # Changelog
## Unreleased
### Fixes
- simplify `dc_stop_io()` and remove potential panics and race conditions #3273
## 1.78.0 ## 1.78.0
### API-Changes ### API-Changes

View File

@@ -189,22 +189,14 @@ impl Context {
/// Starts the IO scheduler. /// Starts the IO scheduler.
pub async fn start_io(&self) { pub async fn start_io(&self) {
info!(self, "starting IO");
if self.inner.is_io_running().await {
info!(self, "IO is already running");
return;
}
if let Ok(false) = self.is_configured().await { if let Ok(false) = self.is_configured().await {
warn!(self, "can not start io on a context that is not configured"); warn!(self, "can not start io on a context that is not configured");
return; return;
} }
{ info!(self, "starting IO");
let l = &mut *self.inner.scheduler.write().await; if let Err(err) = self.inner.scheduler.write().await.start(self.clone()).await {
if let Err(err) = l.start(self.clone()).await { error!(self, "Failed to start IO: {}", err)
error!(self, "Failed to start IO: {}", err)
}
} }
} }
@@ -212,7 +204,9 @@ impl Context {
pub async fn stop_io(&self) { pub async fn stop_io(&self) {
info!(self, "stopping IO"); info!(self, "stopping IO");
self.inner.stop_io().await; if let Err(err) = self.inner.stop_io().await {
warn!(self, "failed to stop IO: {}", err);
}
} }
/// Returns a reference to the underlying SQL instance. /// Returns a reference to the underlying SQL instance.
@@ -644,21 +638,9 @@ impl Context {
} }
impl InnerContext { impl InnerContext {
async fn is_io_running(&self) -> bool { async fn stop_io(&self) -> Result<()> {
self.scheduler.read().await.is_running() self.scheduler.write().await.stop().await?;
} Ok(())
async fn stop_io(&self) {
if self.is_io_running().await {
let token = {
let lock = &*self.scheduler.read().await;
lock.pre_stop().await
};
{
let lock = &mut *self.scheduler.write().await;
lock.stop(token).await;
}
}
} }
} }

View File

@@ -20,8 +20,6 @@ use self::connectivity::ConnectivityStore;
pub(crate) mod connectivity; pub(crate) mod connectivity;
pub(crate) struct StopToken;
/// Job and connection scheduler. /// Job and connection scheduler.
#[derive(Debug)] #[derive(Debug)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
@@ -76,16 +74,15 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
let ImapConnectionHandlers { let ImapConnectionHandlers {
mut connection, mut connection,
stop_receiver, stop_receiver,
shutdown_sender,
} = inbox_handlers; } = inbox_handlers;
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started
.send(())
.await
.expect("inbox loop, missing started receiver");
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = started.send(()).await {
warn!(ctx, "inbox loop, missing started receiver: {}", err);
return;
};
let mut info = InterruptInfo::default(); let mut info = InterruptInfo::default();
loop { loop {
@@ -131,10 +128,6 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}) })
.race(fut) .race(fut)
.await; .await;
shutdown_sender
.send(())
.await
.expect("inbox loop, missing shutdown receiver");
} }
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo { async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo {
@@ -259,17 +252,16 @@ async fn simple_imap_loop(
let ImapConnectionHandlers { let ImapConnectionHandlers {
mut connection, mut connection,
stop_receiver, stop_receiver,
shutdown_sender,
} = inbox_handlers; } = inbox_handlers;
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started
.send(())
.await
.expect("simple imap loop, missing started receive");
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = started.send(()).await {
warn!(&ctx, "simple imap loop, missing started receiver: {}", err);
return;
}
loop { loop {
fetch_idle(&ctx, &mut connection, folder).await; fetch_idle(&ctx, &mut connection, folder).await;
@@ -283,10 +275,6 @@ async fn simple_imap_loop(
}) })
.race(fut) .race(fut)
.await; .await;
shutdown_sender
.send(())
.await
.expect("simple imap loop, missing shutdown receiver");
} }
async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) { async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) {
@@ -296,17 +284,16 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
let SmtpConnectionHandlers { let SmtpConnectionHandlers {
mut connection, mut connection,
stop_receiver, stop_receiver,
shutdown_sender,
idle_interrupt_receiver, idle_interrupt_receiver,
} = smtp_handlers; } = smtp_handlers;
let ctx1 = ctx.clone(); let ctx1 = ctx.clone();
let fut = async move { let fut = async move {
started
.send(())
.await
.expect("smtp loop, missing started receiver");
let ctx = ctx1; let ctx = ctx1;
if let Err(err) = started.send(()).await {
warn!(&ctx, "smtp loop, missing started receiver: {}", err);
return;
}
let mut timeout = None; let mut timeout = None;
let mut interrupt_info = Default::default(); let mut interrupt_info = Default::default();
@@ -378,15 +365,15 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
}) })
.race(fut) .race(fut)
.await; .await;
shutdown_sender
.send(())
.await
.expect("smtp loop, missing shutdown receiver");
} }
impl Scheduler { impl Scheduler {
/// Start the scheduler, panics if it is already running. /// Start the scheduler, returns error if it is already running.
pub async fn start(&mut self, ctx: Context) -> Result<()> { pub async fn start(&mut self, ctx: Context) -> Result<()> {
if self.is_running() {
bail!("scheduler is already stopped");
}
let (mvbox, mvbox_handlers) = ImapConnectionState::new(&ctx).await?; let (mvbox, mvbox_handlers) = ImapConnectionState::new(&ctx).await?;
let (sentbox, sentbox_handlers) = ImapConnectionState::new(&ctx).await?; let (sentbox, sentbox_handlers) = ImapConnectionState::new(&ctx).await?;
let (smtp, smtp_handlers) = SmtpConnectionState::new(); let (smtp, smtp_handlers) = SmtpConnectionState::new();
@@ -422,7 +409,7 @@ impl Scheduler {
mvbox_start_send mvbox_start_send
.send(()) .send(())
.await .await
.expect("mvbox start send, missing receiver"); .context("mvbox start send, missing receiver")?;
mvbox_handlers mvbox_handlers
.connection .connection
.connectivity .connectivity
@@ -445,7 +432,7 @@ impl Scheduler {
sentbox_start_send sentbox_start_send
.send(()) .send(())
.await .await
.expect("sentbox start send, missing receiver"); .context("sentbox start send, missing receiver")?;
sentbox_handlers sentbox_handlers
.connection .connection
.connectivity .connectivity
@@ -553,11 +540,11 @@ impl Scheduler {
} }
} }
/// Halts the scheduler, must be called first, and then `stop`. /// Halt the scheduler.
pub(crate) async fn pre_stop(&self) -> StopToken { pub(crate) async fn stop(&mut self) -> Result<()> {
match self { match self {
Scheduler::Stopped => { Scheduler::Stopped => {
panic!("WARN: already stopped"); bail!("scheduler is already stopped");
} }
Scheduler::Running { Scheduler::Running {
inbox, inbox,
@@ -568,40 +555,22 @@ impl Scheduler {
sentbox_handle, sentbox_handle,
smtp, smtp,
smtp_handle, smtp_handle,
..
} => {
if inbox_handle.is_some() {
inbox.stop().await;
}
if mvbox_handle.is_some() {
mvbox.stop().await;
}
if sentbox_handle.is_some() {
sentbox.stop().await;
}
if smtp_handle.is_some() {
smtp.stop().await;
}
StopToken
}
}
}
/// Halt the scheduler, must only be called after pre_stop.
pub(crate) async fn stop(&mut self, _t: StopToken) {
match self {
Scheduler::Stopped => {
panic!("WARN: already stopped");
}
Scheduler::Running {
inbox_handle,
mvbox_handle,
sentbox_handle,
smtp_handle,
ephemeral_handle, ephemeral_handle,
.. ..
} => { } => {
if inbox_handle.is_some() {
inbox.stop().await?;
}
if mvbox_handle.is_some() {
mvbox.stop().await?;
}
if sentbox_handle.is_some() {
sentbox.stop().await?;
}
if smtp_handle.is_some() {
smtp.stop().await?;
}
if let Some(handle) = inbox_handle.take() { if let Some(handle) = inbox_handle.take() {
handle.await; handle.await;
} }
@@ -619,6 +588,7 @@ impl Scheduler {
} }
*self = Scheduler::Stopped; *self = Scheduler::Stopped;
Ok(())
} }
} }
} }
@@ -632,8 +602,6 @@ impl Scheduler {
/// Connection state logic shared between imap and smtp connections. /// Connection state logic shared between imap and smtp connections.
#[derive(Debug)] #[derive(Debug)]
struct ConnectionState { struct ConnectionState {
/// Channel to notify that shutdown has completed.
shutdown_receiver: Receiver<()>,
/// Channel to interrupt the whole connection. /// Channel to interrupt the whole connection.
stop_sender: Sender<()>, stop_sender: Sender<()>,
/// Channel to interrupt idle. /// Channel to interrupt idle.
@@ -644,14 +612,13 @@ struct ConnectionState {
impl ConnectionState { impl ConnectionState {
/// Shutdown this connection completely. /// Shutdown this connection completely.
async fn stop(&self) { async fn stop(&self) -> Result<()> {
// Trigger shutdown of the run loop. // Trigger shutdown of the run loop.
self.stop_sender self.stop_sender
.send(()) .send(())
.await .await
.expect("stop, missing receiver"); .context("failed to stop, missing receiver")?;
// Wait for a notification that the run loop has been shutdown. Ok(())
self.shutdown_receiver.recv().await.ok();
} }
async fn interrupt(&self, info: InterruptInfo) { async fn interrupt(&self, info: InterruptInfo) {
@@ -668,18 +635,15 @@ pub(crate) struct SmtpConnectionState {
impl SmtpConnectionState { impl SmtpConnectionState {
fn new() -> (Self, SmtpConnectionHandlers) { fn new() -> (Self, SmtpConnectionHandlers) {
let (stop_sender, stop_receiver) = channel::bounded(1); let (stop_sender, stop_receiver) = channel::bounded(1);
let (shutdown_sender, shutdown_receiver) = channel::bounded(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
let handlers = SmtpConnectionHandlers { let handlers = SmtpConnectionHandlers {
connection: Smtp::new(), connection: Smtp::new(),
stop_receiver, stop_receiver,
shutdown_sender,
idle_interrupt_receiver, idle_interrupt_receiver,
}; };
let state = ConnectionState { let state = ConnectionState {
shutdown_receiver,
stop_sender, stop_sender,
idle_interrupt_sender, idle_interrupt_sender,
connectivity: handlers.connection.connectivity.clone(), connectivity: handlers.connection.connectivity.clone(),
@@ -696,15 +660,15 @@ impl SmtpConnectionState {
} }
/// Shutdown this connection completely. /// Shutdown this connection completely.
async fn stop(&self) { async fn stop(&self) -> Result<()> {
self.state.stop().await; self.state.stop().await?;
Ok(())
} }
} }
struct SmtpConnectionHandlers { struct SmtpConnectionHandlers {
connection: Smtp, connection: Smtp,
stop_receiver: Receiver<()>, stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
idle_interrupt_receiver: Receiver<InterruptInfo>, idle_interrupt_receiver: Receiver<InterruptInfo>,
} }
@@ -717,17 +681,14 @@ impl ImapConnectionState {
/// Construct a new connection. /// Construct a new connection.
async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> { async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> {
let (stop_sender, stop_receiver) = channel::bounded(1); let (stop_sender, stop_receiver) = channel::bounded(1);
let (shutdown_sender, shutdown_receiver) = channel::bounded(1);
let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1);
let handlers = ImapConnectionHandlers { let handlers = ImapConnectionHandlers {
connection: Imap::new_configured(context, idle_interrupt_receiver).await?, connection: Imap::new_configured(context, idle_interrupt_receiver).await?,
stop_receiver, stop_receiver,
shutdown_sender,
}; };
let state = ConnectionState { let state = ConnectionState {
shutdown_receiver,
stop_sender, stop_sender,
idle_interrupt_sender, idle_interrupt_sender,
connectivity: handlers.connection.connectivity.clone(), connectivity: handlers.connection.connectivity.clone(),
@@ -744,8 +705,9 @@ impl ImapConnectionState {
} }
/// Shutdown this connection completely. /// Shutdown this connection completely.
async fn stop(&self) { async fn stop(&self) -> Result<()> {
self.state.stop().await; self.state.stop().await?;
Ok(())
} }
} }
@@ -753,7 +715,6 @@ impl ImapConnectionState {
struct ImapConnectionHandlers { struct ImapConnectionHandlers {
connection: Imap, connection: Imap,
stop_receiver: Receiver<()>, stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
} }
#[derive(Default, Debug)] #[derive(Default, Debug)]