diff --git a/CHANGELOG.md b/CHANGELOG.md index b363d8241..f4670a963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Fixes + +- simplify `dc_stop_io()` and remove potential panics and race conditions #3273 + ## 1.78.0 ### API-Changes diff --git a/src/context.rs b/src/context.rs index 93863e836..9286552be 100644 --- a/src/context.rs +++ b/src/context.rs @@ -189,22 +189,14 @@ impl Context { /// Starts the IO scheduler. pub async fn start_io(&self) { - info!(self, "starting IO"); - if self.inner.is_io_running().await { - info!(self, "IO is already running"); - return; - } - if let Ok(false) = self.is_configured().await { warn!(self, "can not start io on a context that is not configured"); return; } - { - let l = &mut *self.inner.scheduler.write().await; - if let Err(err) = l.start(self.clone()).await { - error!(self, "Failed to start IO: {}", err) - } + info!(self, "starting IO"); + if let Err(err) = self.inner.scheduler.write().await.start(self.clone()).await { + error!(self, "Failed to start IO: {}", err) } } @@ -212,7 +204,9 @@ impl Context { pub async fn stop_io(&self) { info!(self, "stopping IO"); - self.inner.stop_io().await; + if let Err(err) = self.inner.stop_io().await { + warn!(self, "failed to stop IO: {}", err); + } } /// Returns a reference to the underlying SQL instance. @@ -644,21 +638,9 @@ impl Context { } impl InnerContext { - async fn is_io_running(&self) -> bool { - self.scheduler.read().await.is_running() - } - - async fn stop_io(&self) { - if self.is_io_running().await { - let token = { - let lock = &*self.scheduler.read().await; - lock.pre_stop().await - }; - { - let lock = &mut *self.scheduler.write().await; - lock.stop(token).await; - } - } + async fn stop_io(&self) -> Result<()> { + self.scheduler.write().await.stop().await?; + Ok(()) } } diff --git a/src/scheduler.rs b/src/scheduler.rs index 4893bb613..5c2142263 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -20,8 +20,6 @@ use self::connectivity::ConnectivityStore; pub(crate) mod connectivity; -pub(crate) struct StopToken; - /// Job and connection scheduler. #[derive(Debug)] #[allow(clippy::large_enum_variant)] @@ -76,16 +74,15 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne let ImapConnectionHandlers { mut connection, stop_receiver, - shutdown_sender, } = inbox_handlers; let ctx1 = ctx.clone(); let fut = async move { - started - .send(()) - .await - .expect("inbox loop, missing started receiver"); let ctx = ctx1; + if let Err(err) = started.send(()).await { + warn!(ctx, "inbox loop, missing started receiver: {}", err); + return; + }; let mut info = InterruptInfo::default(); loop { @@ -131,10 +128,6 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne }) .race(fut) .await; - shutdown_sender - .send(()) - .await - .expect("inbox loop, missing shutdown receiver"); } async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo { @@ -259,17 +252,16 @@ async fn simple_imap_loop( let ImapConnectionHandlers { mut connection, stop_receiver, - shutdown_sender, } = inbox_handlers; let ctx1 = ctx.clone(); let fut = async move { - started - .send(()) - .await - .expect("simple imap loop, missing started receive"); let ctx = ctx1; + if let Err(err) = started.send(()).await { + warn!(&ctx, "simple imap loop, missing started receiver: {}", err); + return; + } loop { fetch_idle(&ctx, &mut connection, folder).await; @@ -283,10 +275,6 @@ async fn simple_imap_loop( }) .race(fut) .await; - shutdown_sender - .send(()) - .await - .expect("simple imap loop, missing shutdown receiver"); } async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnectionHandlers) { @@ -296,17 +284,16 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect let SmtpConnectionHandlers { mut connection, stop_receiver, - shutdown_sender, idle_interrupt_receiver, } = smtp_handlers; let ctx1 = ctx.clone(); let fut = async move { - started - .send(()) - .await - .expect("smtp loop, missing started receiver"); let ctx = ctx1; + if let Err(err) = started.send(()).await { + warn!(&ctx, "smtp loop, missing started receiver: {}", err); + return; + } let mut timeout = None; let mut interrupt_info = Default::default(); @@ -378,15 +365,15 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect }) .race(fut) .await; - shutdown_sender - .send(()) - .await - .expect("smtp loop, missing shutdown receiver"); } impl Scheduler { - /// Start the scheduler, panics if it is already running. + /// Start the scheduler, returns error if it is already running. pub async fn start(&mut self, ctx: Context) -> Result<()> { + if self.is_running() { + bail!("scheduler is already stopped"); + } + let (mvbox, mvbox_handlers) = ImapConnectionState::new(&ctx).await?; let (sentbox, sentbox_handlers) = ImapConnectionState::new(&ctx).await?; let (smtp, smtp_handlers) = SmtpConnectionState::new(); @@ -422,7 +409,7 @@ impl Scheduler { mvbox_start_send .send(()) .await - .expect("mvbox start send, missing receiver"); + .context("mvbox start send, missing receiver")?; mvbox_handlers .connection .connectivity @@ -445,7 +432,7 @@ impl Scheduler { sentbox_start_send .send(()) .await - .expect("sentbox start send, missing receiver"); + .context("sentbox start send, missing receiver")?; sentbox_handlers .connection .connectivity @@ -553,11 +540,11 @@ impl Scheduler { } } - /// Halts the scheduler, must be called first, and then `stop`. - pub(crate) async fn pre_stop(&self) -> StopToken { + /// Halt the scheduler. + pub(crate) async fn stop(&mut self) -> Result<()> { match self { Scheduler::Stopped => { - panic!("WARN: already stopped"); + bail!("scheduler is already stopped"); } Scheduler::Running { inbox, @@ -568,40 +555,22 @@ impl Scheduler { sentbox_handle, smtp, smtp_handle, - .. - } => { - if inbox_handle.is_some() { - inbox.stop().await; - } - if mvbox_handle.is_some() { - mvbox.stop().await; - } - if sentbox_handle.is_some() { - sentbox.stop().await; - } - if smtp_handle.is_some() { - smtp.stop().await; - } - - StopToken - } - } - } - - /// Halt the scheduler, must only be called after pre_stop. - pub(crate) async fn stop(&mut self, _t: StopToken) { - match self { - Scheduler::Stopped => { - panic!("WARN: already stopped"); - } - Scheduler::Running { - inbox_handle, - mvbox_handle, - sentbox_handle, - smtp_handle, ephemeral_handle, .. } => { + if inbox_handle.is_some() { + inbox.stop().await?; + } + if mvbox_handle.is_some() { + mvbox.stop().await?; + } + if sentbox_handle.is_some() { + sentbox.stop().await?; + } + if smtp_handle.is_some() { + smtp.stop().await?; + } + if let Some(handle) = inbox_handle.take() { handle.await; } @@ -619,6 +588,7 @@ impl Scheduler { } *self = Scheduler::Stopped; + Ok(()) } } } @@ -632,8 +602,6 @@ impl Scheduler { /// Connection state logic shared between imap and smtp connections. #[derive(Debug)] struct ConnectionState { - /// Channel to notify that shutdown has completed. - shutdown_receiver: Receiver<()>, /// Channel to interrupt the whole connection. stop_sender: Sender<()>, /// Channel to interrupt idle. @@ -644,14 +612,13 @@ struct ConnectionState { impl ConnectionState { /// Shutdown this connection completely. - async fn stop(&self) { + async fn stop(&self) -> Result<()> { // Trigger shutdown of the run loop. self.stop_sender .send(()) .await - .expect("stop, missing receiver"); - // Wait for a notification that the run loop has been shutdown. - self.shutdown_receiver.recv().await.ok(); + .context("failed to stop, missing receiver")?; + Ok(()) } async fn interrupt(&self, info: InterruptInfo) { @@ -668,18 +635,15 @@ pub(crate) struct SmtpConnectionState { impl SmtpConnectionState { fn new() -> (Self, SmtpConnectionHandlers) { let (stop_sender, stop_receiver) = channel::bounded(1); - let (shutdown_sender, shutdown_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let handlers = SmtpConnectionHandlers { connection: Smtp::new(), stop_receiver, - shutdown_sender, idle_interrupt_receiver, }; let state = ConnectionState { - shutdown_receiver, stop_sender, idle_interrupt_sender, connectivity: handlers.connection.connectivity.clone(), @@ -696,15 +660,15 @@ impl SmtpConnectionState { } /// Shutdown this connection completely. - async fn stop(&self) { - self.state.stop().await; + async fn stop(&self) -> Result<()> { + self.state.stop().await?; + Ok(()) } } struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, - shutdown_sender: Sender<()>, idle_interrupt_receiver: Receiver, } @@ -717,17 +681,14 @@ impl ImapConnectionState { /// Construct a new connection. async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> { let (stop_sender, stop_receiver) = channel::bounded(1); - let (shutdown_sender, shutdown_receiver) = channel::bounded(1); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let handlers = ImapConnectionHandlers { connection: Imap::new_configured(context, idle_interrupt_receiver).await?, stop_receiver, - shutdown_sender, }; let state = ConnectionState { - shutdown_receiver, stop_sender, idle_interrupt_sender, connectivity: handlers.connection.connectivity.clone(), @@ -744,8 +705,9 @@ impl ImapConnectionState { } /// Shutdown this connection completely. - async fn stop(&self) { - self.state.stop().await; + async fn stop(&self) -> Result<()> { + self.state.stop().await?; + Ok(()) } } @@ -753,7 +715,6 @@ impl ImapConnectionState { struct ImapConnectionHandlers { connection: Imap, stop_receiver: Receiver<()>, - shutdown_sender: Sender<()>, } #[derive(Default, Debug)]