refactor(imap): move resync request from Context to Imap

For multiple transports we will need to run
multiple IMAP clients in parallel.
UID validity change detected by one IMAP client
should not result in UID resync
for another IMAP client.
This commit is contained in:
link2xt
2025-11-05 22:15:22 +00:00
committed by l
parent 5f174ceaf2
commit 7fef812b1e
6 changed files with 21 additions and 27 deletions

View File

@@ -565,14 +565,6 @@ async fn configure(ctx: &Context, param: &EnteredLoginParam) -> Result<Option<&'
progress!(ctx, 910); progress!(ctx, 910);
if let Some(configured_addr) = ctx.get_config(Config::ConfiguredAddr).await? {
if configured_addr != param.addr {
// Switched account, all server UIDs we know are invalid
info!(ctx, "Scheduling resync because the address has changed.");
ctx.schedule_resync().await?;
}
}
let provider = configured_param.provider; let provider = configured_param.provider;
configured_param configured_param
.save_to_transports_table(ctx, param) .save_to_transports_table(ctx, param)

View File

@@ -4,7 +4,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ffi::OsString; use std::ffi::OsString;
use std::ops::Deref; use std::ops::Deref;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::AtomicBool;
use std::sync::{Arc, OnceLock}; use std::sync::{Arc, OnceLock};
use std::time::Duration; use std::time::Duration;
@@ -243,9 +243,6 @@ pub struct InnerContext {
/// Set to `None` if quota was never tried to load. /// Set to `None` if quota was never tried to load.
pub(crate) quota: RwLock<Option<QuotaInfo>>, pub(crate) quota: RwLock<Option<QuotaInfo>>,
/// IMAP UID resync request.
pub(crate) resync_request: AtomicBool,
/// Notify about new messages. /// Notify about new messages.
/// ///
/// This causes [`Context::wait_next_msgs`] to wake up. /// This causes [`Context::wait_next_msgs`] to wake up.
@@ -457,7 +454,6 @@ impl Context {
scheduler: SchedulerState::new(), scheduler: SchedulerState::new(),
ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow at least 1 message every 10 seconds + a burst of 6. ratelimit: RwLock::new(Ratelimit::new(Duration::new(60, 0), 6.0)), // Allow at least 1 message every 10 seconds + a burst of 6.
quota: RwLock::new(None), quota: RwLock::new(None),
resync_request: AtomicBool::new(false),
new_msgs_notify, new_msgs_notify,
server_id: RwLock::new(None), server_id: RwLock::new(None),
metadata: RwLock::new(None), metadata: RwLock::new(None),
@@ -616,12 +612,6 @@ impl Context {
Ok(()) Ok(())
} }
pub(crate) async fn schedule_resync(&self) -> Result<()> {
self.resync_request.store(true, Ordering::Relaxed);
self.scheduler.interrupt_inbox().await;
Ok(())
}
/// Returns a reference to the underlying SQL instance. /// Returns a reference to the underlying SQL instance.
/// ///
/// Warning: this is only here for testing, not part of the public API. /// Warning: this is only here for testing, not part of the public API.

View File

@@ -104,6 +104,12 @@ pub(crate) struct Imap {
/// immediately after logging in or returning an error in response to LOGIN command /// immediately after logging in or returning an error in response to LOGIN command
/// due to internal server error. /// due to internal server error.
ratelimit: Ratelimit, ratelimit: Ratelimit,
/// IMAP UID resync request sender.
pub(crate) resync_request_sender: async_channel::Sender<()>,
/// IMAP UID resync request receiver.
pub(crate) resync_request_receiver: async_channel::Receiver<()>,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -254,6 +260,7 @@ impl Imap {
oauth2: bool, oauth2: bool,
idle_interrupt_receiver: Receiver<()>, idle_interrupt_receiver: Receiver<()>,
) -> Self { ) -> Self {
let (resync_request_sender, resync_request_receiver) = async_channel::bounded(1);
Imap { Imap {
idle_interrupt_receiver, idle_interrupt_receiver,
addr: addr.to_string(), addr: addr.to_string(),
@@ -268,6 +275,8 @@ impl Imap {
conn_backoff_ms: 0, conn_backoff_ms: 0,
// 1 connection per minute + a burst of 2. // 1 connection per minute + a burst of 2.
ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0), ratelimit: Ratelimit::new(Duration::new(120, 0), 2.0),
resync_request_sender,
resync_request_receiver,
} }
} }
@@ -392,6 +401,7 @@ impl Imap {
match login_res { match login_res {
Ok(mut session) => { Ok(mut session) => {
let capabilities = determine_capabilities(&mut session).await?; let capabilities = determine_capabilities(&mut session).await?;
let resync_request_sender = self.resync_request_sender.clone();
let session = if capabilities.can_compress { let session = if capabilities.can_compress {
info!(context, "Enabling IMAP compression."); info!(context, "Enabling IMAP compression.");
@@ -402,9 +412,9 @@ impl Imap {
}) })
.await .await
.context("Failed to enable IMAP compression")?; .context("Failed to enable IMAP compression")?;
Session::new(compressed_session, capabilities) Session::new(compressed_session, capabilities, resync_request_sender)
} else { } else {
Session::new(session, capabilities) Session::new(session, capabilities, resync_request_sender)
}; };
// Store server ID in the context to display in account info. // Store server ID in the context to display in account info.

View File

@@ -206,7 +206,7 @@ impl ImapSession {
"The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {new_uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...", "The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {new_uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...",
); );
set_uid_next(context, folder, new_uid_next).await?; set_uid_next(context, folder, new_uid_next).await?;
context.schedule_resync().await?; self.resync_request_sender.try_send(()).ok();
} }
// If UIDNEXT changed, there are new emails. // If UIDNEXT changed, there are new emails.
@@ -243,7 +243,7 @@ impl ImapSession {
.await?; .await?;
if old_uid_validity != 0 || old_uid_next != 0 { if old_uid_validity != 0 || old_uid_next != 0 {
context.schedule_resync().await?; self.resync_request_sender.try_send(()).ok();
} }
info!( info!(
context, context,

View File

@@ -48,6 +48,8 @@ pub(crate) struct Session {
/// ///
/// Should be false if no folder is currently selected. /// Should be false if no folder is currently selected.
pub new_mail: bool, pub new_mail: bool,
pub resync_request_sender: async_channel::Sender<()>,
} }
impl Deref for Session { impl Deref for Session {
@@ -68,6 +70,7 @@ impl Session {
pub(crate) fn new( pub(crate) fn new(
inner: ImapSession<Box<dyn SessionStream>>, inner: ImapSession<Box<dyn SessionStream>>,
capabilities: Capabilities, capabilities: Capabilities,
resync_request_sender: async_channel::Sender<()>,
) -> Self { ) -> Self {
Self { Self {
inner, inner,
@@ -77,6 +80,7 @@ impl Session {
selected_folder_needs_expunge: false, selected_folder_needs_expunge: false,
last_full_folder_scan: Mutex::new(None), last_full_folder_scan: Mutex::new(None),
new_mail: false, new_mail: false,
resync_request_sender,
} }
} }

View File

@@ -1,7 +1,6 @@
use std::cmp; use std::cmp;
use std::iter::{self, once}; use std::iter::{self, once};
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::sync::atomic::Ordering;
use anyhow::{Context as _, Error, Result, bail}; use anyhow::{Context as _, Error, Result, bail};
use async_channel::{self as channel, Receiver, Sender}; use async_channel::{self as channel, Receiver, Sender};
@@ -481,11 +480,10 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
} }
} }
let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed); if let Ok(()) = imap.resync_request_receiver.try_recv() {
if resync_requested {
if let Err(err) = session.resync_folders(ctx).await { if let Err(err) = session.resync_folders(ctx).await {
warn!(ctx, "Failed to resync folders: {:#}.", err); warn!(ctx, "Failed to resync folders: {:#}.", err);
ctx.resync_request.store(true, Ordering::Relaxed); imap.resync_request_sender.try_send(()).ok();
} }
} }