diff --git a/CHANGELOG.md b/CHANGELOG.md index f3d360afa..930e0634b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ - Log the reason when the message cannot be sent to the chat #3810 - Add IMAP server ID line to the context info only when it is known #3814 - Remove autogenerated typescript files #3815 +- Move functions that require an IMAP session from `Imap` to `Session` + to reduce the number of code paths where IMAP session may not exist. + Drop connection on error instead of trying to disconnect, + potentially preventing IMAP task from getting stuck. #3812 ### API-Changes - Add Python API to send reactions #3762 diff --git a/src/configure.rs b/src/configure.rs index 42d35cdc8..63a4dec1a 100644 --- a/src/configure.rs +++ b/src/configure.rs @@ -442,9 +442,6 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { let create_mvbox = ctx.should_watch_mvbox().await?; - // Send client ID as soon as possible before doing anything else. - imap.determine_capabilities(ctx).await?; - imap.configure_folders(ctx, create_mvbox).await?; imap.select_with_uidvalidity(ctx, "INBOX") diff --git a/src/imap.rs b/src/imap.rs index 139853b86..2f22fe4a0 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -13,9 +13,7 @@ use std::{ use anyhow::{bail, format_err, Context as _, Result}; use async_channel::Receiver; -use async_imap::types::{ - Fetch, Flag, Mailbox, Name, NameAttribute, Quota, QuotaRoot, UnsolicitedResponse, -}; +use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse}; use futures::StreamExt; use num_traits::FromPrimitive; @@ -45,11 +43,12 @@ use crate::sql; use crate::stock_str; use crate::tools::create_id; +pub(crate) mod capabilities; mod client; mod idle; pub mod scan_folders; pub mod select_folder; -mod session; +pub(crate) mod session; use client::Client; use mailparse::SingleInfo; @@ -90,16 +89,11 @@ const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])"; #[derive(Debug)] pub struct Imap { - idle_interrupt: Receiver, + pub(crate) idle_interrupt_receiver: Receiver, config: ImapConfig, - session: Option, - should_reconnect: bool, + pub(crate) session: Option, login_failed_once: bool, - /// True if CAPABILITY command was run successfully once and config.can_* contain correct - /// values. - capabilities_determined: bool, - pub(crate) connectivity: ConnectivityStore, } @@ -156,23 +150,6 @@ struct ImapConfig { pub lp: ServerLoginParam, pub socks5_config: Option, pub strict_tls: bool, - pub selected_folder: Option, - pub selected_mailbox: Option, - pub selected_folder_needs_expunge: bool, - - pub can_idle: bool, - - /// True if the server has MOVE capability as defined in - /// - pub can_move: bool, - - /// True if the server has QUOTA capability as defined in - /// - pub can_check_quota: bool, - - /// True if the server has CONDSTORE capability as defined in - /// - pub can_condstore: bool, } struct UidGrouper> { @@ -245,7 +222,7 @@ impl Imap { socks5_config: Option, addr: &str, provider_strict_tls: bool, - idle_interrupt: Receiver, + idle_interrupt_receiver: Receiver, ) -> Result { if lp.server.is_empty() || lp.user.is_empty() || lp.password.is_empty() { bail!("Incomplete IMAP connection parameters"); @@ -262,23 +239,14 @@ impl Imap { lp: lp.clone(), socks5_config, strict_tls, - selected_folder: None, - selected_mailbox: None, - selected_folder_needs_expunge: false, - can_idle: false, - can_move: false, - can_check_quota: false, - can_condstore: false, }; let imap = Imap { - idle_interrupt, + idle_interrupt_receiver, config, session: None, - should_reconnect: false, login_failed_once: false, connectivity: Default::default(), - capabilities_determined: false, }; Ok(imap) @@ -287,7 +255,7 @@ impl Imap { /// Creates new disconnected IMAP client using configured parameters. pub async fn new_configured( context: &Context, - idle_interrupt: Receiver, + idle_interrupt_receiver: Receiver, ) -> Result { if !context.is_configured().await? { bail!("IMAP Connect without configured params"); @@ -305,7 +273,7 @@ impl Imap { .map_or(param.socks5_config.is_some(), |provider| { provider.strict_tls }), - idle_interrupt, + idle_interrupt_receiver, )?; Ok(imap) } @@ -322,10 +290,7 @@ impl Imap { bail!("IMAP operation attempted while it is torn down"); } - if self.should_reconnect() { - self.disconnect(context).await; - self.should_reconnect = false; - } else if self.session.is_some() { + if self.session.is_some() { return Ok(()); } @@ -404,11 +369,12 @@ impl Imap { client.login(imap_user, imap_pw).await }; - self.should_reconnect = false; - match login_res { Ok(session) => { - // needs to be set here to ensure it is set on reconnects. + // Store server ID in the context to display in account info. + let mut lock = context.server_id.write().await; + *lock = session.capabilities.server_id.clone(); + self.session = Some(session); self.login_failed_once = false; context.emit_event(EventType::ImapConnected(format!( @@ -446,41 +412,11 @@ impl Imap { self.login_failed_once = true; } - self.trigger_reconnect(context).await; Err(format_err!("{}\n\n{}", message, err)) } } } - /// Determine server capabilities if not done yet. - /// - /// If server supports ID capability, send our client ID. - pub(crate) async fn determine_capabilities(&mut self, context: &Context) -> Result<()> { - if self.capabilities_determined { - return Ok(()); - } - - let session = self.session.as_mut().context( - "Can't determine server capabilities because connection was not established", - )?; - let caps = session - .capabilities() - .await - .context("CAPABILITY command error")?; - if caps.has_str("ID") { - let server_id = session.id([("name", Some("Delta Chat"))]).await?; - info!(context, "Server ID: {:?}", server_id); - let mut lock = context.server_id.write().await; - *lock = server_id; - } - self.config.can_idle = caps.has_str("IDLE"); - self.config.can_move = caps.has_str("MOVE"); - self.config.can_check_quota = caps.has_str("QUOTA"); - self.config.can_condstore = caps.has_str("CONDSTORE"); - self.capabilities_determined = true; - Ok(()) - } - /// Prepare for IMAP operation. /// /// Ensure that IMAP client is connected, folders are created and IMAP capabilities are @@ -491,37 +427,16 @@ impl Imap { return Err(err); } - self.determine_capabilities(context).await?; self.ensure_configured_folders(context, true).await?; Ok(()) } - async fn disconnect(&mut self, context: &Context) { - info!(context, "disconnecting"); - - // Close folder if messages should be expunged - if let Err(err) = self.close_folder(context).await { - warn!(context, "failed to close folder: {:?}", err); - } - - // Logout from the server - if let Some(mut session) = self.session.take() { - if let Err(err) = session.logout().await { - warn!(context, "failed to logout: {:?}", err); - } - } - self.capabilities_determined = false; - self.config.selected_folder = None; - self.config.selected_mailbox = None; - } - - pub fn should_reconnect(&self) -> bool { - self.should_reconnect - } - - pub async fn trigger_reconnect(&mut self, context: &Context) { - self.connectivity.set_connecting(context).await; - self.should_reconnect = true; + /// Drops the session without disconnecting properly. + /// Useful in case of an IMAP error, when it's unclear if it's in a correct state and it's + /// easier to setup a new connection. + pub fn trigger_reconnect(&mut self, context: &Context) { + info!(context, "Dropping an IMAP connection."); + self.session = None; } /// FETCH-MOVE-DELETE iteration. @@ -552,7 +467,12 @@ impl Imap { context.interrupt_ephemeral_task().await; } - self.move_delete_messages(context, watch_folder) + let session = self + .session + .as_mut() + .context("no IMAP connection established")?; + session + .move_delete_messages(context, watch_folder) .await .context("move_delete_messages")?; @@ -573,13 +493,13 @@ impl Imap { // Collect pairs of UID and Message-ID. let mut msg_ids = BTreeMap::new(); - self.select_folder(context, Some(&folder)).await?; - let session = self .session .as_mut() .context("IMAP No connection established")?; + session.select_folder(context, Some(&folder)).await?; + let mut list = session .uid_fetch("1:*", RFC724MID_UID) .await @@ -638,10 +558,9 @@ impl Imap { context: &Context, folder: &str, ) -> Result { - let newly_selected = self.select_or_create_folder(context, folder).await?; - - let mailbox = self - .config + let session = self.session.as_mut().context("no session")?; + let newly_selected = session.select_or_create_folder(context, folder).await?; + let mailbox = session .selected_mailbox .as_mut() .with_context(|| format!("No mailbox selected, folder: {}", folder))?; @@ -701,12 +620,12 @@ impl Imap { context, "IMAP folder has no uid_next, fall back to fetching" ); - let session = self.session.as_mut().context("Get uid_next: Nosession")?; // note that we use fetch by sequence number // and thus we only need to get exactly the // last-index message. let set = format!("{}", mailbox.exists); let mut list = session + .inner .fetch(set, JUST_UID) .await .context("Error fetching UID")?; @@ -947,7 +866,9 @@ impl Imap { .await?; Ok(()) } +} +impl Session { /// Deletes batch of messages identified by their UID from the currently /// selected folder. async fn delete_message_batch( @@ -975,7 +896,6 @@ impl Imap { "IMAP messages {} marked as deleted", uid_set ))); - self.config.selected_folder_needs_expunge = true; Ok(()) } @@ -988,12 +908,8 @@ impl Imap { row_ids: Vec, target: &str, ) -> Result<()> { - if self.config.can_move { - let session = self - .session - .as_mut() - .context("no session while attempting to MOVE messages")?; - match session.uid_mv(set, &target).await { + if self.can_move() { + match self.uid_mv(set, &target).await { Ok(()) => { // Messages are moved or don't exist, IMAP returns OK response in both cases. context @@ -1032,11 +948,7 @@ impl Imap { // Server does not support MOVE or MOVE failed. // Copy the message to the destination folder and mark the record for deletion. - let session = self - .session - .as_mut() - .context("no session while attempting to COPY messages")?; - match session.uid_copy(&set, &target).await { + match self.uid_copy(&set, &target).await { Ok(()) => { context .sql @@ -1081,8 +993,6 @@ impl Imap { ) .await?; - self.prepare(context).await?; - for (target, rowid_set, uid_set) in UidGrouper::from(rows) { // Select folder inside the loop to avoid selecting it if there are no pending // MOVE/DELETE operations. This does not result in multiple SELECT commands @@ -1118,8 +1028,6 @@ impl Imap { /// Stores pending `\Seen` flags for messages in `imap_markseen` table. pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> { - self.prepare(context).await?; - let rows = context .sql .query_map( @@ -1171,10 +1079,17 @@ impl Imap { Ok(()) } +} +impl Imap { /// Synchronizes `\Seen` flags using `CONDSTORE` extension. pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> { - if !self.config.can_condstore { + let session = self + .session + .as_mut() + .with_context(|| format!("No IMAP connection established, folder: {}", folder))?; + + if !session.can_condstore() { info!( context, "Server does not support CONDSTORE, skipping flag synchronization." @@ -1182,16 +1097,12 @@ impl Imap { return Ok(()); } - self.select_folder(context, Some(folder)) + session + .select_folder(context, Some(folder)) .await .context("failed to select folder")?; - let session = self - .session - .as_mut() - .with_context(|| format!("No IMAP connection established, folder: {}", folder))?; - let mailbox = self - .config + let mailbox = session .selected_mailbox .as_ref() .with_context(|| format!("No mailbox selected, folder: {}", folder))?; @@ -1334,15 +1245,11 @@ impl Imap { /// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages) async fn prefetch_existing_msgs(&mut self) -> Result> { + let session = self.session.as_mut().context("no IMAP session")?; let exists: i64 = { - let mailbox = self - .config - .selected_mailbox - .as_ref() - .context("no mailbox")?; + let mailbox = session.selected_mailbox.as_ref().context("no mailbox")?; mailbox.exists.into() }; - let session = self.session.as_mut().context("no IMAP session")?; // Fetch last DC_FETCH_EXISTING_MSGS_COUNT (100) messages. // Sequence numbers are sequential. If there are 1000 messages in the inbox, @@ -1394,7 +1301,7 @@ impl Imap { if fetch_partially { "partial" } else { "full" }, set ); - let mut fetch_responses = match session + let mut fetch_responses = session .uid_fetch( &set, if fetch_partially { @@ -1404,17 +1311,9 @@ impl Imap { }, ) .await - .with_context(|| format!("fetching messages {} from folder \"{}\"", &set, folder)) - { - Ok(fetch_responses) => fetch_responses, - Err(err) => { - // We want to reconnect regardless of whether it's an I/O error or parsing - // error. If the protocol parser ends up in incorrect state because of some - // incompatiblity with a server, reset may help. - self.should_reconnect = true; - return Err(err); - } - }; + .with_context(|| { + format!("fetching messages {} from folder \"{}\"", &set, folder) + })?; // Map from UIDs to unprocessed FETCH results. We put unprocessed FETCH results here // when we want to process other messages first. @@ -1563,20 +1462,20 @@ impl Imap { Ok((last_uid, received_msgs)) } +} +impl Session { /// Returns success if we successfully set the flag or we otherwise /// think add_flag should not be retried: Disconnection during setting /// the flag, or other imap-errors, returns true as well. /// /// Returning error means that the operation can be retried. async fn add_flag_finalized_with_set(&mut self, uid_set: &str, flag: &str) -> Result<()> { - if self.should_reconnect() { - bail!("Can't set flag, should reconnect"); + if flag == "\\Deleted" { + self.selected_folder_needs_expunge = true; } - - let session = self.session.as_mut().context("No session")?; let query = format!("+FLAGS ({})", flag); - let mut responses = session + let mut responses = self .uid_store(uid_set, &query) .await .with_context(|| format!("IMAP failed to store: ({}, {})", uid_set, query))?; @@ -1585,7 +1484,9 @@ impl Imap { } Ok(()) } +} +impl Imap { pub(crate) async fn prepare_imap_operation_on_msg( &mut self, context: &Context, @@ -1595,32 +1496,35 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::RetryLater); } - if self.session.is_none() { - // currently jobs are only performed on the INBOX thread - // TODO: make INBOX/SENT/MVBOX perform the jobs on their - // respective folders to avoid select_folder network traffic - // and the involved error states - if let Err(err) = self.prepare(context).await { - warn!(context, "prepare_imap_op failed: {}", err); - return Some(ImapActionResult::RetryLater); - } + if let Err(err) = self.prepare(context).await { + warn!(context, "prepare_imap_op failed: {}", err); + return Some(ImapActionResult::RetryLater); } - match self.select_folder(context, Some(folder)).await { + + let session = match self + .session + .as_mut() + .context("no IMAP connection established") + { + Err(err) => { + error!(context, "Failed to prepare IMAP operation: {:#}", err); + return Some(ImapActionResult::Failed); + } + Ok(session) => session, + }; + + match session.select_folder(context, Some(folder)).await { Ok(_) => None, Err(select_folder::Error::ConnectionLost) => { warn!(context, "Lost imap connection"); Some(ImapActionResult::RetryLater) } - Err(select_folder::Error::NoSession) => { - warn!(context, "no imap session"); - Some(ImapActionResult::Failed) - } Err(select_folder::Error::BadFolderName(folder_name)) => { warn!(context, "invalid folder name: {:?}", folder_name); Some(ImapActionResult::Failed) } Err(err) => { - warn!(context, "failed to select folder: {:?}: {:?}", folder, err); + warn!(context, "failed to select folder {:?}: {:#}", folder, err); Some(ImapActionResult::RetryLater) } } @@ -1653,15 +1557,15 @@ impl Imap { folders: &[&'a str], create_mvbox: bool, ) -> Result> { - // Close currently selected folder if needed. - // We are going to select folders using low-level EXAMINE operations below. - self.select_folder(context, None).await?; - let session = self .session .as_mut() .context("no IMAP connection established")?; + // Close currently selected folder if needed. + // We are going to select folders using low-level EXAMINE operations below. + session.select_folder(context, None).await?; + for folder in folders { info!(context, "Looking for MVBOX-folder \"{}\"...", &folder); let res = session.examine(&folder).await; @@ -1760,15 +1664,16 @@ impl Imap { info!(context, "FINISHED configuring IMAP-folders."); Ok(()) } +} +impl Session { /// Return whether the server sent an unsolicited EXISTS response. /// Drains all responses from `session.unsolicited_responses` in the process. /// If this returns `true`, this means that new emails arrived and you should /// fetch again, even if you just fetched. fn server_sent_unsolicited_exists(&self, context: &Context) -> Result { - let session = self.session.as_ref().context("no session")?; let mut unsolicited_exists = false; - while let Ok(response) = session.unsolicited_responses.try_recv() { + while let Ok(response) = self.unsolicited_responses.try_recv() { match response { UnsolicitedResponse::Exists(_) => { info!( @@ -1782,19 +1687,6 @@ impl Imap { } Ok(unsolicited_exists) } - - pub fn can_check_quota(&self) -> bool { - self.config.can_check_quota - } - - pub(crate) async fn get_quota_roots( - &mut self, - mailbox_name: &str, - ) -> Result<(Vec, Vec)> { - let session = self.session.as_mut().context("no session")?; - let quota_roots = session.get_quota_root(mailbox_name).await?; - Ok(quota_roots) - } } async fn should_move_out_of_spam( diff --git a/src/imap/capabilities.rs b/src/imap/capabilities.rs new file mode 100644 index 000000000..14385c088 --- /dev/null +++ b/src/imap/capabilities.rs @@ -0,0 +1,26 @@ +//! # IMAP capabilities +//! +//! IMAP server capabilities are determined with a `CAPABILITY` command. +use std::collections::HashMap; + +#[derive(Debug)] +pub(crate) struct Capabilities { + /// True if the server has IDLE capability as defined in + /// + pub can_idle: bool, + + /// True if the server has MOVE capability as defined in + /// + pub can_move: bool, + + /// True if the server has QUOTA capability as defined in + /// + pub can_check_quota: bool, + + /// True if the server has CONDSTORE capability as defined in + /// + pub can_condstore: bool, + + /// Server ID if the server supports ID capability. + pub server_id: Option>, +} diff --git a/src/imap/client.rs b/src/imap/client.rs index 0e405a60d..807ebf3ff 100644 --- a/src/imap/client.rs +++ b/src/imap/client.rs @@ -6,10 +6,12 @@ use std::{ use anyhow::{Context as _, Result}; use async_imap::Client as ImapClient; +use async_imap::Session as ImapSession; use async_smtp::ServerAddress; use tokio::net::{self, TcpStream}; +use super::capabilities::Capabilities; use super::session::Session; use crate::login_param::{build_tls, Socks5Config}; @@ -38,27 +40,54 @@ impl DerefMut for Client { } } +/// Determine server capabilities. +/// +/// If server supports ID capability, send our client ID. +async fn determine_capabilities( + session: &mut ImapSession>, +) -> Result { + let caps = session + .capabilities() + .await + .context("CAPABILITY command error")?; + let server_id = if caps.has_str("ID") { + session.id([("name", Some("Delta Chat"))]).await? + } else { + None + }; + let capabilities = Capabilities { + can_idle: caps.has_str("IDLE"), + can_move: caps.has_str("MOVE"), + can_check_quota: caps.has_str("QUOTA"), + can_condstore: caps.has_str("CONDSTORE"), + server_id, + }; + Ok(capabilities) +} + impl Client { - pub async fn login(self, username: &str, password: &str) -> Result { + pub(crate) async fn login(self, username: &str, password: &str) -> Result { let Client { inner, .. } = self; - let session = inner + let mut session = inner .login(username, password) .await .map_err(|(err, _client)| err)?; - Ok(Session { inner: session }) + let capabilities = determine_capabilities(&mut session).await?; + Ok(Session::new(session, capabilities)) } - pub async fn authenticate( + pub(crate) async fn authenticate( self, auth_type: &str, authenticator: impl async_imap::Authenticator, ) -> Result { let Client { inner, .. } = self; - let session = inner + let mut session = inner .authenticate(auth_type, authenticator) .await .map_err(|(err, _client)| err)?; - Ok(Session { inner: session }) + let capabilities = determine_capabilities(&mut session).await?; + Ok(Session::new(session, capabilities)) } pub async fn connect_secure( @@ -146,7 +175,7 @@ impl Client { }) } - pub async fn secure(self, domain: &str, strict_tls: bool) -> Result { + pub async fn secure(self, domain: &str, strict_tls: bool) -> Result { if self.is_secure { Ok(self) } else { diff --git a/src/imap/idle.rs b/src/imap/idle.rs index f6af1cded..61821a82e 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -1,113 +1,106 @@ use super::Imap; use anyhow::{bail, Context as _, Result}; +use async_channel::Receiver; use async_imap::extensions::idle::IdleResponse; use futures_lite::FutureExt; use std::time::{Duration, SystemTime}; +use super::session::Session; use crate::{context::Context, scheduler::InterruptInfo}; -use super::session::Session; - -impl Imap { - pub fn can_idle(&self) -> bool { - self.config.can_idle - } - +impl Session { pub async fn idle( - &mut self, + mut self, context: &Context, + idle_interrupt_receiver: Receiver, watch_folder: Option, - ) -> Result { + ) -> Result<(Self, InterruptInfo)> { use futures::future::FutureExt; if !self.can_idle() { bail!("IMAP server does not have IDLE capability"); } - self.prepare(context).await?; - - self.select_folder(context, watch_folder.as_deref()).await?; let timeout = Duration::from_secs(23 * 60); let mut info = Default::default(); + self.select_folder(context, watch_folder.as_deref()).await?; + if self.server_sent_unsolicited_exists(context)? { - return Ok(info); + return Ok((self, info)); } - if let Some(session) = self.session.take() { - if let Ok(info) = self.idle_interrupt.try_recv() { - info!(context, "skip idle, got interrupt {:?}", info); - self.session = Some(session); - return Ok(info); - } - - let mut handle = session.idle(); - if let Err(err) = handle.init().await { - bail!("IMAP IDLE protocol failed to init/complete: {}", err); - } - - let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - - enum Event { - IdleResponse(IdleResponse), - Interrupt(InterruptInfo), - } - - let folder_name = watch_folder.as_deref().unwrap_or("None"); - info!( - context, - "{}: Idle entering wait-on-remote state", folder_name - ); - let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async { - let info = self.idle_interrupt.recv().await; - - // cancel imap idle connection properly - drop(interrupt); - - Ok(Event::Interrupt(info.unwrap_or_default())) - }); - - match fut.await { - Ok(Event::IdleResponse(IdleResponse::NewData(x))) => { - info!(context, "{}: Idle has NewData {:?}", folder_name, x); - } - Ok(Event::IdleResponse(IdleResponse::Timeout)) => { - info!( - context, - "{}: Idle-wait timeout or interruption", folder_name - ); - } - Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { - info!( - context, - "{}: Idle wait was interrupted manually", folder_name - ); - } - Ok(Event::Interrupt(i)) => { - info!( - context, - "{}: Idle wait was interrupted: {:?}", folder_name, &i - ); - info = i; - } - Err(err) => { - warn!(context, "{}: Idle wait errored: {:?}", folder_name, err); - } - } - - let session = tokio::time::timeout(Duration::from_secs(15), handle.done()) - .await - .with_context(|| format!("{}: IMAP IDLE protocol timed out", folder_name))? - .with_context(|| format!("{}: IMAP IDLE failed", folder_name))?; - self.session = Some(Session { inner: session }); - } else { - warn!(context, "Attempted to idle without a session"); + if let Ok(info) = idle_interrupt_receiver.try_recv() { + info!(context, "skip idle, got interrupt {:?}", info); + return Ok((self, info)); } - Ok(info) + let mut handle = self.inner.idle(); + if let Err(err) = handle.init().await { + bail!("IMAP IDLE protocol failed to init/complete: {}", err); + } + + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + + enum Event { + IdleResponse(IdleResponse), + Interrupt(InterruptInfo), + } + + let folder_name = watch_folder.as_deref().unwrap_or("None"); + info!( + context, + "{}: Idle entering wait-on-remote state", folder_name + ); + let fut = idle_wait.map(|ev| ev.map(Event::IdleResponse)).race(async { + let info = idle_interrupt_receiver.recv().await; + + // cancel imap idle connection properly + drop(interrupt); + + Ok(Event::Interrupt(info.unwrap_or_default())) + }); + + match fut.await { + Ok(Event::IdleResponse(IdleResponse::NewData(x))) => { + info!(context, "{}: Idle has NewData {:?}", folder_name, x); + } + Ok(Event::IdleResponse(IdleResponse::Timeout)) => { + info!( + context, + "{}: Idle-wait timeout or interruption", folder_name + ); + } + Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { + info!( + context, + "{}: Idle wait was interrupted manually", folder_name + ); + } + Ok(Event::Interrupt(i)) => { + info!( + context, + "{}: Idle wait was interrupted: {:?}", folder_name, &i + ); + info = i; + } + Err(err) => { + warn!(context, "{}: Idle wait errored: {:?}", folder_name, err); + } + } + + let session = tokio::time::timeout(Duration::from_secs(15), handle.done()) + .await + .with_context(|| format!("{}: IMAP IDLE protocol timed out", folder_name))? + .with_context(|| format!("{}: IMAP IDLE failed", folder_name))?; + self.inner = session; + + Ok((self, info)) } +} +impl Imap { pub(crate) async fn fake_idle( &mut self, context: &Context, @@ -123,7 +116,11 @@ impl Imap { watch_folder } else { info!(context, "IMAP-fake-IDLE: no folder, waiting for interrupt"); - return self.idle_interrupt.recv().await.unwrap_or_default(); + return self + .idle_interrupt_receiver + .recv() + .await + .unwrap_or_default(); }; info!(context, "IMAP-fake-IDLEing folder={:?}", watch_folder); @@ -142,7 +139,7 @@ impl Imap { .tick() .map(|_| Event::Tick) .race( - self.idle_interrupt + self.idle_interrupt_receiver .recv() .map(|probe_network| Event::Interrupt(probe_network.unwrap_or_default())), ) @@ -156,9 +153,11 @@ impl Imap { warn!(context, "fake_idle: could not connect: {}", err); continue; } - if self.config.can_idle { - // we only fake-idled because network was gone during IDLE, probably - break InterruptInfo::new(false); + if let Some(session) = &self.session { + if session.can_idle() { + // we only fake-idled because network was gone during IDLE, probably + break InterruptInfo::new(false); + } } info!(context, "fake_idle is connected"); // we are connected, let's see if fetching messages results @@ -177,7 +176,7 @@ impl Imap { } Err(err) => { error!(context, "could not fetch from folder: {:#}", err); - self.trigger_reconnect(context).await; + self.trigger_reconnect(context); } } } diff --git a/src/imap/scan_folders.rs b/src/imap/scan_folders.rs index 37699b3e4..d7c732244 100644 --- a/src/imap/scan_folders.rs +++ b/src/imap/scan_folders.rs @@ -63,16 +63,18 @@ impl Imap { // Don't scan folders that are watched anyway if !watched_folders.contains(&folder.name().to_string()) && !is_drafts { + let session = self.session.as_mut().context("no session")?; // Drain leftover unsolicited EXISTS messages - self.server_sent_unsolicited_exists(context)?; + session.server_sent_unsolicited_exists(context)?; loop { self.fetch_move_delete(context, folder.name(), is_spam_folder) .await .ok_or_log_msg(context, "Can't fetch new msgs in scanned folder"); + let session = self.session.as_mut().context("no session")?; // If the server sent an unsocicited EXISTS during the fetch, we need to fetch again - if !self.server_sent_unsolicited_exists(context)? { + if !session.server_sent_unsolicited_exists(context)? { break; } } diff --git a/src/imap/select_folder.rs b/src/imap/select_folder.rs index 1458f59cc..3f04f41a2 100644 --- a/src/imap/select_folder.rs +++ b/src/imap/select_folder.rs @@ -1,4 +1,4 @@ -use super::Imap; +use super::session::Session as ImapSession; use crate::context::Context; use anyhow::Context as _; @@ -7,9 +7,6 @@ type Result = std::result::Result; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("IMAP Could not obtain imap-session object.")] - NoSession, - #[error("IMAP Connection Lost or no connection established")] ConnectionLost, @@ -29,55 +26,38 @@ impl From for Error { } } -impl Imap { - /// Issues a CLOSE command to expunge selected folder. +impl ImapSession { + /// Issues a CLOSE command if selected folder needs expunge, + /// i.e. if Delta Chat marked a message there as deleted previously. /// /// CLOSE is considerably faster than an EXPUNGE, see /// - pub(super) async fn close_folder(&mut self, context: &Context) -> anyhow::Result<()> { - if let Some(ref folder) = self.config.selected_folder { - info!(context, "Expunge messages in \"{}\".", folder); + pub(super) async fn maybe_close_folder(&mut self, context: &Context) -> anyhow::Result<()> { + if let Some(folder) = &self.selected_folder { + if self.selected_folder_needs_expunge { + info!(context, "Expunge messages in \"{}\".", folder); - let session = self.session.as_mut().context("no session")?; - if let Err(err) = session.close().await.context("IMAP close/expunge failed") { - self.trigger_reconnect(context).await; - return Err(err); + self.close().await.context("IMAP close/expunge failed")?; + info!(context, "close/expunge succeeded"); + self.selected_folder = None; + self.selected_folder_needs_expunge = false; } - info!(context, "close/expunge succeeded"); - } - self.config.selected_folder = None; - self.config.selected_folder_needs_expunge = false; - - Ok(()) - } - - /// Issues a CLOSE command if selected folder needs expunge. - pub(crate) async fn maybe_close_folder(&mut self, context: &Context) -> anyhow::Result<()> { - if self.config.selected_folder_needs_expunge { - self.close_folder(context).await?; } Ok(()) } - /// select a folder, possibly update uid_validity and, if needed, - /// expunge the folder to remove delete-marked messages. + /// Selects a folder, possibly updating uid_validity and, if needed, + /// expunging the folder to remove delete-marked messages. /// Returns whether a new folder was selected. pub(super) async fn select_folder( &mut self, context: &Context, folder: Option<&str>, ) -> Result { - if self.session.is_none() { - self.config.selected_folder = None; - self.config.selected_folder_needs_expunge = false; - self.trigger_reconnect(context).await; - return Err(Error::NoSession); - } - // if there is a new folder and the new folder is equal to the selected one, there's nothing to do. // if there is _no_ new folder, we continue as we might want to expunge below. if let Some(folder) = folder { - if let Some(ref selected_folder) = self.config.selected_folder { + if let Some(selected_folder) = &self.selected_folder { if folder == selected_folder { return Ok(NewlySelected::No); } @@ -89,42 +69,30 @@ impl Imap { // select new folder if let Some(folder) = folder { - if let Some(ref mut session) = &mut self.session { - let res = if self.config.can_condstore { - session.select_condstore(folder).await - } else { - session.select(folder).await - }; - - // - // says that if the server reports select failure we are in - // authenticated (not-select) state. - - match res { - Ok(mailbox) => { - self.config.selected_folder = Some(folder.to_string()); - self.config.selected_mailbox = Some(mailbox); - Ok(NewlySelected::Yes) - } - Err(async_imap::error::Error::ConnectionLost) => { - self.trigger_reconnect(context).await; - self.config.selected_folder = None; - Err(Error::ConnectionLost) - } - Err(async_imap::error::Error::Validate(_)) => { - Err(Error::BadFolderName(folder.to_string())) - } - Err(async_imap::error::Error::No(response)) => { - Err(Error::NoFolder(folder.to_string(), response)) - } - Err(err) => { - self.config.selected_folder = None; - self.trigger_reconnect(context).await; - Err(Error::Other(err.to_string())) - } - } + let res = if self.can_condstore() { + self.select_condstore(folder).await } else { - Err(Error::NoSession) + self.select(folder).await + }; + + // + // says that if the server reports select failure we are in + // authenticated (not-select) state. + + match res { + Ok(mailbox) => { + self.selected_folder = Some(folder.to_string()); + self.selected_mailbox = Some(mailbox); + Ok(NewlySelected::Yes) + } + Err(async_imap::error::Error::ConnectionLost) => Err(Error::ConnectionLost), + Err(async_imap::error::Error::Validate(_)) => { + Err(Error::BadFolderName(folder.to_string())) + } + Err(async_imap::error::Error::No(response)) => { + Err(Error::NoFolder(folder.to_string(), response)) + } + Err(err) => Err(Error::Other(err.to_string())), } } else { Ok(NewlySelected::No) @@ -141,8 +109,7 @@ impl Imap { Ok(newly_selected) => Ok(newly_selected), Err(err) => match err { Error::NoFolder(..) => { - let session = self.session.as_mut().context("no IMAP session")?; - session.create(folder).await.with_context(|| { + self.create(folder).await.with_context(|| { format!("Couldn't select folder ('{}'), then create() failed", err) })?; @@ -153,6 +120,7 @@ impl Imap { } } } + #[derive(PartialEq, Debug, Copy, Clone, Eq)] pub(super) enum NewlySelected { /// The folder was newly selected during this call to select_folder(). diff --git a/src/imap/session.rs b/src/imap/session.rs index 50005a67b..6bc6ac200 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,13 +1,26 @@ use std::ops::{Deref, DerefMut}; +use async_imap::types::Mailbox; use async_imap::Session as ImapSession; use async_native_tls::TlsStream; use fast_socks5::client::Socks5Stream; use tokio::net::TcpStream; +use super::capabilities::Capabilities; + #[derive(Debug)] pub(crate) struct Session { pub(super) inner: ImapSession>, + + pub capabilities: Capabilities, + + /// Selected folder name. + pub selected_folder: Option, + + /// Mailbox structure returned by IMAP server. + pub selected_mailbox: Option, + + pub selected_folder_needs_expunge: bool, } pub(crate) trait SessionStream: @@ -35,8 +48,32 @@ impl DerefMut for Session { } impl Session { - pub fn idle(self) -> async_imap::extensions::idle::Handle> { - let Session { inner } = self; - inner.idle() + pub(crate) fn new( + inner: ImapSession>, + capabilities: Capabilities, + ) -> Self { + Self { + inner, + capabilities, + selected_folder: None, + selected_mailbox: None, + selected_folder_needs_expunge: false, + } + } + + pub fn can_idle(&self) -> bool { + self.capabilities.can_idle + } + + pub fn can_move(&self) -> bool { + self.capabilities.can_move + } + + pub fn can_check_quota(&self) -> bool { + self.capabilities.can_check_quota + } + + pub fn can_condstore(&self) -> bool { + self.capabilities.can_condstore } } diff --git a/src/quota.rs b/src/quota.rs index 7c74e63a0..1053cd81a 100644 --- a/src/quota.rs +++ b/src/quota.rs @@ -8,6 +8,7 @@ use crate::chat::add_device_msg_with_importance; use crate::config::Config; use crate::context::Context; use crate::imap::scan_folders::get_watched_folders; +use crate::imap::session::Session as ImapSession; use crate::imap::Imap; use crate::job::{Action, Status}; use crate::message::{Message, Viewtype}; @@ -49,12 +50,12 @@ pub struct QuotaInfo { } async fn get_unique_quota_roots_and_usage( + session: &mut ImapSession, folders: Vec, - imap: &mut Imap, ) -> Result>> { let mut unique_quota_roots: BTreeMap> = BTreeMap::new(); for folder in folders { - let (quota_roots, quotas) = &imap.get_quota_roots(&folder).await?; + let (quota_roots, quotas) = &session.get_quota_root(&folder).await?; // if there are new quota roots found in this imap folder, add them to the list for qr_entries in quota_roots { for quota_root_name in &qr_entries.quota_root_names { @@ -135,9 +136,10 @@ impl Context { return Ok(Status::RetryNow); } - let quota = if imap.can_check_quota() { + let session = imap.session.as_mut().context("no session")?; + let quota = if session.can_check_quota() { let folders = get_watched_folders(self).await?; - get_unique_quota_roots_and_usage(folders, imap).await + get_unique_quota_roots_and_usage(session, folders).await } else { Err(anyhow!(stock_str::not_supported_by_provider(self).await)) }; diff --git a/src/scheduler.rs b/src/scheduler.rs index 32f19cf70..81d5e7be8 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -166,6 +166,12 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne .await; } +/// Implement a single iteration of IMAP loop. +/// +/// This function performs all IMAP operations on a single folder, selecting it if necessary and +/// handling all the errors. In case of an error, it is logged, but not propagated upwards. If +/// critical operation fails such as fetching new messages fails, connection is reset via +/// `trigger_reconnect`, so a fresh one can be opened. async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) -> InterruptInfo { let folder = match ctx.get_config(folder_config).await { Ok(folder) => folder, @@ -187,18 +193,25 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) }; // connect and fake idle if unable to connect - if let Err(err) = connection.prepare(ctx).await { - warn!(ctx, "imap connection failed: {}", err); + if let Err(err) = connection + .prepare(ctx) + .await + .context("prepare IMAP connection") + { + connection.trigger_reconnect(ctx); + warn!(ctx, "{:#}", err); return connection.fake_idle(ctx, Some(watch_folder)).await; } if folder_config == Config::ConfiguredInboxFolder { - if let Err(err) = connection - .store_seen_flags_on_imap(ctx) - .await - .context("store_seen_flags_on_imap failed") - { - warn!(ctx, "{:#}", err); + if let Some(session) = connection.session.as_mut() { + session + .store_seen_flags_on_imap(ctx) + .await + .context("store_seen_flags_on_imap") + .ok_or_log(ctx); + } else { + warn!(ctx, "No session even though we just prepared it"); } } @@ -208,7 +221,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) .await .context("fetch_move_delete") { - connection.trigger_reconnect(ctx).await; + connection.trigger_reconnect(ctx); warn!(ctx, "{:#}", err); return InterruptInfo::new(false); } @@ -217,12 +230,10 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) // on the next iteration of `fetch_move_delete`. `delete_expired_imap_messages` is not // called right before `fetch_move_delete` because it is not well optimized and would // otherwise slow down message fetching. - if let Err(err) = delete_expired_imap_messages(ctx) + delete_expired_imap_messages(ctx) .await .context("delete_expired_imap_messages") - { - warn!(ctx, "{:#}", err); - } + .ok_or_log(ctx); // Scan additional folders only after finishing fetching the watched folder. // @@ -248,7 +259,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) .await .context("fetch_move_delete after scan_folders") { - connection.trigger_reconnect(ctx).await; + connection.trigger_reconnect(ctx); warn!(ctx, "{:#}", err); return InterruptInfo::new(false); } @@ -266,18 +277,38 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder_config: Config) connection.connectivity.set_connected(ctx).await; - // idle - if !connection.can_idle() { - return connection.fake_idle(ctx, Some(watch_folder)).await; - } - - match connection.idle(ctx, Some(watch_folder)).await { - Ok(v) => v, - Err(err) => { - connection.trigger_reconnect(ctx).await; - warn!(ctx, "{:#}", err); - InterruptInfo::new(false) + if let Some(session) = connection.session.take() { + if !session.can_idle() { + info!( + ctx, + "IMAP session does not support IDLE, going to fake idle." + ); + return connection.fake_idle(ctx, Some(watch_folder)).await; } + + info!(ctx, "IMAP session supports IDLE, using it."); + match session + .idle( + ctx, + connection.idle_interrupt_receiver.clone(), + Some(watch_folder), + ) + .await + .context("idle") + { + Ok((session, info)) => { + connection.session = Some(session); + info + } + Err(err) => { + connection.trigger_reconnect(ctx); + warn!(ctx, "{:#}", err); + InterruptInfo::new(false) + } + } + } else { + warn!(ctx, "No IMAP session, going to fake idle."); + connection.fake_idle(ctx, Some(watch_folder)).await } }