From 351383dfa4c5505eb28fe0e46ee3ab94e41fa902 Mon Sep 17 00:00:00 2001 From: Alexander Krotov Date: Fri, 6 Dec 2019 03:07:08 +0300 Subject: [PATCH] Split IMAP idle functions into imap::idle submodule Also introduce Imap.can_idle() to avoid having to match on IdleAbilityMissing error --- src/imap/idle.rs | 250 ++++++++++++++++++++++++++++++++++++++++++++++ src/imap/mod.rs | 218 +--------------------------------------- src/job_thread.rs | 19 ++-- 3 files changed, 262 insertions(+), 225 deletions(-) create mode 100644 src/imap/idle.rs diff --git a/src/imap/idle.rs b/src/imap/idle.rs new file mode 100644 index 000000000..6f01bf8b8 --- /dev/null +++ b/src/imap/idle.rs @@ -0,0 +1,250 @@ +use super::Imap; + +use async_imap::extensions::idle::IdleResponse; +use async_std::prelude::*; +use async_std::task; +use std::sync::atomic::Ordering; +use std::time::{Duration, SystemTime}; + +use crate::context::Context; +use crate::imap_client::*; + +use super::select_folder; + +type Result = std::result::Result; + +#[derive(Debug, Fail)] +pub enum Error { + #[fail(display = "IMAP IDLE protocol failed to init/complete")] + IdleProtocolFailed(#[cause] async_imap::error::Error), + + #[fail(display = "IMAP server does not have IDLE capability")] + IdleAbilityMissing, + + #[fail(display = "IMAP select folder error")] + SelectFolderError(#[cause] select_folder::Error), + + #[fail(display = "IMAP error")] + ImapError(#[cause] async_imap::error::Error), + + #[fail(display = "Setup handle error")] + SetupHandleError(#[cause] super::Error), +} + +impl From for Error { + fn from(err: select_folder::Error) -> Error { + Error::SelectFolderError(err) + } +} + +impl Imap { + pub fn can_idle(&self) -> bool { + task::block_on(async move { self.config.read().await.can_idle }) + } + + pub fn idle(&self, context: &Context, watch_folder: Option) -> Result<()> { + task::block_on(async move { + if !self.can_idle() { + return Err(Error::IdleAbilityMissing); + } + + self.setup_handle_if_needed(context) + .await + .map_err(Error::SetupHandleError)?; + + self.select_folder(context, watch_folder.clone()).await?; + + let session = self.session.lock().await.take(); + let timeout = Duration::from_secs(23 * 60); + if let Some(session) = session { + match session.idle() { + // BEWARE: If you change the Secure branch you + // typically also need to change the Insecure branch. + IdleHandle::Secure(mut handle) => { + if let Err(err) = handle.init().await { + return Err(Error::IdleProtocolFailed(err)); + } + + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + *self.interrupt.lock().await = Some(interrupt); + + if self.skip_next_idle_wait.load(Ordering::SeqCst) { + // interrupt_idle has happened before we + // provided self.interrupt + self.skip_next_idle_wait.store(false, Ordering::SeqCst); + std::mem::drop(idle_wait); + info!(context, "Idle wait was skipped"); + } else { + info!(context, "Idle entering wait-on-remote state"); + match idle_wait.await { + IdleResponse::NewData(_) => { + info!(context, "Idle has NewData"); + } + // TODO: idle_wait does not distinguish manual interrupts + // from Timeouts if we would know it's a Timeout we could bail + // directly and reconnect . + IdleResponse::Timeout => { + info!(context, "Idle-wait timeout or interruption"); + } + IdleResponse::ManualInterrupt => { + info!(context, "Idle wait was interrupted"); + } + } + } + match handle.done().await { + Ok(session) => { + *self.session.lock().await = Some(Session::Secure(session)); + } + Err(err) => { + // if we cannot terminate IDLE it probably + // means that we waited long (with idle_wait) + // but the network went away/changed + self.trigger_reconnect(); + return Err(Error::IdleProtocolFailed(err)); + } + } + } + IdleHandle::Insecure(mut handle) => { + if let Err(err) = handle.init().await { + return Err(Error::IdleProtocolFailed(err)); + } + + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); + *self.interrupt.lock().await = Some(interrupt); + + if self.skip_next_idle_wait.load(Ordering::SeqCst) { + // interrupt_idle has happened before we + // provided self.interrupt + self.skip_next_idle_wait.store(false, Ordering::SeqCst); + std::mem::drop(idle_wait); + info!(context, "Idle wait was skipped"); + } else { + info!(context, "Idle entering wait-on-remote state"); + match idle_wait.await { + IdleResponse::NewData(_) => { + info!(context, "Idle has NewData"); + } + // TODO: idle_wait does not distinguish manual interrupts + // from Timeouts if we would know it's a Timeout we could bail + // directly and reconnect . + IdleResponse::Timeout => { + info!(context, "Idle-wait timeout or interruption"); + } + IdleResponse::ManualInterrupt => { + info!(context, "Idle wait was interrupted"); + } + } + } + match handle.done().await { + Ok(session) => { + *self.session.lock().await = Some(Session::Insecure(session)); + } + Err(err) => { + // if we cannot terminate IDLE it probably + // means that we waited long (with idle_wait) + // but the network went away/changed + self.trigger_reconnect(); + return Err(Error::IdleProtocolFailed(err)); + } + } + } + } + } + + Ok(()) + }) + } + + pub(crate) fn fake_idle(&self, context: &Context, watch_folder: Option) { + // Idle using polling. This is also needed if we're not yet configured - + // in this case, we're waiting for a configure job (and an interrupt). + task::block_on(async move { + let fake_idle_start_time = SystemTime::now(); + + info!(context, "IMAP-fake-IDLEing..."); + + let interrupt = stop_token::StopSource::new(); + + // check every minute if there are new messages + // TODO: grow sleep durations / make them more flexible + let interval = async_std::stream::interval(Duration::from_secs(60)); + let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); + *self.interrupt.lock().await = Some(interrupt); + if self.skip_next_idle_wait.load(Ordering::SeqCst) { + // interrupt_idle has happened before we + // provided self.interrupt + self.skip_next_idle_wait.store(false, Ordering::SeqCst); + info!(context, "fake-idle wait was skipped"); + } else { + // loop until we are interrupted or if we fetched something + while let Some(_) = interrupt_interval.next().await { + // try to connect with proper login params + // (setup_handle_if_needed might not know about them if we + // never successfully connected) + if let Err(err) = self.connect_configured(context) { + warn!(context, "fake_idle: could not connect: {}", err); + continue; + } + if self.config.read().await.can_idle { + // we only fake-idled because network was gone during IDLE, probably + break; + } + info!(context, "fake_idle is connected"); + // we are connected, let's see if fetching messages results + // in anything. If so, we behave as if IDLE had data but + // will have already fetched the messages so perform_*_fetch + // will not find any new. + + if let Some(ref watch_folder) = watch_folder { + match self.fetch_new_messages(context, watch_folder).await { + Ok(res) => { + info!(context, "fetch_new_messages returned {:?}", res); + if res { + break; + } + } + Err(err) => { + error!(context, "could not fetch from folder: {}", err); + self.trigger_reconnect() + } + } + } + } + } + self.interrupt.lock().await.take(); + + info!( + context, + "IMAP-fake-IDLE done after {:.4}s", + SystemTime::now() + .duration_since(fake_idle_start_time) + .unwrap() + .as_millis() as f64 + / 1000., + ); + }) + } + + pub fn interrupt_idle(&self, context: &Context) { + task::block_on(async move { + let mut interrupt: Option = self.interrupt.lock().await.take(); + if interrupt.is_none() { + // idle wait is not running, signal it needs to skip + self.skip_next_idle_wait.store(true, Ordering::SeqCst); + + // meanwhile idle-wait may have produced the StopSource + interrupt = self.interrupt.lock().await.take(); + } + // let's manually drop the StopSource + if interrupt.is_some() { + // the imap thread provided us a stop token but might + // not have entered idle_wait yet, give it some time + // for that to happen. XXX handle this without extra wait + // https://github.com/deltachat/deltachat-core-rust/issues/925 + std::thread::sleep(Duration::from_millis(200)); + info!(context, "low-level: dropping stop-source to interrupt idle"); + std::mem::drop(interrupt) + } + }); + } +} diff --git a/src/imap/mod.rs b/src/imap/mod.rs index f86b3609d..5fdf90676 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -4,14 +4,11 @@ //! to implement connect, fetch, delete functionality with standard IMAP servers. use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, SystemTime}; use async_imap::{ error::Result as ImapResult, - extensions::idle::IdleResponse, types::{Fetch, Flag, Mailbox, Name, NameAttribute}, }; -use async_std::prelude::*; use async_std::sync::{Mutex, RwLock}; use async_std::task; @@ -28,6 +25,7 @@ use crate::param::Params; use crate::stock::StockMessage; use crate::wrapmime; +mod idle; pub mod select_folder; const DC_IMAP_SEEN: usize = 0x0001; @@ -51,22 +49,16 @@ pub enum Error { #[fail(display = "IMAP Could not login as {}", _0)] LoginFailed(String), - #[fail(display = "IMAP Could not fetch {}", _0)] + #[fail(display = "IMAP Could not fetch")] FetchFailed(#[cause] async_imap::error::Error), - #[fail(display = "IMAP IDLE protocol failed to init/complete")] - IdleProtocolFailed(#[cause] async_imap::error::Error), - - #[fail(display = "IMAP server does not have IDLE capability")] - IdleAbilityMissing, - #[fail(display = "IMAP operation attempted while it is torn down")] InTeardown, #[fail(display = "IMAP operation attempted while it is torn down")] SqlError(#[cause] crate::sql::Error), - #[fail(display = "IMAP got error from elsewhere: {:?}", _0)] + #[fail(display = "IMAP got error from elsewhere")] WrappedError(#[cause] crate::error::Error), #[fail(display = "IMAP select folder error")] @@ -730,210 +722,6 @@ impl Imap { 1 } - pub fn idle(&self, context: &Context, watch_folder: Option) -> Result<()> { - task::block_on(async move { - if !self.config.read().await.can_idle { - return Err(Error::IdleAbilityMissing); - } - - self.setup_handle_if_needed(context).await?; - - self.select_folder(context, watch_folder.clone()).await?; - - let session = self.session.lock().await.take(); - let timeout = Duration::from_secs(23 * 60); - if let Some(session) = session { - match session.idle() { - // BEWARE: If you change the Secure branch you - // typically also need to change the Insecure branch. - IdleHandle::Secure(mut handle) => { - if let Err(err) = handle.init().await { - return Err(Error::IdleProtocolFailed(err)); - } - - let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); - - if self.skip_next_idle_wait.load(Ordering::SeqCst) { - // interrupt_idle has happened before we - // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); - std::mem::drop(idle_wait); - info!(context, "Idle wait was skipped"); - } else { - info!(context, "Idle entering wait-on-remote state"); - match idle_wait.await { - IdleResponse::NewData(_) => { - info!(context, "Idle has NewData"); - } - // TODO: idle_wait does not distinguish manual interrupts - // from Timeouts if we would know it's a Timeout we could bail - // directly and reconnect . - IdleResponse::Timeout => { - info!(context, "Idle-wait timeout or interruption"); - } - IdleResponse::ManualInterrupt => { - info!(context, "Idle wait was interrupted"); - } - } - } - match handle.done().await { - Ok(session) => { - *self.session.lock().await = Some(Session::Secure(session)); - } - Err(err) => { - // if we cannot terminate IDLE it probably - // means that we waited long (with idle_wait) - // but the network went away/changed - self.trigger_reconnect(); - return Err(Error::IdleProtocolFailed(err)); - } - } - } - IdleHandle::Insecure(mut handle) => { - if let Err(err) = handle.init().await { - return Err(Error::IdleProtocolFailed(err)); - } - - let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); - *self.interrupt.lock().await = Some(interrupt); - - if self.skip_next_idle_wait.load(Ordering::SeqCst) { - // interrupt_idle has happened before we - // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); - std::mem::drop(idle_wait); - info!(context, "Idle wait was skipped"); - } else { - info!(context, "Idle entering wait-on-remote state"); - match idle_wait.await { - IdleResponse::NewData(_) => { - info!(context, "Idle has NewData"); - } - // TODO: idle_wait does not distinguish manual interrupts - // from Timeouts if we would know it's a Timeout we could bail - // directly and reconnect . - IdleResponse::Timeout => { - info!(context, "Idle-wait timeout or interruption"); - } - IdleResponse::ManualInterrupt => { - info!(context, "Idle wait was interrupted"); - } - } - } - match handle.done().await { - Ok(session) => { - *self.session.lock().await = Some(Session::Insecure(session)); - } - Err(err) => { - // if we cannot terminate IDLE it probably - // means that we waited long (with idle_wait) - // but the network went away/changed - self.trigger_reconnect(); - return Err(Error::IdleProtocolFailed(err)); - } - } - } - } - } - - Ok(()) - }) - } - - pub(crate) fn fake_idle(&self, context: &Context, watch_folder: Option) { - // Idle using polling. This is also needed if we're not yet configured - - // in this case, we're waiting for a configure job (and an interrupt). - task::block_on(async move { - let fake_idle_start_time = SystemTime::now(); - - info!(context, "IMAP-fake-IDLEing..."); - - let interrupt = stop_token::StopSource::new(); - - // check every minute if there are new messages - // TODO: grow sleep durations / make them more flexible - let interval = async_std::stream::interval(Duration::from_secs(60)); - let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); - *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::SeqCst) { - // interrupt_idle has happened before we - // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::SeqCst); - info!(context, "fake-idle wait was skipped"); - } else { - // loop until we are interrupted or if we fetched something - while let Some(_) = interrupt_interval.next().await { - // try to connect with proper login params - // (setup_handle_if_needed might not know about them if we - // never successfully connected) - if let Err(err) = self.connect_configured(context) { - warn!(context, "fake_idle: could not connect: {}", err); - continue; - } - if self.config.read().await.can_idle { - // we only fake-idled because network was gone during IDLE, probably - break; - } - info!(context, "fake_idle is connected"); - // we are connected, let's see if fetching messages results - // in anything. If so, we behave as if IDLE had data but - // will have already fetched the messages so perform_*_fetch - // will not find any new. - - if let Some(ref watch_folder) = watch_folder { - match self.fetch_new_messages(context, watch_folder).await { - Ok(res) => { - info!(context, "fetch_new_messages returned {:?}", res); - if res { - break; - } - } - Err(err) => { - error!(context, "could not fetch from folder: {}", err); - self.trigger_reconnect() - } - } - } - } - } - self.interrupt.lock().await.take(); - - info!( - context, - "IMAP-fake-IDLE done after {:.4}s", - SystemTime::now() - .duration_since(fake_idle_start_time) - .unwrap() - .as_millis() as f64 - / 1000., - ); - }) - } - - pub fn interrupt_idle(&self, context: &Context) { - task::block_on(async move { - let mut interrupt: Option = self.interrupt.lock().await.take(); - if interrupt.is_none() { - // idle wait is not running, signal it needs to skip - self.skip_next_idle_wait.store(true, Ordering::SeqCst); - - // meanwhile idle-wait may have produced the StopSource - interrupt = self.interrupt.lock().await.take(); - } - // let's manually drop the StopSource - if interrupt.is_some() { - // the imap thread provided us a stop token but might - // not have entered idle_wait yet, give it some time - // for that to happen. XXX handle this without extra wait - // https://github.com/deltachat/deltachat-core-rust/issues/925 - std::thread::sleep(Duration::from_millis(200)); - info!(context, "low-level: dropping stop-source to interrupt idle"); - std::mem::drop(interrupt) - } - }); - } - pub fn mv( &self, context: &Context, diff --git a/src/job_thread.rs b/src/job_thread.rs index 427e6a077..ad6dd6e7b 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -170,20 +170,19 @@ impl JobThread { let prefix = format!("{}-IDLE", self.name); let do_fake_idle = match self.imap.connect_configured(context) { Ok(()) => { - info!(context, "{} started...", prefix); - let watch_folder = self.get_watch_folder(context); - let res = self.imap.idle(context, watch_folder); - info!(context, "{} ended...", prefix); - match res { - Ok(()) => false, - Err(crate::imap::Error::IdleAbilityMissing) => true, // we have to do fake_idle - Err(err) => { + if !self.imap.can_idle() { + true // we have to do fake_idle + } else { + let watch_folder = self.get_watch_folder(context); + info!(context, "{} started...", prefix); + let res = self.imap.idle(context, watch_folder); + info!(context, "{} ended...", prefix); + if let Err(err) = res { warn!(context, "{} failed: {} -> reconnecting", prefix, err); // something is borked, let's start afresh on the next occassion self.imap.disconnect(context); - - false } + false } } Err(err) => {