From 81e9628ab79941adaa12de1e30cefb8d8d69b3f9 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 8 Feb 2025 23:34:38 +0000 Subject: [PATCH] refactor: do not cancel the task returned from async_imap `Handle.wait_with_timeout` This task is not guaranteed to be cancellation-safe and provides a stop token for safe cancellation instead. We should always cancel the task properly and not by racing against another future. Otherwise following call to Handle.done() may work on IMAP session that is in the middle of response, for example. --- src/imap/idle.rs | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/imap/idle.rs b/src/imap/idle.rs index 346d85740..940990483 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -3,7 +3,6 @@ use std::time::Duration; use anyhow::{Context as _, Result}; use async_channel::Receiver; use async_imap::extensions::idle::IdleResponse; -use futures_lite::FutureExt; use tokio::time::timeout; use super::session::Session; @@ -27,8 +26,6 @@ impl Session { idle_interrupt_receiver: Receiver<()>, folder: &str, ) -> Result { - use futures::future::FutureExt; - let create = true; self.select_with_uidvalidity(context, folder, create) .await?; @@ -63,42 +60,46 @@ impl Session { handle.as_mut().set_read_timeout(None); let (idle_wait, interrupt) = handle.wait_with_timeout(IDLE_TIMEOUT); - enum Event { - IdleResponse(IdleResponse), - Interrupt, - } - info!( context, "IDLE entering wait-on-remote state in folder {folder:?}." ); - let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async { - idle_interrupt_receiver.recv().await.ok(); - // cancel imap idle connection properly - drop(interrupt); + // Spawn a task to relay interrupts from `idle_interrupt_receiver` + // into interruptions of IDLE. + let interrupt_relay = { + let context = context.clone(); + let folder = folder.to_string(); - Ok(Event::Interrupt) - }); + tokio::spawn(async move { + idle_interrupt_receiver.recv().await.ok(); - match fut.await { - Ok(Event::IdleResponse(IdleResponse::NewData(x))) => { + info!(context, "{folder:?}: Received interrupt, stopping IDLE."); + + // Drop `interrupt` in order to stop the IMAP IDLE. + drop(interrupt); + }) + }; + + match idle_wait.await { + Ok(IdleResponse::NewData(x)) => { info!(context, "{folder:?}: Idle has NewData {x:?}"); } - Ok(Event::IdleResponse(IdleResponse::Timeout)) => { + Ok(IdleResponse::Timeout) => { info!(context, "{folder:?}: Idle-wait timeout or interruption."); } - Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { + Ok(IdleResponse::ManualInterrupt) => { info!(context, "{folder:?}: Idle wait was interrupted manually."); } - Ok(Event::Interrupt) => { - info!(context, "{folder:?}: Idle wait was interrupted."); - } Err(err) => { warn!(context, "{folder:?}: Idle wait errored: {err:?}."); } } + // Abort the task, then await to ensure the future is dropped. + interrupt_relay.abort(); + interrupt_relay.await.ok(); + let mut session = tokio::time::timeout(Duration::from_secs(15), handle.done()) .await .with_context(|| format!("{folder}: IMAP IDLE protocol timed out"))?