From f11fceb63a7d001fcca0f8bf6b10592e4be921fe Mon Sep 17 00:00:00 2001 From: link2xt Date: Mon, 5 Dec 2022 10:39:59 +0000 Subject: [PATCH] Move IMAP session state into imap::session::Session IMAP capabilities and selected folder are IMAP session, not IMAP client property. Moving most operations into IMAP session structure removes the need to constantly check whether IMAP session exists and reduces number of invalid states, e.g. when a folder is selected but there is no connection. Capabilities are determined immediately after logging in, so there is no need for `capabilities_determined` flag anymore. Capabilities of the server are always known if there is a session. `should_reconnect` flag and `disconnect()` function are removed: we drop the session on error. Even though RFC 3501 says that a client SHOULD NOT close the connection without a LOGOUT, it is more reliable to always just drop the connection, especially after an error. --- CHANGELOG.md | 4 + src/configure.rs | 3 - src/imap.rs | 280 ++++++++++++-------------------------- src/imap/capabilities.rs | 26 ++++ src/imap/client.rs | 43 +++++- src/imap/idle.rs | 175 ++++++++++++------------ src/imap/scan_folders.rs | 6 +- src/imap/select_folder.rs | 112 ++++++--------- src/imap/session.rs | 43 +++++- src/quota.rs | 10 +- src/scheduler.rs | 81 +++++++---- 11 files changed, 385 insertions(+), 398 deletions(-) create mode 100644 src/imap/capabilities.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f3d360afa..930e0634b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ - Log the reason when the message cannot be sent to the chat #3810 - Add IMAP server ID line to the context info only when it is known #3814 - Remove autogenerated typescript files #3815 +- Move functions that require an IMAP session from `Imap` to `Session` + to reduce the number of code paths where IMAP session may not exist. + Drop connection on error instead of trying to disconnect, + potentially preventing IMAP task from getting stuck. #3812 ### API-Changes - Add Python API to send reactions #3762 diff --git a/src/configure.rs b/src/configure.rs index 42d35cdc8..63a4dec1a 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -442,9 +442,6 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { let create_mvbox = ctx.should_watch_mvbox().await?; - // Send client ID as soon as possible before doing anything else. - imap.determine_capabilities(ctx).await?; - imap.configure_folders(ctx, create_mvbox).await?; imap.select_with_uidvalidity(ctx, "INBOX") diff --git a/src/imap.rs b/src/imap.rs index 139853b86..2f22fe4a0 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -13,9 +13,7 @@ use std::{ use anyhow::{bail, format_err, Context as _, Result}; use async_channel::Receiver; -use async_imap::types::{ - Fetch, Flag, Mailbox, Name, NameAttribute, Quota, QuotaRoot, UnsolicitedResponse, -}; +use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse}; use futures::StreamExt; use num_traits::FromPrimitive; @@ -45,11 +43,12 @@ use crate::sql; use crate::stock_str; use crate::tools::create_id; +pub(crate) mod capabilities; mod client; mod idle; pub mod scan_folders; pub mod select_folder; -mod session; +pub(crate) mod session; use client::Client; use mailparse::SingleInfo; @@ -90,16 +89,11 @@ const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])"; #[derive(Debug)] pub struct Imap { - idle_interrupt: Receiver, + pub(crate) idle_interrupt_receiver: Receiver, config: ImapConfig, - session: Option, - should_reconnect: bool, + pub(crate) session: Option, login_failed_once: bool, - /// True if CAPABILITY command was run successfully once and config.can_* contain correct - /// values. - capabilities_determined: bool, - pub(crate) connectivity: ConnectivityStore, } @@ -156,23 +150,6 @@ struct ImapConfig { pub lp: ServerLoginParam, pub socks5_config: Option, pub strict_tls: bool, - pub selected_folder: Option, - pub selected_mailbox: Option, - pub selected_folder_needs_expunge: bool, - - pub can_idle: bool, - - /// True if the server has MOVE capability as defined in - /// - pub can_move: bool, - - /// True if the server has QUOTA capability as defined in - /// - pub can_check_quota: bool, - - /// True if the server has CONDSTORE capability as defined in - /// - pub can_condstore: bool, } struct UidGrouper> { @@ -245,7 +222,7 @@ impl Imap { socks5_config: Option, addr: &str, provider_strict_tls: bool, - idle_interrupt: Receiver, + idle_interrupt_receiver: Receiver, ) -> Result { if lp.server.is_empty() || lp.user.is_empty() || lp.password.is_empty() { bail!("Incomplete IMAP connection parameters"); @@ -262,23 +239,14 @@ impl Imap { lp: lp.clone(), socks5_config, strict_tls, - selected_folder: None, - selected_mailbox: None, - selected_folder_needs_expunge: false, - can_idle: false, - can_move: false, - can_check_quota: false, - can_condstore: false, }; let imap = Imap { - idle_interrupt, + idle_interrupt_receiver, config, session: None, - should_reconnect: false, login_failed_once: false, connectivity: Default::default(), - capabilities_determined: false, }; Ok(imap) @@ -287,7 +255,7 @@ impl Imap { /// Creates new disconnected IMAP client using configured parameters. pub async fn new_configured( context: &Context, - idle_interrupt: Receiver, + idle_interrupt_receiver: Receiver, ) -> Result { if !context.is_configured().await? { bail!("IMAP Connect without configured params"); @@ -305,7 +273,7 @@ impl Imap { .map_or(param.socks5_config.is_some(), |provider| { provider.strict_tls }), - idle_interrupt, + idle_interrupt_receiver, )?; Ok(imap) } @@ -322,10 +290,7 @@ impl Imap { bail!("IMAP operation attempted while it is torn down"); } - if self.should_reconnect() { - self.disconnect(context).await; - self.should_reconnect = false; - } else if self.session.is_some() { + if self.session.is_some() { return Ok(()); } @@ -404,11 +369,12 @@ impl Imap { client.login(imap_user, imap_pw).await }; - self.should_reconnect = false; - match login_res { Ok(session) => { - // needs to be set here to ensure it is set on reconnects. + // Store server ID in the context to display in account info. + let mut lock = context.server_id.write().await; + *lock = session.capabilities.server_id.clone(); + self.session = Some(session); self.login_failed_once = false; context.emit_event(EventType::ImapConnected(format!( @@ -446,41 +412,11 @@ impl Imap { self.login_failed_once = true; } - self.trigger_reconnect(context).await; Err(format_err!("{}\n\n{}", message, err)) } } } - /// Determine server capabilities if not done yet. - /// - /// If server supports ID capability, send our client ID. - pub(crate) async fn determine_capabilities(&mut self, context: &Context) -> Result<()> { - if self.capabilities_determined { - return Ok(()); - } - - let session = self.session.as_mut().context( - "Can't determine server capabilities because connection was not established", - )?; - let caps = session - .capabilities() - .await - .context("CAPABILITY command error")?; - if caps.has_str("ID") { - let server_id = session.id([("name", Some("Delta Chat"))]).await?; - info!(context, "Server ID: {:?}", server_id); - let mut lock = context.server_id.write().await; - *lock = server_id; - } - self.config.can_idle = caps.has_str("IDLE"); - self.config.can_move = caps.has_str("MOVE"); - self.config.can_check_quota = caps.has_str("QUOTA"); - self.config.can_condstore = caps.has_str("CONDSTORE"); - self.capabilities_determined = true; - Ok(()) - } - /// Prepare for IMAP operation. /// /// Ensure that IMAP client is connected, folders are created and IMAP capabilities are @@ -491,37 +427,16 @@ impl Imap { return Err(err); } - self.determine_capabilities(context).await?; self.ensure_configured_folders(context, true).await?; Ok(()) } - async fn disconnect(&mut self, context: &Context) { - info!(context, "disconnecting"); - - // Close folder if messages should be expunged - if let Err(err) = self.close_folder(context).await { - warn!(context, "failed to close folder: {:?}", err); - } - - // Logout from the server - if let Some(mut session) = self.session.take() { - if let Err(err) = session.logout().await { - warn!(context, "failed to logout: {:?}", err); - } - } - self.capabilities_determined = false; - self.config.selected_folder = None; - self.config.selected_mailbox = None; - } - - pub fn should_reconnect(&self) -> bool { - self.should_reconnect - } - - pub async fn trigger_reconnect(&mut self, context: &Context) { - self.connectivity.set_connecting(context).await; - self.should_reconnect = true; + /// Drops the session without disconnecting properly. + /// Useful in case of an IMAP error, when it's unclear if it's in a correct state and it's + /// easier to setup a new connection. + pub fn trigger_reconnect(&mut self, context: &Context) { + info!(context, "Dropping an IMAP connection."); + self.session = None; } /// FETCH-MOVE-DELETE iteration. @@ -552,7 +467,12 @@ impl Imap { context.interrupt_ephemeral_task().await; } - self.move_delete_messages(context, watch_folder) + let session = self + .session + .as_mut() + .context("no IMAP connection established")?; + session + .move_delete_messages(context, watch_folder) .await .context("move_delete_messages")?; @@ -573,13 +493,13 @@ impl Imap { // Collect pairs of UID and Message-ID. let mut msg_ids = BTreeMap::new(); - self.select_folder(context, Some(&folder)).await?; - let session = self .session .as_mut() .context("IMAP No connection established")?; + session.select_folder(context, Some(&folder)).await?; + let mut list = session .uid_fetch("1:*", RFC724MID_UID) .await @@ -638,10 +558,9 @@ impl Imap { context: &Context, folder: &str, ) -> Result { - let newly_selected = self.select_or_create_folder(context, folder).await?; - - let mailbox = self - .config + let session = self.session.as_mut().context("no session")?; + let newly_selected = session.select_or_create_folder(context, folder).await?; + let mailbox = session .selected_mailbox .as_mut() .with_context(|| format!("No mailbox selected, folder: {}", folder))?; @@ -701,12 +620,12 @@ impl Imap { context, "IMAP folder has no uid_next, fall back to fetching" ); - let session = self.session.as_mut().context("Get uid_next: Nosession")?; // note that we use fetch by sequence number // and thus we only need to get exactly the // last-index message. let set = format!("{}", mailbox.exists); let mut list = session + .inner .fetch(set, JUST_UID) .await .context("Error fetching UID")?; @@ -947,7 +866,9 @@ impl Imap { .await?; Ok(()) } +} +impl Session { /// Deletes batch of messages identified by their UID from the currently /// selected folder. async fn delete_message_batch( @@ -975,7 +896,6 @@ impl Imap { "IMAP messages {} marked as deleted", uid_set ))); - self.config.selected_folder_needs_expunge = true; Ok(()) } @@ -988,12 +908,8 @@ impl Imap { row_ids: Vec, target: &str, ) -> Result<()> { - if self.config.can_move { - let session = self - .session - .as_mut() - .context("no session while attempting to MOVE messages")?; - match session.uid_mv(set, &target).await { + if self.can_move() { + match self.uid_mv(set, &target).await { Ok(()) => { // Messages are moved or don't exist, IMAP returns OK response in both cases. context @@ -1032,11 +948,7 @@ impl Imap { // Server does not support MOVE or MOVE failed. // Copy the message to the destination folder and mark the record for deletion. - let session = self - .session - .as_mut() - .context("no session while attempting to COPY messages")?; - match session.uid_copy(&set, &target).await { + match self.uid_copy(&set, &target).await { Ok(()) => { context .sql @@ -1081,8 +993,6 @@ impl Imap { ) .await?; - self.prepare(context).await?; - for (target, rowid_set, uid_set) in UidGrouper::from(rows) { // Select folder inside the loop to avoid selecting it if there are no pending // MOVE/DELETE operations. This does not result in multiple SELECT commands @@ -1118,8 +1028,6 @@ impl Imap { /// Stores pending `\Seen` flags for messages in `imap_markseen` table. pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> { - self.prepare(context).await?; - let rows = context .sql .query_map( @@ -1171,10 +1079,17 @@ impl Imap { Ok(()) } +} +impl Imap { /// Synchronizes `\Seen` flags using `CONDSTORE` extension. pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> { - if !self.config.can_condstore { + let session = self + .session + .as_mut() + .with_context(|| format!("No IMAP connection established, folder: {}", folder))?; + + if !session.can_condstore() { info!( context, "Server does not support CONDSTORE, skipping flag synchronization." @@ -1182,16 +1097,12 @@ impl Imap { return Ok(()); } - self.select_folder(context, Some(folder)) + session + .select_folder(context, Some(folder)) .await .context("failed to select folder")?; - let session = self - .session - .as_mut() - .with_context(|| format!("No IMAP connection established, folder: {}", folder))?; - let mailbox = self - .config + let mailbox = session .selected_mailbox .as_ref() .with_context(|| format!("No mailbox selected, folder: {}", folder))?; @@ -1334,15 +1245,11 @@ impl Imap { /// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages) async fn prefetch_existing_msgs(&mut self) -> Result> { + let session = self.session.as_mut().context("no IMAP session")?; let exists: i64 = { - let mailbox = self - .config - .selected_mailbox - .as_ref() - .context("no mailbox")?; + let mailbox = session.selected_mailbox.as_ref().context("no mailbox")?; mailbox.exists.into() }; - let session = self.session.as_mut().context("no IMAP session")?; // Fetch last DC_FETCH_EXISTING_MSGS_COUNT (100) messages. // Sequence numbers are sequential. If there are 1000 messages in the inbox, @@ -1394,7 +1301,7 @@ impl Imap { if fetch_partially { "partial" } else { "full" }, set ); - let mut fetch_responses = match session + let mut fetch_responses = session .uid_fetch( &set, if fetch_partially { @@ -1404,17 +1311,9 @@ impl Imap { }, ) .await - .with_context(|| format!("fetching messages {} from folder \"{}\"", &set, folder)) - { - Ok(fetch_responses) => fetch_responses, - Err(err) => { - // We want to reconnect regardless of whether it's an I/O error or parsing - // error. If the protocol parser ends up in incorrect state because of some - // incompatiblity with a server, reset may help. - self.should_reconnect = true; - return Err(err); - } - }; + .with_context(|| { + format!("fetching messages {} from folder \"{}\"", &set, folder) + })?; // Map from UIDs to unprocessed FETCH results. We put unprocessed FETCH results here // when we want to process other messages first. @@ -1563,20 +1462,20 @@ impl Imap { Ok((last_uid, received_msgs)) } +} +impl Session { /// Returns success if we successfully set the flag or we otherwise /// think add_flag should not be retried: Disconnection during setting /// the flag, or other imap-errors, returns true as well. /// /// Returning error means that the operation can be retried. async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> { - if self.should_reconnect() { - bail!("Can't set flag, should reconnect"); + if flag == "\\Deleted" { + self.selected_folder_needs_expunge = true; } - - let session = self.session.as_mut().context("No session")?; let query = format!("+FLAGS ({})", flag); - let mut responses = session + let mut responses = self .uid_store(uid_set, &query) .await .with_context(|| format!("IMAP failed to store: ({}, {})", uid_set, query))?; @@ -1585,7 +1484,9 @@ impl Imap { } Ok(()) } +} +impl Imap { pub(crate) async fn prepare_imap_operation_on_msg( &mut self, context: &Context, @@ -1595,32 +1496,35 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::RetryLater); } - if self.session.is_none() { - // currently jobs are only performed on the INBOX thread - // TODO: make INBOX/SENT/MVBOX perform the jobs on their - // respective folders to avoid select_folder network traffic - // and the involved error states - if let Err(err) = self.prepare(context).await { - warn!(context, "prepare_imap_op failed: {}", err); - return Some(ImapActionResult::RetryLater); - } + if let Err(err) = self.prepare(context).await { + warn!(context, "prepare_imap_op failed: {}", err); + return Some(ImapActionResult::RetryLater); } - match self.select_folder(context, Some(folder)).await { + + let session = match self + .session + .as_mut() + .context("no IMAP connection established") + { + Err(err) => { + error!(context, "Failed to prepare IMAP operation: {:#}", err); + return Some(ImapActionResult::Failed); + } + Ok(session) => session, + }; + + match session.select_folder(context, Some(folder)).await { Ok(_) => None, Err(select_folder::Error::ConnectionLost) => { warn!(context, "Lost imap connection"); Some(ImapActionResult::RetryLater) } - Err(select_folder::Error::NoSession) => { - warn!(context, "no imap session"); - Some(ImapActionResult::Failed) - } Err(select_folder::Error::BadFolderName(folder_name)) => { warn!(context, "invalid folder name: {:?}", folder_name); Some(ImapActionResult::Failed) } Err(err) => { - warn!(context, "failed to select folder: {:?}: {:?}", folder, err); + warn!(context, "failed to select folder {:?}: {:#}", folder, err); Some(ImapActionResult::RetryLater) } } @@ -1653,15 +1557,15 @@ impl Imap { folders: &[&'a str], create_mvbox: bool, ) -> Result> { - // Close currently selected folder if needed. - // We are going to select folders using low-level EXAMINE operations below. - self.select_folder(context, None).await?; - let session = self .session .as_mut() .context("no IMAP connection established")?; + // Close currently selected folder if needed. + // We are going to select folders using low-level EXAMINE operations below. + session.select_folder(context, None).await?; + for folder in folders { info!(context, "Looking for MVBOX-folder \"{}\"...", &folder); let res = session.examine(&folder).await; @@ -1760,15 +1664,16 @@ impl Imap { info!(context, "FINISHED configuring IMAP-folders."); Ok(()) } +} +impl Session { /// Return whether the server sent an unsolicited EXISTS response. /// Drains all responses from `session.unsolicited_responses` in the process. /// If this returns `true`, this means that new emails arrived and you should /// fetch again, even if you just fetched. fn server_sent_unsolicited_exists(&self, context: &Context) -> Result { - let session = self.session.as_ref().context("no session")?; let mut unsolicited_exists = false; - while let Ok(response) = session.unsolicited_responses.try_recv() { + while let Ok(response) = self.unsolicited_responses.try_recv() { match response { UnsolicitedResponse::Exists(_) => { info!( @@ -1782,19 +1687,6 @@ impl Imap { } Ok(unsolicited_exists) } - - pub fn can_check_quota(&self) -> bool { - self.config.can_check_quota - } - - pub(crate) async fn get_quota_roots( - &mut self, - mailbox_name: &str, - ) -> Result<(Vec, Vec)> { - let session = self.session.as_mut().context("no session")?; - let quota_roots = session.get_quota_root(mailbox_name).await?; - Ok(quota_roots) - } } async fn should_move_out_of_spam( diff --git a/src/imap/capabilities.rs b/src/imap/capabilities.rs new file mode 100644 index 000000000..14385c088 --- /dev/null +++ b/src/imap/capabilities.rs @@ -0,0 +1,26 @@ +//! # IMAP capabilities +//! +//! IMAP server capabilities are determined with a `CAPABILITY` command. +use std::collections::HashMap; + +#[derive(Debug)] +pub(crate) struct Capabilities { + /// True if the server has IDLE capability as defined in + /// + pub can_idle: bool, + + /// True if the server has MOVE capability as defined in + /// + pub can_move: bool, + + /// True if the server has QUOTA capability as defined in + /// + pub can_check_quota: bool, + + /// True if the server has CONDSTORE capability as defined in + /// + pub can_condstore: bool, + + /// Server ID if the server supports ID capability. + pub server_id: Option>, +} diff --git a/src/imap/client.rs b/src/imap/client.rs index 0e405a60d..807ebf3ff 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -6,10 +6,12 @@ use std::{ use anyhow::{Context as _, Result}; use async_imap::Client as ImapClient; +use async_imap::Session as ImapSession; use async_smtp::ServerAddress; use tokio::net::{self, TcpStream}; +use super::capabilities::Capabilities; use super::session::Session; use crate::login_param::{build_tls, Socks5Config}; @@ -38,27 +40,54 @@ impl DerefMut for Client { } } +/// Determine server capabilities. +/// +/// If server supports ID capability, send our client ID. +async fn determine_capabilities( + session: &mut ImapSession>, +) -> Result { + let caps = session + .capabilities() + .await + .context("CAPABILITY command error")?; + let server_id = if caps.has_str("ID") { + session.id([("name", Some("Delta Chat"))]).await? + } else { + None + }; + let capabilities = Capabilities { + can_idle: caps.has_str("IDLE"), + can_move: caps.has_str("MOVE"), + can_check_quota: caps.has_str("QUOTA"), + can_condstore: caps.has_str("CONDSTORE"), + server_id, + }; + Ok(capabilities) +} + impl Client { - pub async fn login(self, username: &str, password: &str) -> Result { + pub(crate) async fn login(self, username: &str, password: &str) -> Result { let Client { inner, .. } = self; - let session = inner + let mut session = inner .login(username, password) .await .map_err(|(err, _client)| err)?; - Ok(Session { inner: session }) + let capabilities = determine_capabilities(&mut session).await?; + Ok(Session::new(session, capabilities)) } - pub async fn authenticate( + pub(crate) async fn authenticate( self, auth_type: &str, authenticator: impl async_imap::Authenticator, ) -> Result { let Client { inner, .. } = self; - let session = inner + let mut session = inner .authenticate(auth_type, authenticator) .await .map_err(|(err, _client)| err)?; - Ok(Session { inner: session }) + let capabilities = determine_capabilities(&mut session).await?; + Ok(Session::new(session, capabilities)) } pub async fn connect_secure( @@ -146,7 +175,7 @@ impl Client { }) } - pub async fn secure(self, domain: &str, strict_tls: bool) -> Result { + pub async fn secure(self, domain: &str, strict_tls: bool) -> Result { if self.is_secure { Ok(self) } else { diff --git a/src/imap/idle.rs b/src/imap/idle.rs index f6af1cded..61821a82e 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -1,113 +1,106 @@ use super::Imap; use anyhow::{bail, Context as _, Result}; +use async_channel::Receiver; use async_imap::extensions::idle::IdleResponse; use futures_lite::FutureExt; use std::time::{Duration, SystemTime}; +use super::session::Session; use crate::{context::Context, scheduler::InterruptInfo}; -use super::session::Session; - -impl Imap { - pub fn can_idle(&self) -> bool { - self.config.can_idle - } - +impl Session { pub async fn idle( - &mut self, + mut self, context: &Context, + idle_interrupt_receiver: Receiver, watch_folder: Option, - ) -> Result { + ) -> Result<(Self, InterruptInfo)> { use futures::future::FutureExt; if !self.can_idle() { bail!("IMAP server does not have IDLE capability"); } - self.prepare(context).await?; - - self.select_folder(context, watch_folder.as_deref()).await?; let timeout = Duration::from_secs(23 * 60); let mut info = Default::default(); + self.select_folder(context, watch_folder.as_deref()).await?; + if self.server_sent_unsolicited_exists(context)? { - return Ok(info); + return Ok((self, info)); } - if let Some(session) = self.session.take() { - if let Ok(info) = self.idle_interrupt.try_recv() { - info!(context, "skip idle, got interrupt {:?}", info); - self.session = Some(session); - return Ok(info); - } - - let mut handle = session.idle(); - if let Err(err) = handle.init().await { - bail!("IMAP IDLE protocol failed to init/complete: {}", err); - } - - let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - - enum Event { - IdleResponse(IdleResponse), - Interrupt(InterruptInfo), - } - - let folder_name = watch_folder.as_deref().unwrap_or("None"); - info!( - context, - "{}: Idle entering wait-on-remote state", folder_name - ); - let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async { - let info = self.idle_interrupt.recv().await; - - // cancel imap idle connection properly - drop(interrupt); - - Ok(Event::Interrupt(info.unwrap_or_default())) - }); - - match fut.await { - Ok(Event::IdleResponse(IdleResponse::NewData(x))) => { - info!(context, "{}: Idle has NewData {:?}", folder_name, x); - } - Ok(Event::IdleResponse(IdleResponse::Timeout)) => { - info!( - context, - "{}: Idle-wait timeout or interruption", folder_name - ); - } - Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { - info!( - context, - "{}: Idle wait was interrupted manually", folder_name - ); - } - Ok(Event::Interrupt(i)) => { - info!( - context, - "{}: Idle wait was interrupted: {:?}", folder_name, &i - ); - info = i; - } - Err(err) => { - warn!(context, "{}: Idle wait errored: {:?}", folder_name, err); - } - } - - let session = tokio::time::timeout(Duration::from_secs(15), handle.done()) - .await - .with_context(|| format!("{}: IMAP IDLE protocol timed out", folder_name))? - .with_context(|| format!("{}: IMAP IDLE failed", folder_name))?; - self.session = Some(Session { inner: session }); - } else { - warn!(context, "Attempted to idle without a session"); + if let Ok(info) = idle_interrupt_receiver.try_recv() { + info!(context, "skip idle, got interrupt {:?}", info); + return Ok((self, info)); } - Ok(info) + let mut handle = self.inner.idle(); + if let Err(err) = handle.init().await { + bail!("IMAP IDLE protocol failed to init/complete: {}", err); + } + + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + + enum Event { + IdleResponse(IdleResponse), + Interrupt(InterruptInfo), + } + + let folder_name = watch_folder.as_deref().unwrap_or("None"); + info!( + context, + "{}: Idle entering wait-on-remote state", folder_name + ); + let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async { + let info = idle_interrupt_receiver.recv().await; + + // cancel imap idle connection properly + drop(interrupt); + + Ok(Event::Interrupt(info.unwrap_or_default())) + }); + + match fut.await { + Ok(Event::IdleResponse(IdleResponse::NewData(x))) => { + info!(context, "{}: Idle has NewData {:?}", folder_name, x); + } + Ok(Event::IdleResponse(IdleResponse::Timeout)) => { + info!( + context, + "{}: Idle-wait timeout or interruption", folder_name + ); + } + Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { + info!( + context, + "{}: Idle wait was interrupted manually", folder_name + ); + } + Ok(Event::Interrupt(i)) => { + info!( + context, + "{}: Idle wait was interrupted: {:?}", folder_name, &i + ); + info = i; + } + Err(err) => { + warn!(context, "{}: Idle wait errored: {:?}", folder_name, err); + } + } + + let session = tokio::time::timeout(Duration::from_secs(15), handle.done()) + .await + .with_context(|| format!("{}: IMAP IDLE protocol timed out", folder_name))? + .with_context(|| format!("{}: IMAP IDLE failed", folder_name))?; + self.inner = session; + + Ok((self, info)) } +} +impl Imap { pub(crate) async fn fake_idle( &mut self, context: &Context, @@ -123,7 +116,11 @@ impl Imap { watch_folder } else { info!(context, "IMAP-fake-IDLE: no folder, waiting for interrupt"); - return self.idle_interrupt.recv().await.unwrap_or_default(); + return self + .idle_interrupt_receiver + .recv() + .await + .unwrap_or_default(); }; info!(context, "IMAP-fake-IDLEing folder={:?}", watch_folder); @@ -142,7 +139,7 @@ impl Imap { .tick() .map(|_| Event::Tick) .race( - self.idle_interrupt + self.idle_interrupt_receiver .recv() .map(|probe_network| Event::Interrupt(probe_network.unwrap_or_default())), ) @@ -156,9 +153,11 @@ impl Imap { warn!(context, "fake_idle: could not connect: {}", err); continue; } - if self.config.can_idle { - // we only fake-idled because network was gone during IDLE, probably - break InterruptInfo::new(false); + if let Some(session) = &self.session { + if session.can_idle() { + // we only fake-idled because network was gone during IDLE, probably + break InterruptInfo::new(false); + } } info!(context, "fake_idle is connected"); // we are connected, let's see if fetching messages results @@ -177,7 +176,7 @@ impl Imap { } Err(err) => { error!(context, "could not fetch from folder: {:#}", err); - self.trigger_reconnect(context).await; + self.trigger_reconnect(context); } } } diff --git a/src/imap/scan_folders.rs b/src/imap/scan_folders.rs index 37699b3e4..d7c732244 100644 --- a/src/imap/scan_folders.rs +++ b/src/imap/scan_folders.rs @@ -63,16 +63,18 @@ impl Imap { // Don't scan folders that are watched anyway if !watched_folders.contains(&folder.name().to_string()) && !is_drafts { + let session = self.session.as_mut().context("no session")?; // Drain leftover unsolicited EXISTS messages - self.server_sent_unsolicited_exists(context)?; + session.server_sent_unsolicited_exists(context)?; loop { self.fetch_move_delete(context, folder.name(), is_spam_folder) .await .ok_or_log_msg(context, "Can't fetch new msgs in scanned folder"); + let session = self.session.as_mut().context("no session")?; // If the server sent an unsocicited EXISTS during the fetch, we need to fetch again - if !self.server_sent_unsolicited_exists(context)? { + if !session.server_sent_unsolicited_exists(context)? { break; } } diff --git a/src/imap/select_folder.rs b/src/imap/select_folder.rs index 1458f59cc..3f04f41a2 100644 --- a/src/imap/select_folder.rs +++ b/src/imap/select_folder.rs @@ -1,4 +1,4 @@ -use super::Imap; +use super::session::Session as ImapSession; use crate::context::Context; use anyhow::Context as _; @@ -7,9 +7,6 @@ type Result = std::result::Result; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("IMAP Could not obtain imap-session object.")] - NoSession, - #[error("IMAP Connection Lost or no connection established")] ConnectionLost, @@ -29,55 +26,38 @@ impl From for Error { } } -impl Imap { - /// Issues a CLOSE command to expunge selected folder. +impl ImapSession { + /// Issues a CLOSE command if selected folder needs expunge, + /// i.e. if Delta Chat marked a message there as deleted previously. /// /// CLOSE is considerably faster than an EXPUNGE, see /// - pub(super) async fn close_folder(&mut self, context: &Context) -> anyhow::Result<()> { - if let Some(ref folder) = self.config.selected_folder { - info!(context, "Expunge messages in \"{}\".", folder); + pub(super) async fn maybe_close_folder(&mut self, context: &Context) -> anyhow::Result<()> { + if let Some(folder) = &self.selected_folder { + if self.selected_folder_needs_expunge { + info!(context, "Expunge messages in \"{}\".", folder); - let session = self.session.as_mut().context("no session")?; - if let Err(err) = session.close().await.context("IMAP close/expunge failed") { - self.trigger_reconnect(context).await; - return Err(err); + self.close().await.context("IMAP close/expunge failed")?; + info!(context, "close/expunge succeeded"); + self.selected_folder = None; + self.selected_folder_needs_expunge = false; } - info!(context, "close/expunge succeeded"); - } - self.config.selected_folder = None; - self.config.selected_folder_needs_expunge = false; - - Ok(()) - } - - /// Issues a CLOSE command if selected folder needs expunge. - pub(crate) async fn maybe_close_folder(&mut self, context: &Context) -> anyhow::Result<()> { - if self.config.selected_folder_needs_expunge { - self.close_folder(context).await?; } Ok(()) } - /// select a folder, possibly update uid_validity and, if needed, - /// expunge the folder to remove delete-marked messages. + /// Selects a folder, possibly updating uid_validity and, if needed, + /// expunging the folder to remove delete-marked messages. /// Returns whether a new folder was selected. pub(super) async fn select_folder( &mut self, context: &Context, folder: Option<&str>, ) -> Result { - if self.session.is_none() { - self.config.selected_folder = None; - self.config.selected_folder_needs_expunge = false; - self.trigger_reconnect(context).await; - return Err(Error::NoSession); - } - // if there is a new folder and the new folder is equal to the selected one, there's nothing to do. // if there is _no_ new folder, we continue as we might want to expunge below. if let Some(folder) = folder { - if let Some(ref selected_folder) = self.config.selected_folder { + if let Some(selected_folder) = &self.selected_folder { if folder == selected_folder { return Ok(NewlySelected::No); } @@ -89,42 +69,30 @@ impl Imap { // select new folder if let Some(folder) = folder { - if let Some(ref mut session) = &mut self.session { - let res = if self.config.can_condstore { - session.select_condstore(folder).await - } else { - session.select(folder).await - }; - - // - // says that if the server reports select failure we are in - // authenticated (not-select) state. - - match res { - Ok(mailbox) => { - self.config.selected_folder = Some(folder.to_string()); - self.config.selected_mailbox = Some(mailbox); - Ok(NewlySelected::Yes) - } - Err(async_imap::error::Error::ConnectionLost) => { - self.trigger_reconnect(context).await; - self.config.selected_folder = None; - Err(Error::ConnectionLost) - } - Err(async_imap::error::Error::Validate(_)) => { - Err(Error::BadFolderName(folder.to_string())) - } - Err(async_imap::error::Error::No(response)) => { - Err(Error::NoFolder(folder.to_string(), response)) - } - Err(err) => { - self.config.selected_folder = None; - self.trigger_reconnect(context).await; - Err(Error::Other(err.to_string())) - } - } + let res = if self.can_condstore() { + self.select_condstore(folder).await } else { - Err(Error::NoSession) + self.select(folder).await + }; + + // + // says that if the server reports select failure we are in + // authenticated (not-select) state. + + match res { + Ok(mailbox) => { + self.selected_folder = Some(folder.to_string()); + self.selected_mailbox = Some(mailbox); + Ok(NewlySelected::Yes) + } + Err(async_imap::error::Error::ConnectionLost) => Err(Error::ConnectionLost), + Err(async_imap::error::Error::Validate(_)) => { + Err(Error::BadFolderName(folder.to_string())) + } + Err(async_imap::error::Error::No(response)) => { + Err(Error::NoFolder(folder.to_string(), response)) + } + Err(err) => Err(Error::Other(err.to_string())), } } else { Ok(NewlySelected::No) @@ -141,8 +109,7 @@ impl Imap { Ok(newly_selected) => Ok(newly_selected), Err(err) => match err { Error::NoFolder(..) => { - let session = self.session.as_mut().context("no IMAP session")?; - session.create(folder).await.with_context(|| { + self.create(folder).await.with_context(|| { format!("Couldn't select folder ('{}'), then create() failed", err) })?; @@ -153,6 +120,7 @@ impl Imap { } } } + #[derive(PartialEq, Debug, Copy, Clone, Eq)] pub(super) enum NewlySelected { /// The folder was newly selected during this call to select_folder(). diff --git a/src/imap/session.rs b/src/imap/session.rs index 50005a67b..6bc6ac200 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,13 +1,26 @@ use std::ops::{Deref, DerefMut}; +use async_imap::types::Mailbox; use async_imap::Session as ImapSession; use async_native_tls::TlsStream; use fast_socks5::client::Socks5Stream; use tokio::net::TcpStream; +use super::capabilities::Capabilities; + #[derive(Debug)] pub(crate) struct Session { pub(super) inner: ImapSession>, + + pub capabilities: Capabilities, + + /// Selected folder name. + pub selected_folder: Option, + + /// Mailbox structure returned by IMAP server. + pub selected_mailbox: Option, + + pub selected_folder_needs_expunge: bool, } pub(crate) trait SessionStream: @@ -35,8 +48,32 @@ impl DerefMut for Session { } impl Session { - pub fn idle(self) -> async_imap::extensions::idle::Handle> { - let Session { inner } = self; - inner.idle() + pub(crate) fn new( + inner: ImapSession>, + capabilities: Capabilities, + ) -> Self { + Self { + inner, + capabilities, + selected_folder: None, + selected_mailbox: None, + selected_folder_needs_expunge: false, + } + } + + pub fn can_idle(&self) -> bool { + self.capabilities.can_idle + } + + pub fn can_move(&self) -> bool { + self.capabilities.can_move + } + + pub fn can_check_quota(&self) -> bool { + self.capabilities.can_check_quota + } + + pub fn can_condstore(&self) -> bool { + self.capabilities.can_condstore } } diff --git a/src/quota.rs b/src/quota.rs index 7c74e63a0..1053cd81a 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -8,6 +8,7 @@ use crate::chat::add_device_msg_with_importance; use crate::config::Config; use crate::context::Context; use crate::imap::scan_folders::get_watched_folders; +use crate::imap::session::Session as ImapSession; use crate::imap::Imap; use crate::job::{Action, Status}; use crate::message::{Message, Viewtype}; @@ -49,12 +50,12 @@ pub struct QuotaInfo { } async fn get_unique_quota_roots_and_usage( + session: &mut ImapSession, folders: Vec, - imap: &mut Imap, ) -> Result>> { let mut unique_quota_roots: BTreeMap> = BTreeMap::new(); for folder in folders { - let (quota_roots, quotas) = &imap.get_quota_roots(&folder).await?; + let (quota_roots, quotas) = &session.get_quota_root(&folder).await?; // if there are new quota roots found in this imap folder, add them to the list for qr_entries in quota_roots { for quota_root_name in &qr_entries.quota_root_names { @@ -135,9 +136,10 @@ impl Context { return Ok(Status::RetryNow); } - let quota = if imap.can_check_quota() { + let session = imap.session.as_mut().context("no session")?; + let quota = if session.can_check_quota() { let folders = get_watched_folders(self).await?; - get_unique_quota_roots_and_usage(folders, imap).await + get_unique_quota_roots_and_usage(session, folders).await } else { Err(anyhow!(stock_str::not_supported_by_provider(self).await)) }; diff --git a/src/scheduler.rs b/src/scheduler.rs index 32f19cf70..81d5e7be8 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -166,6 +166,12 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne .await; } +/// Implement a single iteration of IMAP loop. +/// +/// This function performs all IMAP operations on a single folder, selecting it if necessary and +/// handling all the errors. In case of an error, it is logged, but not propagated upwards. If +/// critical operation fails such as fetching new messages fails, connection is reset via +/// `trigger_reconnect`, so a fresh one can be opened. async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) -> InterruptInfo { let folder = match ctx.get_config(folder_config).await { Ok(folder) => folder, @@ -187,18 +193,25 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) }; // connect and fake idle if unable to connect - if let Err(err) = connection.prepare(ctx).await { - warn!(ctx, "imap connection failed: {}", err); + if let Err(err) = connection + .prepare(ctx) + .await + .context("prepare IMAP connection") + { + connection.trigger_reconnect(ctx); + warn!(ctx, "{:#}", err); return connection.fake_idle(ctx, Some(watch_folder)).await; } if folder_config == Config::ConfiguredInboxFolder { - if let Err(err) = connection - .store_seen_flags_on_imap(ctx) - .await - .context("store_seen_flags_on_imap failed") - { - warn!(ctx, "{:#}", err); + if let Some(session) = connection.session.as_mut() { + session + .store_seen_flags_on_imap(ctx) + .await + .context("store_seen_flags_on_imap") + .ok_or_log(ctx); + } else { + warn!(ctx, "No session even though we just prepared it"); } } @@ -208,7 +221,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) .await .context("fetch_move_delete") { - connection.trigger_reconnect(ctx).await; + connection.trigger_reconnect(ctx); warn!(ctx, "{:#}", err); return InterruptInfo::new(false); } @@ -217,12 +230,10 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not // called right before `fetch_move_delete` because it is not well optimized and would // otherwise slow down message fetching. - if let Err(err) = delete_expired_imap_messages(ctx) + delete_expired_imap_messages(ctx) .await .context("delete_expired_imap_messages") - { - warn!(ctx, "{:#}", err); - } + .ok_or_log(ctx); // Scan additional folders only after finishing fetching the watched folder. // @@ -248,7 +259,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) .await .context("fetch_move_delete after scan_folders") { - connection.trigger_reconnect(ctx).await; + connection.trigger_reconnect(ctx); warn!(ctx, "{:#}", err); return InterruptInfo::new(false); } @@ -266,18 +277,38 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) connection.connectivity.set_connected(ctx).await; - // idle - if !connection.can_idle() { - return connection.fake_idle(ctx, Some(watch_folder)).await; - } - - match connection.idle(ctx, Some(watch_folder)).await { - Ok(v) => v, - Err(err) => { - connection.trigger_reconnect(ctx).await; - warn!(ctx, "{:#}", err); - InterruptInfo::new(false) + if let Some(session) = connection.session.take() { + if !session.can_idle() { + info!( + ctx, + "IMAP session does not support IDLE, going to fake idle." + ); + return connection.fake_idle(ctx, Some(watch_folder)).await; } + + info!(ctx, "IMAP session supports IDLE, using it."); + match session + .idle( + ctx, + connection.idle_interrupt_receiver.clone(), + Some(watch_folder), + ) + .await + .context("idle") + { + Ok((session, info)) => { + connection.session = Some(session); + info + } + Err(err) => { + connection.trigger_reconnect(ctx); + warn!(ctx, "{:#}", err); + InterruptInfo::new(false) + } + } + } else { + warn!(ctx, "No IMAP session, going to fake idle."); + connection.fake_idle(ctx, Some(watch_folder)).await } }