From be533fa66a909890e01a73fb8361ddeb93cc49e9 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 18 Nov 2019 03:06:54 +0100 Subject: [PATCH] resultify some imap operations --- src/configure/mod.rs | 29 ++++++----- src/error.rs | 10 ++++ src/imap.rs | 103 +++++++++++++++++++------------------ src/job.rs | 119 +++++++++++++++++++++++++++---------------- src/job_thread.rs | 102 ++++++++++++++++++++++--------------- 5 files changed, 216 insertions(+), 147 deletions(-) diff --git a/src/configure/mod.rs b/src/configure/mod.rs index f57d87e1c..d8a5ad728 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -7,6 +7,7 @@ use crate::constants::*; use crate::context::Context; use crate::dc_tools::*; use crate::e2ee; +use crate::error::*; use crate::imap::*; use crate::job::*; use crate::login_param::LoginParam; @@ -566,23 +567,23 @@ fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option { } /// Connects to the configured account -pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> libc::c_int { - let mut ret_connected = 0; - +pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> Result<()> { if async_std::task::block_on(async move { imap.is_connected().await }) { - ret_connected = 1 - } else if !context.sql.get_raw_config_bool(context, "configured") { - warn!(context, "Not configured, cannot connect.",); - } else { - let param = LoginParam::from_database(context, "configured_"); - // the trailing underscore is correct - - if imap.connect(context, ¶m) { - ret_connected = 2; - } + return Ok(()); + } + if !context.sql.get_raw_config_bool(context, "configured") { + return Err(Error::ConnectWithoutConfigure); } - ret_connected + let param = LoginParam::from_database(context, "configured_"); + // the trailing underscore is correct + + if imap.connect(context, ¶m) { + return Ok(()); + } + return Err(Error::ImapConnectionFailed( + format!("{}", param).to_string(), + )); } /******************************************************************************* diff --git a/src/error.rs b/src/error.rs index 14c84df85..5497251b1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -34,6 +34,16 @@ pub enum Error { BlobError(#[cause] crate::blob::BlobError), #[fail(display = "Invalid Message ID.")] InvalidMsgId, + #[fail(display = "Watch folder not found {:?}", _0)] + WatchFolderNotFound(String), + #[fail(display = "Connection Failed params: {}", _0)] + ImapConnectionFailed(String), + #[fail(display = "Cannot idle")] + ImapMissesIdle, + #[fail(display = "Imap IDLE protocol failed to init/complete")] + ImapIdleProtocolFailed(String), + #[fail(display = "Connect without configured params")] + ConnectWithoutConfigure, } pub type Result = std::result::Result; diff --git a/src/imap.rs b/src/imap.rs index a49b40ed6..5ded6d751 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -34,6 +34,12 @@ pub enum ImapActionResult { Success, } +#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)] +pub enum IdlePollMode { + Often, + Never, +} + const PREFETCH_FLAGS: &str = "(UID ENVELOPE)"; const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; const SELECT_ALL: &str = "1:*"; @@ -746,17 +752,15 @@ impl Imap { 1 } - pub fn idle(&self, context: &Context) { + pub fn idle(&self, context: &Context) -> Result<(), Error> { task::block_on(async move { - if self.config.read().await.selected_folder.is_none() { - // this probably means that we are in teardown - // in any case we can't perform any idling - return; - } + ensure!( + self.config.read().await.selected_folder.is_some(), + "no folder selected, probably in teardown?" + ); if !self.config.read().await.can_idle { - self.fake_idle(context, true).await; - return; + return Err(Error::ImapMissesIdle); } self.setup_handle_if_needed(context); @@ -766,13 +770,7 @@ impl Imap { ImapActionResult::Success | ImapActionResult::AlreadyDone => {} ImapActionResult::Failed | ImapActionResult::RetryLater => { - warn!( - context, - "idle select_folder failed {:?}", - watch_folder.as_ref() - ); - self.fake_idle(context, true).await; - return; + bail!("IMAP select failed for {:?}", watch_folder.as_ref()); } } @@ -784,9 +782,9 @@ impl Imap { // typically also need to change the Insecure branch. IdleHandle::Secure(mut handle) => { if let Err(err) = handle.init().await { - warn!(context, "Failed to establish IDLE connection: {:?}", err); - return; + bail!("IDLE init failed: {}", err); } + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); *self.interrupt.lock().await = Some(interrupt); @@ -810,18 +808,15 @@ impl Imap { // means that we waited long (with idle_wait) // but the network went away/changed self.trigger_reconnect(); - warn!( - context, - "Failed to terminate IMAP IDLE connection: {:?}", err - ); + return Err(Error::ImapIdleProtocolFailed(format!("{}", err))); } } } IdleHandle::Insecure(mut handle) => { if let Err(err) = handle.init().await { - warn!(context, "Failed to establish IDLE connection: {:?}", err); - return; + bail!("IDLE init failed: {}", err); } + let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); *self.interrupt.lock().await = Some(interrupt); @@ -836,7 +831,6 @@ impl Imap { let res = idle_wait.await; info!(context, "Idle finished wait-on-remote: {:?}", res); } - match handle.done().await { Ok(session) => { *self.session.lock().await = Some(Session::Insecure(session)); @@ -846,23 +840,24 @@ impl Imap { // means that we waited long (with idle_wait) // but the network went away/changed self.trigger_reconnect(); - warn!(context, "Failed to close IMAP IDLE connection: {:?}", err); + return Err(Error::ImapIdleProtocolFailed(format!("{}", err))); } } } } } - }); + + Ok(()) + }) } - pub(crate) async fn fake_idle(&self, context: &Context, use_network: bool) { - // Idle using timeouts. This is also needed if we're not yet configured - - // in this case, we're waiting for a configure job - let fake_idle_start_time = SystemTime::now(); - - info!(context, "IMAP-fake-IDLEing..."); - + pub(crate) fn fake_idle(&self, context: &Context, poll_mode: IdlePollMode) { + // Idle using polling. task::block_on(async move { + let fake_idle_start_time = SystemTime::now(); + + info!(context, "IMAP-fake-IDLEing..."); + let interrupt = stop_token::StopSource::new(); // we use 1000 minutes if we are told to not try network @@ -870,21 +865,29 @@ impl Imap { // but clients are still calling us in a loop. // if we are to use network, we check every minute if there // is new mail -- TODO: make this more flexible - let secs = if use_network { 60 } else { 60000 }; + let secs = match poll_mode { + IdlePollMode::Never => 60000, + IdlePollMode::Often => 60, + }; let interval = async_std::stream::interval(Duration::from_secs(secs)); let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); *self.interrupt.lock().await = Some(interrupt); + // loop until we are interrupted or if we fetched something while let Some(_) = interrupt_interval.next().await { - if !use_network { + if poll_mode == IdlePollMode::Never { continue; } if !self.is_connected().await { // try to connect with proper login params // (setup_handle_if_needed might not know about them if we // never successfully connected) - if dc_connect_to_configured_imap(context, &self) != 0 { - self.interrupt.lock().await.take(); + match dc_connect_to_configured_imap(context, &self) { + Ok(()) => {} + Err(err) => { + warn!(context, "fake_idle: could not connect: {}", err); + continue; + } } } // we are connected, let's see if fetching messages results @@ -895,22 +898,22 @@ impl Imap { let watch_folder = self.config.read().await.watch_folder.clone(); if let Some(watch_folder) = watch_folder { if 0 != self.fetch_from_single_folder(context, watch_folder).await { - self.interrupt.lock().await.take(); break; } } } - }); + self.interrupt.lock().await.take(); - info!( - context, - "IMAP-fake-IDLE done after {:.4}s", - SystemTime::now() - .duration_since(fake_idle_start_time) - .unwrap() - .as_millis() as f64 - / 1000., - ); + info!( + context, + "IMAP-fake-IDLE done after {:.4}s", + SystemTime::now() + .duration_since(fake_idle_start_time) + .unwrap() + .as_millis() as f64 + / 1000., + ); + }) } pub fn interrupt_idle(&self) { @@ -1052,8 +1055,8 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::Failed); } else if !self.is_connected().await { - connect_to_inbox(context, &self); - if !self.is_connected().await { + if let Err(err) = connect_to_inbox(context, &self) { + warn!(context, "prepare_imap_op failed: {}", err); return Some(ImapActionResult::RetryLater); } } diff --git a/src/job.rs b/src/job.rs index cf34afbbb..fbb275215 100644 --- a/src/job.rs +++ b/src/job.rs @@ -367,14 +367,15 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool { } pub fn perform_imap_fetch(context: &Context) { + if !context.get_config_bool(Config::InboxWatch) { + info!(context, "INBOX-fetch skipped: INBOX-watch is disabled."); + return; + } let inbox = context.inbox.read().unwrap(); let start = std::time::Instant::now(); - if 0 == connect_to_inbox(context, &inbox) { - return; - } - if !context.get_config_bool(Config::InboxWatch) { - info!(context, "INBOX-watch disabled.",); + if let Err(err) = connect_to_inbox(context, &inbox) { + warn!(context, "could not connect to inbox: {:?}", err); return; } info!(context, "INBOX-fetch started...",); @@ -391,10 +392,6 @@ pub fn perform_imap_fetch(context: &Context) { } pub fn perform_imap_idle(context: &Context) { - let inbox = context.inbox.read().unwrap(); - - connect_to_inbox(context, &inbox); - if *context.perform_inbox_jobs_needed.clone().read().unwrap() { info!( context, @@ -402,9 +399,48 @@ pub fn perform_imap_idle(context: &Context) { ); return; } - info!(context, "INBOX-IDLE started..."); - inbox.idle(context); - info!(context, "INBOX-IDLE ended."); + let inbox = context.inbox.read().unwrap(); + let poll_mode = if !context.get_config_bool(Config::InboxWatch) { + Some(IdlePollMode::Never) + } else { + match connect_to_inbox(context, &inbox) { + Err(Error::ImapConnectionFailed(param)) => { + warn!(context, "perform_imap_idle could not connect {:?}", param); + Some(IdlePollMode::Often) + } + Err(err) => { + warn!(context, "perform_imap_idle error: {}", err); + // anything else than a plain connection error + // hints at configuration issues. + Some(IdlePollMode::Never) + } + + Ok(()) => { + info!(context, "INBOX-IDLE starting..."); + let res = inbox.idle(context); + info!(context, "INBOX-IDLE ended."); + + match res { + Ok(()) => None, + Err(Error::ImapConnectionFailed(param)) => { + warn!( + context, + "perform_imap_idle IDLE could not connect {:?}", param + ); + Some(IdlePollMode::Often) + } + Err(err) => { + warn!(context, "perform_imap_idle IDLE error: {}", err); + Some(IdlePollMode::Never) + } + } + } + } + }; + + if let Some(poll_mode) = poll_mode { + inbox.fake_idle(context, poll_mode); + } } pub fn perform_mvbox_fetch(context: &Context) { @@ -724,32 +760,31 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { params_probe }; - let jobs: Result, _> = context.sql.query_map( - query, - params, - |row| { - let job = Job { - job_id: row.get(0)?, - action: row.get(1)?, - foreign_id: row.get(2)?, - desired_timestamp: row.get(5)?, - added_timestamp: row.get(4)?, - tries: row.get(6)?, - param: row.get::<_, String>(3)?.parse().unwrap_or_default(), - try_again: 0, - pending_error: None, - }; + let jobs: Result, _> = context + .sql + .query_map( + query, + params, + |row| { + let job = Job { + job_id: row.get(0)?, + action: row.get(1)?, + foreign_id: row.get(2)?, + desired_timestamp: row.get(5)?, + added_timestamp: row.get(4)?, + tries: row.get(6)?, + param: row.get::<_, String>(3)?.parse().unwrap_or_default(), + try_again: 0, + pending_error: None, + }; - Ok(job) - }, - |jobs| jobs.collect::, _>>().map_err(Into::into), - ); - match jobs { - Ok(ref _res) => {} - Err(ref err) => { - info!(context, "query failed: {:?}", err); - } - } + Ok(job) + }, + |jobs| jobs.collect::, _>>().map_err(Into::into), + ) + .map_err(|err| { + warn!(context, "query failed: {:?}", err); + }); for mut job in jobs.unwrap_or_default() { info!( @@ -928,12 +963,10 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) { } } -pub fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { - let ret_connected = dc_connect_to_configured_imap(context, inbox); - if 0 != ret_connected { - inbox.set_watch_folder("INBOX".into()); - } - ret_connected +pub fn connect_to_inbox(context: &Context, imap: &Imap) -> Result<(), Error> { + dc_connect_to_configured_imap(context, imap)?; + imap.set_watch_folder("INBOX".into()); + Ok(()) } fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> { diff --git a/src/job_thread.rs b/src/job_thread.rs index 21362fb5b..67522936a 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -2,7 +2,8 @@ use std::sync::{Arc, Condvar, Mutex}; use crate::configure::*; use crate::context::Context; -use crate::imap::Imap; +use crate::error::Error; +use crate::imap::{IdlePollMode, Imap}; #[derive(Debug)] pub struct JobThread { @@ -86,52 +87,60 @@ impl JobThread { if use_network { let start = std::time::Instant::now(); - if self.connect_to_imap(context) { - info!(context, "{}-fetch started...", self.name); - self.imap.fetch(context); - - if self.imap.should_reconnect() { - info!(context, "{}-fetch aborted, starting over...", self.name,); + let prefix = format!("{}-fetch", self.name); + match self.connect_to_imap(context) { + Ok(()) => { + info!(context, "{} started...", prefix); self.imap.fetch(context); + + if self.imap.should_reconnect() { + info!(context, "{} aborted, starting over...", prefix); + self.imap.fetch(context); + } + info!( + context, + "{} done in {:.3} ms.", + prefix, + start.elapsed().as_millis(), + ); + } + Err(err) => { + warn!( + context, + "{} skipped, could not connect to imap {:?}", prefix, err + ); } - info!( - context, - "{}-fetch done in {:.3} ms.", - self.name, - start.elapsed().as_millis(), - ); } } self.state.0.lock().unwrap().using_handle = false; } - fn connect_to_imap(&self, context: &Context) -> bool { + fn connect_to_imap(&self, context: &Context) -> Result<(), Error> { if async_std::task::block_on(async move { self.imap.is_connected().await }) { - return true; + return Ok(()); } let watch_folder_name = match context.sql.get_raw_config(context, self.folder_config_name) { Some(name) => name, None => { - return false; + return Err(Error::WatchFolderNotFound( + self.folder_config_name.to_string(), + )); } }; - let ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0; - if ret_connected { - if context - .sql - .get_raw_config_int(context, "folders_configured") - .unwrap_or_default() - < 3 - { - self.imap.configure_folders(context, 0x1); - } - - self.imap.set_watch_folder(watch_folder_name); + dc_connect_to_configured_imap(context, &self.imap)?; + if context + .sql + .get_raw_config_int(context, "folders_configured") + .unwrap_or_default() + < 3 + { + self.imap.configure_folders(context, 0x1); } + self.imap.set_watch_folder(watch_folder_name); - ret_connected + Ok(()) } pub fn idle(&self, context: &Context, use_network: bool) { @@ -170,17 +179,30 @@ impl JobThread { } } - if self.connect_to_imap(context) { - info!(context, "{}-IDLE started...", self.name,); - self.imap.idle(context); - info!(context, "{}-IDLE ended.", self.name); - } else { - // It's probably wrong that the thread even runs - // but let's call fake_idle and tell it to not try network at all. - // (once we move to rust-managed threads this problem goes away) - info!(context, "{}-IDLE not connected, fake-idling", self.name); - async_std::task::block_on(async move { self.imap.fake_idle(context, false).await }); - info!(context, "{}-IDLE fake-idling finished", self.name); + let poll_mode = match self.connect_to_imap(context) { + Ok(()) => { + info!(context, "{}-IDLE started...", self.name,); + let res = self.imap.idle(context); + info!(context, "{}-IDLE ended.", self.name); + match res { + Ok(()) => None, + Err(Error::ImapConnectionFailed(err)) => { + warn!(context, "idle connection failed: {}", err); + Some(IdlePollMode::Often) + } + Err(err) => { + warn!(context, "idle failed: {}", err); + Some(IdlePollMode::Never) + } + } + } + Err(err) => { + info!(context, "{}-IDLE fail: {:?}", self.name, err); + Some(IdlePollMode::Never) + } + }; + if let Some(poll_mode) = poll_mode { + self.imap.fake_idle(context, poll_mode); } self.state.0.lock().unwrap().using_handle = false;