From 07870a6d69365535366470ed6fb9e49253d2097a Mon Sep 17 00:00:00 2001 From: link2xt Date: Thu, 29 Feb 2024 02:43:48 +0000 Subject: [PATCH] refactor(imap): remove Session from Imap structure Connection establishment now happens only in one place in each IMAP loop. Now all connection establishment happens in one place and is limited by the ratelimit. Backoff was removed from fake_idle as it does not establish connections anymore. If connection fails, fake_idle will return an error. We then drop the connection and get back to the beginning of IMAP loop. Backoff may be still nice to have to delay retries in case of constant connection failures so we don't immediately hit ratelimit if the network is unusable and returns immediate error on each connection attempt (e.g. ICMP network unreachable error), but adding backoff for connection failures is out of scope for this change. --- src/configure.rs | 19 +-- src/context.rs | 10 +- src/download.rs | 19 +-- src/imap.rs | 145 ++++++---------- src/imap/idle.rs | 90 +++------- src/imap/scan_folders.rs | 14 +- src/scheduler.rs | 346 ++++++++++++++++++--------------------- 7 files changed, 254 insertions(+), 389 deletions(-) diff --git a/src/configure.rs b/src/configure.rs index c0b953399..60a44b299 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -25,7 +25,7 @@ use tokio::task; use crate::config::{self, Config}; use crate::contact::addr_cmp; use crate::context::Context; -use crate::imap::Imap; +use crate::imap::{session::Session as ImapSession, Imap}; use crate::log::LogExt; use crate::login_param::{CertificateChecks, LoginParam, ServerLoginParam}; use crate::message::{Message, Viewtype}; @@ -395,7 +395,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { // Configure IMAP - let mut imap: Option = None; + let mut imap: Option<(Imap, ImapSession)> = None; let imap_servers: Vec<&ServerParams> = servers .iter() .filter(|params| params.protocol == Protocol::Imap) @@ -433,7 +433,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { 600 + (800 - 600) * (1 + imap_server_index) / imap_servers_count ); } - let mut imap = match imap { + let (mut imap, mut imap_session) = match imap { Some(imap) => imap, None => bail!(nicer_configuration_error(ctx, errors).await), }; @@ -454,11 +454,10 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { let create_mvbox = ctx.should_watch_mvbox().await?; - imap.configure_folders(ctx, create_mvbox).await?; + imap.configure_folders(ctx, &mut imap_session, create_mvbox) + .await?; - imap.session - .as_mut() - .context("no IMAP connection established")? + imap_session .select_with_uidvalidity(ctx, "INBOX") .await .context("could not read INBOX status")?; @@ -579,7 +578,7 @@ async fn try_imap_one_param( socks5_config: &Option, addr: &str, provider_strict_tls: bool, -) -> Result { +) -> Result<(Imap, ImapSession), ConfigurationError> { let inf = format!( "imap: {}@{}:{} security={} certificate_checks={} oauth2={} socks5_config={}", param.user, @@ -617,9 +616,9 @@ async fn try_imap_one_param( msg: format!("{err:#}"), }) } - Ok(()) => { + Ok(session) => { info!(context, "success: {}", inf); - Ok(imap) + Ok((imap, session)) } } } diff --git a/src/context.rs b/src/context.rs index c9c90b2cf..d903430d6 100644 --- a/src/context.rs +++ b/src/context.rs @@ -461,13 +461,13 @@ impl Context { // connection let mut connection = Imap::new_configured(self, channel::bounded(1).1).await?; - connection.prepare(self).await?; + let mut session = connection.prepare(self).await?; // fetch imap folders for folder_meaning in [FolderMeaning::Inbox, FolderMeaning::Mvbox] { let (_, watch_folder) = convert_folder_meaning(self, folder_meaning).await?; connection - .fetch_move_delete(self, &watch_folder, folder_meaning) + .fetch_move_delete(self, &mut session, &watch_folder, folder_meaning) .await?; } @@ -484,10 +484,8 @@ impl Context { }; if quota_needs_update { - if let Some(session) = connection.session.as_mut() { - if let Err(err) = self.update_recent_quota(session).await { - warn!(self, "Failed to update quota: {err:#}."); - } + if let Err(err) = self.update_recent_quota(&mut session).await { + warn!(self, "Failed to update quota: {err:#}."); } } diff --git a/src/download.rs b/src/download.rs index c8bc01839..e3770477d 100644 --- a/src/download.rs +++ b/src/download.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::config::Config; use crate::context::Context; -use crate::imap::{Imap, ImapActionResult}; +use crate::imap::{session::Session, ImapActionResult}; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::{MimeMessage, Part}; use crate::tools::time; @@ -129,9 +129,11 @@ impl Message { /// Actually download a message partially downloaded before. /// /// Most messages are downloaded automatically on fetch instead. -pub(crate) async fn download_msg(context: &Context, msg_id: MsgId, imap: &mut Imap) -> Result<()> { - imap.prepare(context).await?; - +pub(crate) async fn download_msg( + context: &Context, + msg_id: MsgId, + session: &mut Session, +) -> Result<()> { let msg = Message::load_from_db(context, msg_id).await?; let row = context .sql @@ -152,7 +154,7 @@ pub(crate) async fn download_msg(context: &Context, msg_id: MsgId, imap: &mut Im return Err(anyhow!("Call download_full() again to try over.")); }; - match imap + match session .fetch_single_msg( context, &server_folder, @@ -169,7 +171,7 @@ pub(crate) async fn download_msg(context: &Context, msg_id: MsgId, imap: &mut Im } } -impl Imap { +impl Session { /// Download a single message and pipe it to receive_imf(). /// /// receive_imf() is not directly aware that this is a result of a call to download_msg(), @@ -194,10 +196,7 @@ impl Imap { let mut uid_message_ids: BTreeMap = BTreeMap::new(); uid_message_ids.insert(uid, rfc724_mid); - let Some(session) = self.session.as_mut() else { - return ImapActionResult::Failed; - }; - let (last_uid, _received) = match session + let (last_uid, _received) = match self .fetch_many_msgs( context, folder, diff --git a/src/imap.rs b/src/imap.rs index 19bce5748..21a80c0f1 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -69,10 +69,9 @@ const BODY_FULL: &str = "(FLAGS BODY.PEEK[])"; const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])"; #[derive(Debug)] -pub struct Imap { +pub(crate) struct Imap { pub(crate) idle_interrupt_receiver: Receiver<()>, config: ImapConfig, - pub(crate) session: Option, login_failed_once: bool, pub(crate) connectivity: ConnectivityStore, @@ -255,7 +254,6 @@ impl Imap { let imap = Imap { idle_interrupt_receiver, config, - session: None, login_failed_once: false, connectivity: Default::default(), // 1 connection per minute + a burst of 2. @@ -298,15 +296,11 @@ impl Imap { /// Calling this function is not enough to perform IMAP operations. Use [`Imap::prepare`] /// instead if you are going to actually use connection rather than trying connection /// parameters. - pub async fn connect(&mut self, context: &Context) -> Result<()> { + pub(crate) async fn connect(&mut self, context: &Context) -> Result { if self.config.lp.server.is_empty() { bail!("IMAP operation attempted while it is torn down"); } - if self.session.is_some() { - return Ok(()); - } - let ratelimit_duration = self.ratelimit.read().await.until_can_send(); if !ratelimit_duration.is_zero() { warn!( @@ -410,7 +404,6 @@ impl Imap { let mut lock = context.server_id.write().await; *lock = session.capabilities.server_id.clone(); - self.session = Some(session); self.login_failed_once = false; context.emit_event(EventType::ImapConnected(format!( "IMAP-LOGIN as {}", @@ -418,7 +411,7 @@ impl Imap { ))); self.connectivity.set_connected(context).await; info!(context, "Successfully logged into IMAP server"); - Ok(()) + Ok(session) } Err(err) => { @@ -461,22 +454,26 @@ impl Imap { /// /// Ensure that IMAP client is connected, folders are created and IMAP capabilities are /// determined. - pub async fn prepare(&mut self, context: &Context) -> Result<()> { - if let Err(err) = self.connect(context).await { - self.connectivity.set_err(context, &err).await; - return Err(err); + pub(crate) async fn prepare(&mut self, context: &Context) -> Result { + let mut session = match self.connect(context).await { + Ok(session) => session, + Err(err) => { + self.connectivity.set_err(context, &err).await; + return Err(err); + } + }; + + let folders_configured = context + .sql + .get_raw_config_int(constants::DC_FOLDERS_CONFIGURED_KEY) + .await?; + if folders_configured.unwrap_or_default() < constants::DC_FOLDERS_CONFIGURED_VERSION { + let create_mvbox = true; + self.configure_folders(context, &mut session, create_mvbox) + .await?; } - self.ensure_configured_folders(context, true).await?; - Ok(()) - } - - /// Drops the session without disconnecting properly. - /// Useful in case of an IMAP error, when it's unclear if it's in a correct state and it's - /// easier to setup a new connection. - pub fn trigger_reconnect(&mut self, context: &Context) { - info!(context, "Dropping an IMAP connection."); - self.session = None; + Ok(session) } /// FETCH-MOVE-DELETE iteration. @@ -486,6 +483,7 @@ impl Imap { pub async fn fetch_move_delete( &mut self, context: &Context, + session: &mut Session, watch_folder: &str, folder_meaning: FolderMeaning, ) -> Result<()> { @@ -493,10 +491,9 @@ impl Imap { // probably shutdown bail!("IMAP operation attempted while it is torn down"); } - self.prepare(context).await?; let msgs_fetched = self - .fetch_new_messages(context, watch_folder, folder_meaning, false) + .fetch_new_messages(context, session, watch_folder, folder_meaning, false) .await .context("fetch_new_messages")?; if msgs_fetched && context.get_config_delete_device_after().await?.is_some() { @@ -507,10 +504,6 @@ impl Imap { context.scheduler.interrupt_ephemeral_task().await; } - let session = self - .session - .as_mut() - .context("no IMAP connection established")?; session .move_delete_messages(context, watch_folder) .await @@ -525,6 +518,7 @@ impl Imap { pub(crate) async fn fetch_new_messages( &mut self, context: &Context, + session: &mut Session, folder: &str, folder_meaning: FolderMeaning, fetch_existing_msgs: bool, @@ -534,7 +528,6 @@ impl Imap { return Ok(false); } - let session = self.session.as_mut().context("No IMAP session")?; let new_emails = session .select_with_uidvalidity(context, folder) .await @@ -694,10 +687,7 @@ impl Imap { uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1)); for (uid, fp) in uids_fetch { if fp != fetch_partially { - let (largest_uid_fetched_in_batch, received_msgs_in_batch) = self - .session - .as_mut() - .context("No IMAP session")? + let (largest_uid_fetched_in_batch, received_msgs_in_batch) = session .fetch_many_msgs( context, folder, @@ -724,10 +714,7 @@ impl Imap { // Largest known UID is normally less than UIDNEXT, // but a message may have arrived between determining UIDNEXT // and executing the FETCH command. - let mailbox_uid_next = self - .session - .as_ref() - .context("No IMAP session")? + let mailbox_uid_next = session .selected_mailbox .as_ref() .with_context(|| format!("Expected {folder:?} to be selected"))? @@ -762,13 +749,15 @@ impl Imap { /// /// Then, Fetch the last messages DC_FETCH_EXISTING_MSGS_COUNT emails from the server /// and show them in the chat list. - pub(crate) async fn fetch_existing_msgs(&mut self, context: &Context) -> Result<()> { + pub(crate) async fn fetch_existing_msgs( + &mut self, + context: &Context, + session: &mut Session, + ) -> Result<()> { if context.get_config_bool(Config::Bot).await? { return Ok(()); // Bots don't want those messages } - self.prepare(context).await.context("could not connect")?; - let session = self.session.as_mut().context("No IMAP session")?; add_all_recipients_as_contacts(context, session, Config::ConfiguredSentboxFolder) .await .context("failed to get recipients from the sentbox")?; @@ -794,7 +783,7 @@ impl Imap { context, "Fetching existing messages from folder {folder:?}." ); - self.fetch_new_messages(context, &folder, meaning, true) + self.fetch_new_messages(context, session, &folder, meaning, true) .await .context("could not fetch existing messages")?; } @@ -1493,9 +1482,7 @@ impl Session { } Ok(()) } -} -impl Imap { pub(crate) async fn prepare_imap_operation_on_msg( &mut self, context: &Context, @@ -1505,24 +1492,8 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::RetryLater); } - if let Err(err) = self.prepare(context).await { - warn!(context, "prepare_imap_op failed: {}", err); - return Some(ImapActionResult::RetryLater); - } - let session = match self - .session - .as_mut() - .context("no IMAP connection established") - { - Err(err) => { - error!(context, "Failed to prepare IMAP operation: {:#}", err); - return Some(ImapActionResult::Failed); - } - Ok(session) => session, - }; - - match session.select_folder(context, Some(folder)).await { + match self.select_folder(context, Some(folder)).await { Ok(_) => None, Err(select_folder::Error::ConnectionLost) => { warn!(context, "Lost imap connection"); @@ -1539,25 +1510,6 @@ impl Imap { } } - pub async fn ensure_configured_folders( - &mut self, - context: &Context, - create_mvbox: bool, - ) -> Result<()> { - let folders_configured = context - .sql - .get_raw_config_int(constants::DC_FOLDERS_CONFIGURED_KEY) - .await?; - if folders_configured.unwrap_or_default() >= constants::DC_FOLDERS_CONFIGURED_VERSION { - return Ok(()); - } - if let Err(err) = self.connect(context).await { - self.connectivity.set_err(context, &err).await; - return Err(err); - } - self.configure_folders(context, create_mvbox).await - } - /// Attempts to configure mvbox. /// /// Tries to find any folder in the given list of `folders`. If none is found, tries to create @@ -1572,27 +1524,22 @@ impl Imap { folders: &[&'a str], create_mvbox: bool, ) -> Result> { - let session = self - .session - .as_mut() - .context("no IMAP connection established")?; - // Close currently selected folder if needed. // We are going to select folders using low-level EXAMINE operations below. - session.select_folder(context, None).await?; + self.select_folder(context, None).await?; for folder in folders { info!(context, "Looking for MVBOX-folder \"{}\"...", &folder); - let res = session.examine(&folder).await; + let res = self.examine(&folder).await; if res.is_ok() { info!( context, "MVBOX-folder {:?} successfully selected, using it.", &folder ); - session.close().await?; + self.close().await?; // Before moving emails to the mvbox we need to remember its UIDVALIDITY, otherwise // emails moved before that wouldn't be fetched but considered "old" instead. - session.select_with_uidvalidity(context, folder).await?; + self.select_with_uidvalidity(context, folder).await?; return Ok(Some(folder)); } } @@ -1603,7 +1550,7 @@ impl Imap { let Some(folder) = folders.first() else { return Ok(None); }; - match session.select_with_uidvalidity(context, folder).await { + match self.select_with_uidvalidity(context, folder).await { Ok(_) => { info!(context, "MVBOX-folder {} created.", folder); return Ok(Some(folder)); @@ -1614,13 +1561,15 @@ impl Imap { } Ok(None) } +} - pub async fn configure_folders(&mut self, context: &Context, create_mvbox: bool) -> Result<()> { - let session = self - .session - .as_mut() - .context("no IMAP connection established")?; - +impl Imap { + pub(crate) async fn configure_folders( + &mut self, + context: &Context, + session: &mut Session, + create_mvbox: bool, + ) -> Result<()> { let mut folders = session .list(Some(""), Some("*")) .await @@ -1657,7 +1606,7 @@ impl Imap { info!(context, "Using \"{}\" as folder-delimiter.", delimiter); let fallback_folder = format!("INBOX{delimiter}DeltaChat"); - let mvbox_folder = self + let mvbox_folder = session .configure_mvbox(context, &["DeltaChat", &fallback_folder], create_mvbox) .await .context("failed to configure mvbox")?; diff --git a/src/imap/idle.rs b/src/imap/idle.rs index f6b1052da..0f78faab7 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -4,13 +4,12 @@ use anyhow::{bail, Context as _, Result}; use async_channel::Receiver; use async_imap::extensions::idle::IdleResponse; use futures_lite::FutureExt; +use tokio::time::timeout; use super::session::Session; use super::Imap; -use crate::config::Config; use crate::context::Context; use crate::imap::{client::IMAP_TIMEOUT, FolderMeaning}; -use crate::log::LogExt; use crate::tools::{self, time_elapsed}; /// Timeout after which IDLE is finished @@ -98,90 +97,38 @@ impl Session { } impl Imap { + /// Idle using polling. pub(crate) async fn fake_idle( &mut self, context: &Context, + session: &mut Session, watch_folder: String, folder_meaning: FolderMeaning, - ) { - // Idle using polling. This is also needed if we're not yet configured - - // in this case, we're waiting for a configure job (and an interrupt). - + ) -> Result<()> { let fake_idle_start_time = tools::Time::now(); - // Do not poll, just wait for an interrupt when no folder is passed in. info!(context, "IMAP-fake-IDLEing folder={:?}", watch_folder); - const TIMEOUT_INIT_MS: u64 = 60_000; - let mut timeout_ms: u64 = TIMEOUT_INIT_MS; - enum Event { - Tick, - Interrupt, - } - // loop until we are interrupted or if we fetched something + // Loop until we are interrupted or until we fetch something. loop { - use futures::future::FutureExt; - use rand::Rng; - - let mut interval = tokio::time::interval(Duration::from_millis(timeout_ms)); - timeout_ms = timeout_ms - .saturating_add(rand::thread_rng().gen_range((timeout_ms / 2)..=timeout_ms)); - interval.tick().await; // The first tick completes immediately. - match interval - .tick() - .map(|_| Event::Tick) - .race( - self.idle_interrupt_receiver - .recv() - .map(|_| Event::Interrupt), - ) - .await - { - Event::Tick => { - // try to connect with proper login params - // (setup_handle_if_needed might not know about them if we - // never successfully connected) - if let Err(err) = self.prepare(context).await { - warn!(context, "fake_idle: could not connect: {}", err); - continue; - } - if let Some(session) = &self.session { - if session.can_idle() - && !context - .get_config_bool(Config::DisableIdle) - .await - .context("Failed to get disable_idle config") - .log_err(context) - .unwrap_or_default() - { - // we only fake-idled because network was gone during IDLE, probably - break; - } - } - info!(context, "fake_idle is connected"); - // we are connected, let's see if fetching messages results + match timeout(Duration::from_secs(60), self.idle_interrupt_receiver.recv()).await { + Err(_) => { + // Let's see if fetching messages results // in anything. If so, we behave as if IDLE had data but // will have already fetched the messages so perform_*_fetch // will not find any new. - match self - .fetch_new_messages(context, &watch_folder, folder_meaning, false) - .await - { - Ok(res) => { - info!(context, "fetch_new_messages returned {:?}", res); - timeout_ms = TIMEOUT_INIT_MS; - if res { - break; - } - } - Err(err) => { - error!(context, "could not fetch from folder: {:#}", err); - self.trigger_reconnect(context); - } + let res = self + .fetch_new_messages(context, session, &watch_folder, folder_meaning, false) + .await?; + + info!(context, "fetch_new_messages returned {:?}", res); + + if res { + break; } } - Event::Interrupt => { - info!(context, "Fake IDLE interrupted"); + Ok(_) => { + info!(context, "Fake IDLE interrupted."); break; } } @@ -192,5 +139,6 @@ impl Imap { "IMAP-fake-IDLE done after {:.4}s", time_elapsed(&fake_idle_start_time).as_millis() as f64 / 1000., ); + Ok(()) } } diff --git a/src/imap/scan_folders.rs b/src/imap/scan_folders.rs index d5f76453b..1ad74e84e 100644 --- a/src/imap/scan_folders.rs +++ b/src/imap/scan_folders.rs @@ -4,14 +4,18 @@ use anyhow::{Context as _, Result}; use super::{get_folder_meaning_by_attrs, get_folder_meaning_by_name}; use crate::config::Config; -use crate::imap::Imap; +use crate::imap::{session::Session, Imap}; use crate::log::LogExt; use crate::tools::{self, time_elapsed}; use crate::{context::Context, imap::FolderMeaning}; impl Imap { /// Returns true if folders were scanned, false if scanning was postponed. - pub(crate) async fn scan_folders(&mut self, context: &Context) -> Result { + pub(crate) async fn scan_folders( + &mut self, + context: &Context, + session: &mut Session, + ) -> Result { // First of all, debounce to once per minute: let mut last_scan = context.last_full_folder_scan.lock().await; if let Some(last_scan) = *last_scan { @@ -26,8 +30,6 @@ impl Imap { } info!(context, "Starting full folder scan"); - self.prepare(context).await?; - let session = self.session.as_mut().context("No IMAP session")?; let folders = session.list_folders().await?; let watched_folders = get_watched_folders(context).await?; @@ -64,18 +66,16 @@ impl Imap { && folder_meaning != FolderMeaning::Drafts && folder_meaning != FolderMeaning::Trash { - let session = self.session.as_mut().context("no session")?; // Drain leftover unsolicited EXISTS messages session.server_sent_unsolicited_exists(context)?; loop { - self.fetch_move_delete(context, folder.name(), folder_meaning) + self.fetch_move_delete(context, session, folder.name(), folder_meaning) .await .context("Can't fetch new msgs in scanned folder") .log_err(context) .ok(); - let session = self.session.as_mut().context("no session")?; // If the server sent an unsocicited EXISTS during the fetch, we need to fetch again if !session.server_sent_unsolicited_exists(context)? { break; diff --git a/src/scheduler.rs b/src/scheduler.rs index 96047e36e..c4a065af8 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -19,7 +19,7 @@ use crate::context::Context; use crate::download::{download_msg, DownloadState}; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::events::EventType; -use crate::imap::{FolderMeaning, Imap}; +use crate::imap::{session::Session, FolderMeaning, Imap}; use crate::location; use crate::log::LogExt; use crate::message::MsgId; @@ -330,7 +330,7 @@ pub(crate) struct Scheduler { recently_seen_loop: RecentlySeenLoop, } -async fn download_msgs(context: &Context, imap: &mut Imap) -> Result<()> { +async fn download_msgs(context: &Context, session: &mut Session) -> Result<()> { let msg_ids = context .sql .query_map( @@ -349,7 +349,7 @@ async fn download_msgs(context: &Context, imap: &mut Imap) -> Result<()> { .await?; for msg_id in msg_ids { - if let Err(err) = download_msg(context, msg_id, imap).await { + if let Err(err) = download_msg(context, msg_id, session).await { warn!(context, "Failed to download message {msg_id}: {:#}.", err); // Update download state to failure @@ -392,94 +392,26 @@ async fn inbox_loop( return; }; + let mut old_session: Option = None; loop { - if let Err(err) = connection.prepare(&ctx).await { - warn!(ctx, "Failed to prepare connection: {:#}.", err); - } - - { - // Update quota no more than once a minute. - let quota_needs_update = { - let quota = ctx.quota.read().await; - quota - .as_ref() - .filter(|quota| time_elapsed("a.modified) > Duration::from_secs(60)) - .is_none() - }; - - if quota_needs_update { - if let Some(session) = connection.session.as_mut() { - if let Err(err) = ctx.update_recent_quota(session).await { - warn!(ctx, "Failed to update quota: {:#}.", err); - } + let session = if let Some(session) = old_session.take() { + session + } else { + match connection.prepare(&ctx).await { + Err(err) => { + warn!(ctx, "Failed to prepare connection: {:#}.", err); + continue; } - } - } - - let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed); - if resync_requested { - if let Some(session) = connection.session.as_mut() { - if let Err(err) = session.resync_folders(&ctx).await { - warn!(ctx, "Failed to resync folders: {:#}.", err); - ctx.resync_request.store(true, Ordering::Relaxed); - } - } - } - - maybe_add_time_based_warnings(&ctx).await; - - match ctx.get_config_i64(Config::LastHousekeeping).await { - Ok(last_housekeeping_time) => { - let next_housekeeping_time = - last_housekeeping_time.saturating_add(60 * 60 * 24); - if next_housekeeping_time <= time() { - sql::housekeeping(&ctx).await.log_err(&ctx).ok(); - } - } - Err(err) => { - warn!(ctx, "Failed to get last housekeeping time: {}", err); + Ok(session) => session, } }; - match ctx.get_config_bool(Config::FetchedExistingMsgs).await { - Ok(fetched_existing_msgs) => { - if !fetched_existing_msgs { - // Consider it done even if we fail. - // - // This operation is not critical enough to retry, - // especially if the error is persistent. - if let Err(err) = ctx - .set_config_internal( - Config::FetchedExistingMsgs, - config::from_bool(true), - ) - .await - { - warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err); - } - - if let Err(err) = connection.fetch_existing_msgs(&ctx).await { - warn!(ctx, "Failed to fetch existing messages: {:#}", err); - connection.trigger_reconnect(&ctx); - } - } - } - Err(err) => { - warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err); + match inbox_fetch_idle(&ctx, &mut connection, session).await { + Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"), + Ok(session) => { + old_session = Some(session); } } - - if let Err(err) = download_msgs(&ctx, &mut connection).await { - warn!(ctx, "Failed to download messages: {:#}", err); - } - - if let Some(session) = connection.session.as_mut() { - if let Err(err) = session.fetch_metadata(&ctx).await { - warn!(ctx, "Failed to fetch metadata: {err:#}."); - } - } - - fetch_idle(&ctx, &mut connection, FolderMeaning::Inbox).await; } }; @@ -525,82 +457,123 @@ pub async fn convert_folder_meaning( Ok((folder_config, watch_folder)) } -/// Implement a single iteration of IMAP loop. -/// -/// This function performs all IMAP operations on a single folder, selecting it if necessary and -/// handling all the errors. In case of an error, it is logged, but not propagated upwards. If -/// critical operation fails such as fetching new messages fails, connection is reset via -/// `trigger_reconnect`, so a fresh one can be opened. -async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: FolderMeaning) { - let create_mvbox = true; - if let Err(err) = connection - .ensure_configured_folders(ctx, create_mvbox) - .await - { - warn!( - ctx, - "Cannot watch {folder_meaning}, ensure_configured_folders() failed: {:#}", err, - ); - connection.idle_interrupt_receiver.recv().await.ok(); - return; +async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) -> Result { + // Update quota no more than once a minute. + let quota_needs_update = { + let quota = ctx.quota.read().await; + quota + .as_ref() + .filter(|quota| time_elapsed("a.modified) > Duration::from_secs(60)) + .is_none() + }; + if quota_needs_update { + if let Err(err) = ctx.update_recent_quota(&mut session).await { + warn!(ctx, "Failed to update quota: {:#}.", err); + } } - let (folder_config, watch_folder) = match convert_folder_meaning(ctx, folder_meaning).await { - Ok(meaning) => meaning, - Err(error) => { - // Warning instead of error because the folder may not be configured. - // For example, this happens if the server does not have Sent folder - // but watching Sent folder is enabled. - warn!(ctx, "Error converting IMAP Folder name: {:?}", error); - connection.connectivity.set_not_configured(ctx).await; - connection.idle_interrupt_receiver.recv().await.ok(); - return; + + let resync_requested = ctx.resync_request.swap(false, Ordering::Relaxed); + if resync_requested { + if let Err(err) = session.resync_folders(ctx).await { + warn!(ctx, "Failed to resync folders: {:#}.", err); + ctx.resync_request.store(true, Ordering::Relaxed); + } + } + + maybe_add_time_based_warnings(ctx).await; + + match ctx.get_config_i64(Config::LastHousekeeping).await { + Ok(last_housekeeping_time) => { + let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24); + if next_housekeeping_time <= time() { + sql::housekeeping(ctx).await.log_err(ctx).ok(); + } + } + Err(err) => { + warn!(ctx, "Failed to get last housekeeping time: {}", err); } }; - // connect and fake idle if unable to connect - if let Err(err) = connection - .prepare(ctx) - .await - .context("prepare IMAP connection") - { - warn!(ctx, "{:#}", err); - connection.trigger_reconnect(ctx); - return; - } + match ctx.get_config_bool(Config::FetchedExistingMsgs).await { + Ok(fetched_existing_msgs) => { + if !fetched_existing_msgs { + // Consider it done even if we fail. + // + // This operation is not critical enough to retry, + // especially if the error is persistent. + if let Err(err) = ctx + .set_config_internal(Config::FetchedExistingMsgs, config::from_bool(true)) + .await + { + warn!(ctx, "Can't set Config::FetchedExistingMsgs: {:#}", err); + } - if folder_config == Config::ConfiguredInboxFolder { - if let Some(session) = connection.session.as_mut() { - session - .store_seen_flags_on_imap(ctx) - .await - .context("store_seen_flags_on_imap") - .log_err(ctx) - .ok(); - } else { - warn!(ctx, "No session even though we just prepared it"); + if let Err(err) = imap.fetch_existing_msgs(ctx, &mut session).await { + warn!(ctx, "Failed to fetch existing messages: {:#}", err); + } + } + } + Err(err) => { + warn!(ctx, "Can't get Config::FetchedExistingMsgs: {:#}", err); } } - // Fetch the watched folder. - if let Err(err) = connection - .fetch_move_delete(ctx, &watch_folder, folder_meaning) + download_msgs(ctx, &mut session) .await - .context("fetch_move_delete") - { - connection.trigger_reconnect(ctx); - warn!(ctx, "{:#}", err); - return; + .context("Failed to download messages")?; + session + .fetch_metadata(ctx) + .await + .context("Failed to fetch metadata")?; + + let session = fetch_idle(ctx, imap, session, FolderMeaning::Inbox).await?; + Ok(session) +} + +/// Implement a single iteration of IMAP loop. +/// +/// This function performs all IMAP operations on a single folder, selecting it if necessary and +/// handling all the errors. In case of an error, an error is returned and connection is dropped, +/// otherwise connection is returned. +async fn fetch_idle( + ctx: &Context, + connection: &mut Imap, + mut session: Session, + folder_meaning: FolderMeaning, +) -> Result { + let (folder_config, watch_folder) = match convert_folder_meaning(ctx, folder_meaning).await { + Ok(meaning) => meaning, + Err(err) => { + // Warning instead of error because the folder may not be configured. + // For example, this happens if the server does not have Sent folder + // but watching Sent folder is enabled. + warn!(ctx, "Error converting IMAP Folder name: {err:#}."); + connection.connectivity.set_not_configured(ctx).await; + connection.idle_interrupt_receiver.recv().await.ok(); + return Err(err); + } + }; + + if folder_config == Config::ConfiguredInboxFolder { + session + .store_seen_flags_on_imap(ctx) + .await + .context("store_seen_flags_on_imap")?; } + // Fetch the watched folder. + connection + .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning) + .await + .context("fetch_move_delete")?; + // Mark expired messages for deletion. Marked messages will be deleted from the server // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not // called right before `fetch_move_delete` because it is not well optimized and would // otherwise slow down message fetching. delete_expired_imap_messages(ctx) .await - .context("delete_expired_imap_messages") - .log_err(ctx) - .ok(); + .context("delete_expired_imap_messages")?; // Scan additional folders only after finishing fetching the watched folder. // @@ -608,7 +581,11 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder // be able to scan all folders before time is up if there are many of them. if folder_config == Config::ConfiguredInboxFolder { // Only scan on the Inbox thread in order to prevent parallel scans, which might lead to duplicate messages - match connection.scan_folders(ctx).await.context("scan_folders") { + match connection + .scan_folders(ctx, &mut session) + .await + .context("scan_folders") + { Err(err) => { // Don't reconnect, if there is a problem with the connection we will realize this when IDLEing // but maybe just one folder can't be selected or something @@ -621,42 +598,26 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder // In most cases this will select the watched folder and return because there are // no new messages. We want to select the watched folder anyway before going IDLE // there, so this does not take additional protocol round-trip. - if let Err(err) = connection - .fetch_move_delete(ctx, &watch_folder, folder_meaning) + connection + .fetch_move_delete(ctx, &mut session, &watch_folder, folder_meaning) .await - .context("fetch_move_delete after scan_folders") - { - connection.trigger_reconnect(ctx); - warn!(ctx, "{:#}", err); - return; - } + .context("fetch_move_delete after scan_folders")?; } Ok(false) => {} } } // Synchronize Seen flags. - if let Some(session) = connection.session.as_mut() { - session - .sync_seen_flags(ctx, &watch_folder) - .await - .context("sync_seen_flags") - .log_err(ctx) - .ok(); - } else { - warn!(ctx, "No IMAP session, skipping flag synchronization."); - } + session + .sync_seen_flags(ctx, &watch_folder) + .await + .context("sync_seen_flags") + .log_err(ctx) + .ok(); connection.connectivity.set_idle(ctx).await; ctx.emit_event(EventType::ImapInboxIdle); - let Some(session) = connection.session.take() else { - warn!(ctx, "No IMAP session, going to fake idle."); - connection - .fake_idle(ctx, watch_folder, folder_meaning) - .await; - return; - }; if !session.can_idle() { info!( @@ -664,9 +625,9 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder "IMAP session does not support IDLE, going to fake idle." ); connection - .fake_idle(ctx, watch_folder, folder_meaning) - .await; - return; + .fake_idle(ctx, &mut session, watch_folder, folder_meaning) + .await?; + return Ok(session); } if ctx @@ -678,29 +639,22 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_meaning: Folder { info!(ctx, "IMAP IDLE is disabled, going to fake idle."); connection - .fake_idle(ctx, watch_folder, folder_meaning) - .await; - return; + .fake_idle(ctx, &mut session, watch_folder, folder_meaning) + .await?; + return Ok(session); } info!(ctx, "IMAP session supports IDLE, using it."); - match session + let session = session .idle( ctx, connection.idle_interrupt_receiver.clone(), &watch_folder, ) .await - .context("idle") - { - Ok(session) => { - connection.session = Some(session); - } - Err(err) => { - connection.trigger_reconnect(ctx); - warn!(ctx, "{:#}", err); - } - } + .context("idle")?; + + Ok(session) } async fn simple_imap_loop( @@ -726,8 +680,26 @@ async fn simple_imap_loop( return; } + let mut old_session: Option = None; loop { - fetch_idle(&ctx, &mut connection, folder_meaning).await; + let session = if let Some(session) = old_session.take() { + session + } else { + match connection.prepare(&ctx).await { + Err(err) => { + warn!(ctx, "Failed to prepare connection: {:#}.", err); + continue; + } + Ok(session) => session, + } + }; + + match fetch_idle(&ctx, &mut connection, session, folder_meaning).await { + Err(err) => warn!(ctx, "Failed fetch_idle: {err:#}"), + Ok(session) => { + old_session = Some(session); + } + } } };