diff --git a/src/scheduler.rs b/src/scheduler.rs index 6f0ca6d14..bbc37cefb 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -10,6 +10,7 @@ use futures_lite::FutureExt; use rand::Rng; use tokio::sync::{oneshot, RwLock, RwLockWriteGuard}; use tokio::task; +use tokio_util::sync::CancellationToken; use self::connectivity::ConnectivityStore; use crate::config::{self, Config}; @@ -389,7 +390,7 @@ async fn inbox_loop( info!(ctx, "Starting inbox loop."); let ImapConnectionHandlers { mut connection, - stop_receiver, + stop_token, } = inbox_handlers; let ctx1 = ctx.clone(); @@ -428,8 +429,8 @@ async fn inbox_loop( } }; - stop_receiver - .recv() + stop_token + .cancelled() .map(|_| { info!(ctx, "Shutting down inbox loop."); }) @@ -703,7 +704,7 @@ async fn simple_imap_loop( info!(ctx, "Starting simple loop for {folder_meaning}."); let ImapConnectionHandlers { mut connection, - stop_receiver, + stop_token, } = inbox_handlers; let ctx1 = ctx.clone(); @@ -749,8 +750,8 @@ async fn simple_imap_loop( } }; - stop_receiver - .recv() + stop_token + .cancelled() .map(|_| { info!(ctx, "Shutting down IMAP loop for {folder_meaning}."); }) @@ -768,7 +769,7 @@ async fn smtp_loop( info!(ctx, "Starting SMTP loop."); let SmtpConnectionHandlers { mut connection, - stop_receiver, + stop_token, idle_interrupt_receiver, } = smtp_handlers; @@ -836,8 +837,8 @@ async fn smtp_loop( } }; - stop_receiver - .recv() + stop_token + .cancelled() .map(|_| { info!(ctx, "Shutting down SMTP loop."); }) @@ -982,9 +983,9 @@ impl Scheduler { pub(crate) async fn stop(self, context: &Context) { // Send stop signals to tasks so they can shutdown cleanly. for b in self.boxes() { - b.conn_state.stop().await.log_err(context).ok(); + b.conn_state.stop(); } - self.smtp.stop().await.log_err(context).ok(); + self.smtp.stop(); // Actually shutdown tasks. let timeout_duration = std::time::Duration::from_secs(30); @@ -1014,8 +1015,8 @@ impl Scheduler { /// Connection state logic shared between imap and smtp connections. #[derive(Debug)] struct ConnectionState { - /// Channel to interrupt the whole connection. - stop_sender: Sender<()>, + /// Cancellation token to interrupt the whole connection. + stop_token: CancellationToken, /// Channel to interrupt idle. idle_interrupt_sender: Sender<()>, /// Mutex to pass connectivity info between IMAP/SMTP threads and the API @@ -1024,13 +1025,9 @@ struct ConnectionState { impl ConnectionState { /// Shutdown this connection completely. - async fn stop(&self) -> Result<()> { + fn stop(&self) { // Trigger shutdown of the run loop. - self.stop_sender - .send(()) - .await - .context("failed to stop, missing receiver")?; - Ok(()) + self.stop_token.cancel(); } fn interrupt(&self) { @@ -1046,17 +1043,17 @@ pub(crate) struct SmtpConnectionState { impl SmtpConnectionState { fn new() -> (Self, SmtpConnectionHandlers) { - let (stop_sender, stop_receiver) = channel::bounded(1); + let stop_token = CancellationToken::new(); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let handlers = SmtpConnectionHandlers { connection: Smtp::new(), - stop_receiver, + stop_token: stop_token.clone(), idle_interrupt_receiver, }; let state = ConnectionState { - stop_sender, + stop_token, idle_interrupt_sender, connectivity: handlers.connection.connectivity.clone(), }; @@ -1072,15 +1069,14 @@ impl SmtpConnectionState { } /// Shutdown this connection completely. - async fn stop(&self) -> Result<()> { - self.state.stop().await?; - Ok(()) + fn stop(&self) { + self.state.stop(); } } struct SmtpConnectionHandlers { connection: Smtp, - stop_receiver: Receiver<()>, + stop_token: CancellationToken, idle_interrupt_receiver: Receiver<()>, } @@ -1092,16 +1088,16 @@ pub(crate) struct ImapConnectionState { impl ImapConnectionState { /// Construct a new connection. async fn new(context: &Context) -> Result<(Self, ImapConnectionHandlers)> { - let (stop_sender, stop_receiver) = channel::bounded(1); + let stop_token = CancellationToken::new(); let (idle_interrupt_sender, idle_interrupt_receiver) = channel::bounded(1); let handlers = ImapConnectionHandlers { connection: Imap::new_configured(context, idle_interrupt_receiver).await?, - stop_receiver, + stop_token: stop_token.clone(), }; let state = ConnectionState { - stop_sender, + stop_token, idle_interrupt_sender, connectivity: handlers.connection.connectivity.clone(), }; @@ -1117,14 +1113,13 @@ impl ImapConnectionState { } /// Shutdown this connection completely. - async fn stop(&self) -> Result<()> { - self.state.stop().await?; - Ok(()) + fn stop(&self) { + self.state.stop(); } } #[derive(Debug)] struct ImapConnectionHandlers { connection: Imap, - stop_receiver: Receiver<()>, + stop_token: CancellationToken, }