From 3a10f0155fc745d95f9853d52a1d709fc945ee95 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 30 Apr 2022 16:53:17 +0000 Subject: [PATCH] Remove panics from the scheduler and simplify start/stop_io() Hold scheduler lock during the whole procedure of scheduler starting and stopping. This ensures that two processes can't get two read locks in parallel and start loops or send the stop signal twice. Also remove shutdown channels: it is enough to wait for the loop handle without receiving a shutdown signal from the end of the loop. --- CHANGELOG.md | 6 +++ src/context.rs | 36 ++++--------- src/scheduler.rs | 129 +++++++++++++++++------------------------------ 3 files changed, 60 insertions(+), 111 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b363d8241..f4670a963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Fixes + +- simplify `dc_stop_io()` and remove potential panics and race conditions #3273 + ## 1.78.0 ### API-Changes diff --git a/src/context.rs b/src/context.rs index 93863e836..9286552be 100644 --- a/src/context.rs +++ b/src/context.rs @@ -189,22 +189,14 @@ impl Context { /// Starts the IO scheduler. pub async fn start_io(&self) { - info!(self, "starting IO"); - if self.inner.is_io_running().await { - info!(self, "IO is already running"); - return; - } - if let Ok(false) = self.is_configured().await { warn!(self, "can not start io on a context that is not configured"); return; } - { - let l = &mut *self.inner.scheduler.write().await; - if let Err(err) = l.start(self.clone()).await { - error!(self, "Failed to start IO: {}", err) - } + info!(self, "starting IO"); + if let Err(err) = self.inner.scheduler.write().await.start(self.clone()).await { + error!(self, "Failed to start IO: {}", err) } } @@ -212,7 +204,9 @@ impl Context { pub async fn stop_io(&self) { info!(self, "stopping IO"); - self.inner.stop_io().await; + if let Err(err) = self.inner.stop_io().await { + warn!(self, "failed to stop IO: {}", err); + } } /// Returns a reference to the underlying SQL instance. @@ -644,21 +638,9 @@ impl Context { } impl InnerContext { - async fn is_io_running(&self) -> bool { - self.scheduler.read().await.is_running() - } - - async fn stop_io(&self) { - if self.is_io_running().await { - let token = { - let lock = &*self.scheduler.read().await; - lock.pre_stop().await - }; - { - let lock = &mut *self.scheduler.write().await; - lock.stop(token).await; - } - } + async fn stop_io(&self) -> Result<()> { + self.scheduler.write().await.stop().await?; + Ok(()) } } diff --git a/src/scheduler.rs b/src/scheduler.rs index 4893bb613..5c2142263 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -20,8 +20,6 @@ use self::connectivity::ConnectivityStore; pub(crate) mod connectivity; -pub(crate) struct StopToken; - /// Job and connection scheduler. #[derive(Debug)] #[allow(clippy::large_enum_variant)] @@ -76,16 +74,15 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne let ImapConnectionHandlers { mut connection, stop_receiver, - shutdown_sender, } = inbox_handlers; let ctx1 = ctx.clone(); let fut = async move { - started - .send(()) - .await - .expect("inbox loop, missing started receiver"); let ctx = ctx1; + if let Err(err) = started.send(()).await { + warn!(ctx, "inbox loop, missing started receiver: {}", err); + return; + }; let mut info = InterruptInfo::default(); loop { @@ -131,10 +128,6 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne }) .race(fut) .await; - shutdown_sender - .send(()) - .await - .expect("inbox loop, missing shutdown receiver"); } async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo { @@ -259,17 +252,16 @@ async fn simple_imap_loop( let ImapConnectionHandlers { mut connection, stop_receiver, - shutdown_sender, } = inbox_handlers; let ctx1 = ctx.clone(); let fut = async move { - started - .send(()) - .await - .expect("simple imap loop, missing started receive"); let ctx = ctx1; + if let Err(err) = started.send(()).await { + warn!(&ctx, "simple imap loop, missing started receiver: {}", err); + return; + } loop { fetch_idle(&ctx, &mut connection, folder).await; @@ -283,10 +275,6 @@ async fn simple_imap_loop( }) .race(fut) .await; - shutdown_sender - .send(()) - .await - .expect("simple imap loop, missing shutdown receiver"); } async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) { @@ -296,17 +284,16 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect let SmtpConnectionHandlers { mut connection, stop_receiver, - shutdown_sender, idle_interrupt_receiver, } = smtp_handlers; let ctx1 = ctx.clone(); let fut = async move { - started - .send(()) - .await - .expect("smtp loop, missing started receiver"); let ctx = ctx1; + if let Err(err) = started.send(()).await { + warn!(&ctx, "smtp loop, missing started receiver: {}", err); + return; + } let mut timeout = None; let mut interrupt_info = Default::default(); @@ -378,15 +365,15 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect }) .race(fut) .await; - shutdown_sender - .send(()) - .await - .expect("smtp loop, missing shutdown receiver"); } impl Scheduler { - /// Start the scheduler, panics if it is already running. + /// Start the scheduler, returns error if it is already running. pub async fn start(&mut self, ctx: Context) -> Result<()> { + if self.is_running() { + bail!("scheduler is already stopped"); + } + let (mvbox, mvbox_handlers) = ImapConnectionState::new(&ctx).await?; let (sentbox, sentbox_handlers) = ImapConnectionState::new(&ctx).await?; let (smtp, smtp_handlers) = SmtpConnectionState::new(); @@ -422,7 +409,7 @@ impl Scheduler { mvbox_start_send .send(()) .await - .expect("mvbox start send, missing receiver"); + .context("mvbox start send, missing receiver")?; mvbox_handlers .connection .connectivity @@ -445,7 +432,7 @@ impl Scheduler { sentbox_start_send .send(()) .await - .expect("sentbox start send, missing receiver"); + .context("sentbox start send, missing receiver")?; sentbox_handlers .connection .connectivity @@ -553,11 +540,11 @@ impl Scheduler { } } - /// Halts the scheduler, must be called first, and then `stop`. - pub(crate) async fn pre_stop(&self) -> StopToken { + /// Halt the scheduler. + pub(crate) async fn stop(&mut self) -> Result<()> { match self { Scheduler::Stopped => { - panic!("WARN: already stopped"); + bail!("scheduler is already stopped"); } Scheduler::Running { inbox, @@ -568,40 +555,22 @@ impl Scheduler { sentbox_handle, smtp, smtp_handle, - .. - } => { - if inbox_handle.is_some() { - inbox.stop().await; - } - if mvbox_handle.is_some() { - mvbox.stop().await; - } - if sentbox_handle.is_some() { - sentbox.stop().await; - } - if smtp_handle.is_some() { - smtp.stop().await; - } - - StopToken - } - } - } - - /// Halt the scheduler, must only be called after pre_stop. - pub(crate) async fn stop(&mut self, _t: StopToken) { - match self { - Scheduler::Stopped => { - panic!("WARN: already stopped"); - } - Scheduler::Running { - inbox_handle, - mvbox_handle, - sentbox_handle, - smtp_handle, ephemeral_handle, .. } => { + if inbox_handle.is_some() { + inbox.stop().await?; + } + if mvbox_handle.is_some() { + mvbox.stop().await?; + } + if sentbox_handle.is_some() { + sentbox.stop().await?; + } + if smtp_handle.is_some() { + smtp.stop().await?; + } + if let Some(handle) = inbox_handle.take() { handle.await; } @@ -619,6 +588,7 @@ impl Scheduler { } *self = Scheduler::Stopped; + Ok(()) } } } @@ -632,8 +602,6 @@ impl Scheduler { /// Connection state logic shared between imap and smtp connections. #[derive(Debug)] struct ConnectionState { - /// Channel to notify that shutdown has completed. - shutdown_receiver: Receiver<()>, /// Channel to interrupt the whole connection. stop_sender: Sender<()>, /// Channel to interrupt idle. @@ -644,14 +612,13 @@ struct ConnectionState { impl ConnectionState { /// Shutdown this connection completely. - async fn stop(&self) { + async fn stop(&self) -> Result<()> { // Trigger shutdown of the run loop. self.stop_sender .send(()) .await - .expect("stop, missing receiver"); - // Wait for a notification that the run loop has been shutdown. - self.shutdown_receiver.recv().await.ok(); + .context("failed to stop, missing receiver")?; + Ok(()) } async fn interrupt(&self, info: InterruptInfo) { @@ -668,18 +635,15 @@ pub(crate) struct SmtpConnectionState { impl SmtpConnectionState { fn new() -> (Self, SmtpConnectionHandlers) { let (stop_sender, stop_receiver) = channel::bounded(1); - let (shutdown_sender, shutdown_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let handlers = SmtpConnectionHandlers { connection: Smtp::new(), stop_receiver, - shutdown_sender, idle_interrupt_receiver, }; let state = ConnectionState { - shutdown_receiver, stop_sender, idle_interrupt_sender, connectivity: handlers.connection.connectivity.clone(), @@ -696,15 +660,15 @@ impl SmtpConnectionState { } /// Shutdown this connection completely. - async fn stop(&self) { - self.state.stop().await; + async fn stop(&self) -> Result<()> { + self.state.stop().await?; + Ok(()) } } struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, - shutdown_sender: Sender<()>, idle_interrupt_receiver: Receiver, } @@ -717,17 +681,14 @@ impl ImapConnectionState { /// Construct a new connection. async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> { let (stop_sender, stop_receiver) = channel::bounded(1); - let (shutdown_sender, shutdown_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let handlers = ImapConnectionHandlers { connection: Imap::new_configured(context, idle_interrupt_receiver).await?, stop_receiver, - shutdown_sender, }; let state = ConnectionState { - shutdown_receiver, stop_sender, idle_interrupt_sender, connectivity: handlers.connection.connectivity.clone(), @@ -744,8 +705,9 @@ impl ImapConnectionState { } /// Shutdown this connection completely. - async fn stop(&self) { - self.state.stop().await; + async fn stop(&self) -> Result<()> { + self.state.stop().await?; + Ok(()) } } @@ -753,7 +715,6 @@ impl ImapConnectionState { struct ImapConnectionHandlers { connection: Imap, stop_receiver: Receiver<()>, - shutdown_sender: Sender<()>, } #[derive(Default, Debug)]