refactor: do not cancel the task returned from async_imap Handle.wait_with_timeout

This task is not guaranteed to be cancellation-safe
and provides a stop token for safe cancellation instead.
We should always cancel the task properly
and not by racing against another future.
Otherwise following call to Handle.done()
may work on IMAP session that is in the middle
of response, for example.
This commit is contained in:
link2xt
2025-02-08 23:34:38 +00:00
parent aaa02968d3
commit 81e9628ab7

View File

@@ -3,7 +3,6 @@ use std::time::Duration;
use anyhow::{Context as _, Result};
use async_channel::Receiver;
use async_imap::extensions::idle::IdleResponse;
use futures_lite::FutureExt;
use tokio::time::timeout;
use super::session::Session;
@@ -27,8 +26,6 @@ impl Session {
idle_interrupt_receiver: Receiver<()>,
folder: &str,
) -> Result<Self> {
use futures::future::FutureExt;
let create = true;
self.select_with_uidvalidity(context, folder, create)
.await?;
@@ -63,42 +60,46 @@ impl Session {
handle.as_mut().set_read_timeout(None);
let (idle_wait, interrupt) = handle.wait_with_timeout(IDLE_TIMEOUT);
enum Event {
IdleResponse(IdleResponse),
Interrupt,
}
info!(
context,
"IDLE entering wait-on-remote state in folder {folder:?}."
);
let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async {
idle_interrupt_receiver.recv().await.ok();
// cancel imap idle connection properly
drop(interrupt);
// Spawn a task to relay interrupts from `idle_interrupt_receiver`
// into interruptions of IDLE.
let interrupt_relay = {
let context = context.clone();
let folder = folder.to_string();
Ok(Event::Interrupt)
});
tokio::spawn(async move {
idle_interrupt_receiver.recv().await.ok();
match fut.await {
Ok(Event::IdleResponse(IdleResponse::NewData(x))) => {
info!(context, "{folder:?}: Received interrupt, stopping IDLE.");
// Drop `interrupt` in order to stop the IMAP IDLE.
drop(interrupt);
})
};
match idle_wait.await {
Ok(IdleResponse::NewData(x)) => {
info!(context, "{folder:?}: Idle has NewData {x:?}");
}
Ok(Event::IdleResponse(IdleResponse::Timeout)) => {
Ok(IdleResponse::Timeout) => {
info!(context, "{folder:?}: Idle-wait timeout or interruption.");
}
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
Ok(IdleResponse::ManualInterrupt) => {
info!(context, "{folder:?}: Idle wait was interrupted manually.");
}
Ok(Event::Interrupt) => {
info!(context, "{folder:?}: Idle wait was interrupted.");
}
Err(err) => {
warn!(context, "{folder:?}: Idle wait errored: {err:?}.");
}
}
// Abort the task, then await to ensure the future is dropped.
interrupt_relay.abort();
interrupt_relay.await.ok();
let mut session = tokio::time::timeout(Duration::from_secs(15), handle.done())
.await
.with_context(|| format!("{folder}: IMAP IDLE protocol timed out"))?