resultify some imap operations

This commit is contained in:
holger krekel
2019-11-18 03:06:54 +01:00
parent 254b061921
commit be533fa66a
5 changed files with 216 additions and 147 deletions

View File

@@ -7,6 +7,7 @@ use crate::constants::*;
use crate::context::Context; use crate::context::Context;
use crate::dc_tools::*; use crate::dc_tools::*;
use crate::e2ee; use crate::e2ee;
use crate::error::*;
use crate::imap::*; use crate::imap::*;
use crate::job::*; use crate::job::*;
use crate::login_param::LoginParam; use crate::login_param::LoginParam;
@@ -566,23 +567,23 @@ fn try_smtp_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
} }
/// Connects to the configured account /// Connects to the configured account
pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> libc::c_int { pub fn dc_connect_to_configured_imap(context: &Context, imap: &Imap) -> Result<()> {
let mut ret_connected = 0;
if async_std::task::block_on(async move { imap.is_connected().await }) { if async_std::task::block_on(async move { imap.is_connected().await }) {
ret_connected = 1 return Ok(());
} else if !context.sql.get_raw_config_bool(context, "configured") { }
warn!(context, "Not configured, cannot connect.",); if !context.sql.get_raw_config_bool(context, "configured") {
} else { return Err(Error::ConnectWithoutConfigure);
let param = LoginParam::from_database(context, "configured_");
// the trailing underscore is correct
if imap.connect(context, &param) {
ret_connected = 2;
}
} }
ret_connected let param = LoginParam::from_database(context, "configured_");
// the trailing underscore is correct
if imap.connect(context, &param) {
return Ok(());
}
return Err(Error::ImapConnectionFailed(
format!("{}", param).to_string(),
));
} }
/******************************************************************************* /*******************************************************************************

View File

@@ -34,6 +34,16 @@ pub enum Error {
BlobError(#[cause] crate::blob::BlobError), BlobError(#[cause] crate::blob::BlobError),
#[fail(display = "Invalid Message ID.")] #[fail(display = "Invalid Message ID.")]
InvalidMsgId, InvalidMsgId,
#[fail(display = "Watch folder not found {:?}", _0)]
WatchFolderNotFound(String),
#[fail(display = "Connection Failed params: {}", _0)]
ImapConnectionFailed(String),
#[fail(display = "Cannot idle")]
ImapMissesIdle,
#[fail(display = "Imap IDLE protocol failed to init/complete")]
ImapIdleProtocolFailed(String),
#[fail(display = "Connect without configured params")]
ConnectWithoutConfigure,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -34,6 +34,12 @@ pub enum ImapActionResult {
Success, Success,
} }
#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)]
pub enum IdlePollMode {
Often,
Never,
}
const PREFETCH_FLAGS: &str = "(UID ENVELOPE)"; const PREFETCH_FLAGS: &str = "(UID ENVELOPE)";
const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])"; const BODY_FLAGS: &str = "(FLAGS BODY.PEEK[])";
const SELECT_ALL: &str = "1:*"; const SELECT_ALL: &str = "1:*";
@@ -746,17 +752,15 @@ impl Imap {
1 1
} }
pub fn idle(&self, context: &Context) { pub fn idle(&self, context: &Context) -> Result<(), Error> {
task::block_on(async move { task::block_on(async move {
if self.config.read().await.selected_folder.is_none() { ensure!(
// this probably means that we are in teardown self.config.read().await.selected_folder.is_some(),
// in any case we can't perform any idling "no folder selected, probably in teardown?"
return; );
}
if !self.config.read().await.can_idle { if !self.config.read().await.can_idle {
self.fake_idle(context, true).await; return Err(Error::ImapMissesIdle);
return;
} }
self.setup_handle_if_needed(context); self.setup_handle_if_needed(context);
@@ -766,13 +770,7 @@ impl Imap {
ImapActionResult::Success | ImapActionResult::AlreadyDone => {} ImapActionResult::Success | ImapActionResult::AlreadyDone => {}
ImapActionResult::Failed | ImapActionResult::RetryLater => { ImapActionResult::Failed | ImapActionResult::RetryLater => {
warn!( bail!("IMAP select failed for {:?}", watch_folder.as_ref());
context,
"idle select_folder failed {:?}",
watch_folder.as_ref()
);
self.fake_idle(context, true).await;
return;
} }
} }
@@ -784,9 +782,9 @@ impl Imap {
// typically also need to change the Insecure branch. // typically also need to change the Insecure branch.
IdleHandle::Secure(mut handle) => { IdleHandle::Secure(mut handle) => {
if let Err(err) = handle.init().await { if let Err(err) = handle.init().await {
warn!(context, "Failed to establish IDLE connection: {:?}", err); bail!("IDLE init failed: {}", err);
return;
} }
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
@@ -810,18 +808,15 @@ impl Imap {
// means that we waited long (with idle_wait) // means that we waited long (with idle_wait)
// but the network went away/changed // but the network went away/changed
self.trigger_reconnect(); self.trigger_reconnect();
warn!( return Err(Error::ImapIdleProtocolFailed(format!("{}", err)));
context,
"Failed to terminate IMAP IDLE connection: {:?}", err
);
} }
} }
} }
IdleHandle::Insecure(mut handle) => { IdleHandle::Insecure(mut handle) => {
if let Err(err) = handle.init().await { if let Err(err) = handle.init().await {
warn!(context, "Failed to establish IDLE connection: {:?}", err); bail!("IDLE init failed: {}", err);
return;
} }
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
@@ -836,7 +831,6 @@ impl Imap {
let res = idle_wait.await; let res = idle_wait.await;
info!(context, "Idle finished wait-on-remote: {:?}", res); info!(context, "Idle finished wait-on-remote: {:?}", res);
} }
match handle.done().await { match handle.done().await {
Ok(session) => { Ok(session) => {
*self.session.lock().await = Some(Session::Insecure(session)); *self.session.lock().await = Some(Session::Insecure(session));
@@ -846,23 +840,24 @@ impl Imap {
// means that we waited long (with idle_wait) // means that we waited long (with idle_wait)
// but the network went away/changed // but the network went away/changed
self.trigger_reconnect(); self.trigger_reconnect();
warn!(context, "Failed to close IMAP IDLE connection: {:?}", err); return Err(Error::ImapIdleProtocolFailed(format!("{}", err)));
} }
} }
} }
} }
} }
});
Ok(())
})
} }
pub(crate) async fn fake_idle(&self, context: &Context, use_network: bool) { pub(crate) fn fake_idle(&self, context: &Context, poll_mode: IdlePollMode) {
// Idle using timeouts. This is also needed if we're not yet configured - // Idle using polling.
// in this case, we're waiting for a configure job
let fake_idle_start_time = SystemTime::now();
info!(context, "IMAP-fake-IDLEing...");
task::block_on(async move { task::block_on(async move {
let fake_idle_start_time = SystemTime::now();
info!(context, "IMAP-fake-IDLEing...");
let interrupt = stop_token::StopSource::new(); let interrupt = stop_token::StopSource::new();
// we use 1000 minutes if we are told to not try network // we use 1000 minutes if we are told to not try network
@@ -870,21 +865,29 @@ impl Imap {
// but clients are still calling us in a loop. // but clients are still calling us in a loop.
// if we are to use network, we check every minute if there // if we are to use network, we check every minute if there
// is new mail -- TODO: make this more flexible // is new mail -- TODO: make this more flexible
let secs = if use_network { 60 } else { 60000 }; let secs = match poll_mode {
IdlePollMode::Never => 60000,
IdlePollMode::Often => 60,
};
let interval = async_std::stream::interval(Duration::from_secs(secs)); let interval = async_std::stream::interval(Duration::from_secs(secs));
let mut interrupt_interval = interrupt.stop_token().stop_stream(interval); let mut interrupt_interval = interrupt.stop_token().stop_stream(interval);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
// loop until we are interrupted or if we fetched something
while let Some(_) = interrupt_interval.next().await { while let Some(_) = interrupt_interval.next().await {
if !use_network { if poll_mode == IdlePollMode::Never {
continue; continue;
} }
if !self.is_connected().await { if !self.is_connected().await {
// try to connect with proper login params // try to connect with proper login params
// (setup_handle_if_needed might not know about them if we // (setup_handle_if_needed might not know about them if we
// never successfully connected) // never successfully connected)
if dc_connect_to_configured_imap(context, &self) != 0 { match dc_connect_to_configured_imap(context, &self) {
self.interrupt.lock().await.take(); Ok(()) => {}
Err(err) => {
warn!(context, "fake_idle: could not connect: {}", err);
continue;
}
} }
} }
// we are connected, let's see if fetching messages results // we are connected, let's see if fetching messages results
@@ -895,22 +898,22 @@ impl Imap {
let watch_folder = self.config.read().await.watch_folder.clone(); let watch_folder = self.config.read().await.watch_folder.clone();
if let Some(watch_folder) = watch_folder { if let Some(watch_folder) = watch_folder {
if 0 != self.fetch_from_single_folder(context, watch_folder).await { if 0 != self.fetch_from_single_folder(context, watch_folder).await {
self.interrupt.lock().await.take();
break; break;
} }
} }
} }
}); self.interrupt.lock().await.take();
info!( info!(
context, context,
"IMAP-fake-IDLE done after {:.4}s", "IMAP-fake-IDLE done after {:.4}s",
SystemTime::now() SystemTime::now()
.duration_since(fake_idle_start_time) .duration_since(fake_idle_start_time)
.unwrap() .unwrap()
.as_millis() as f64 .as_millis() as f64
/ 1000., / 1000.,
); );
})
} }
pub fn interrupt_idle(&self) { pub fn interrupt_idle(&self) {
@@ -1052,8 +1055,8 @@ impl Imap {
if uid == 0 { if uid == 0 {
return Some(ImapActionResult::Failed); return Some(ImapActionResult::Failed);
} else if !self.is_connected().await { } else if !self.is_connected().await {
connect_to_inbox(context, &self); if let Err(err) = connect_to_inbox(context, &self) {
if !self.is_connected().await { warn!(context, "prepare_imap_op failed: {}", err);
return Some(ImapActionResult::RetryLater); return Some(ImapActionResult::RetryLater);
} }
} }

View File

@@ -367,14 +367,15 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool {
} }
pub fn perform_imap_fetch(context: &Context) { pub fn perform_imap_fetch(context: &Context) {
if !context.get_config_bool(Config::InboxWatch) {
info!(context, "INBOX-fetch skipped: INBOX-watch is disabled.");
return;
}
let inbox = context.inbox.read().unwrap(); let inbox = context.inbox.read().unwrap();
let start = std::time::Instant::now(); let start = std::time::Instant::now();
if 0 == connect_to_inbox(context, &inbox) { if let Err(err) = connect_to_inbox(context, &inbox) {
return; warn!(context, "could not connect to inbox: {:?}", err);
}
if !context.get_config_bool(Config::InboxWatch) {
info!(context, "INBOX-watch disabled.",);
return; return;
} }
info!(context, "INBOX-fetch started...",); info!(context, "INBOX-fetch started...",);
@@ -391,10 +392,6 @@ pub fn perform_imap_fetch(context: &Context) {
} }
pub fn perform_imap_idle(context: &Context) { pub fn perform_imap_idle(context: &Context) {
let inbox = context.inbox.read().unwrap();
connect_to_inbox(context, &inbox);
if *context.perform_inbox_jobs_needed.clone().read().unwrap() { if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!( info!(
context, context,
@@ -402,9 +399,48 @@ pub fn perform_imap_idle(context: &Context) {
); );
return; return;
} }
info!(context, "INBOX-IDLE started..."); let inbox = context.inbox.read().unwrap();
inbox.idle(context); let poll_mode = if !context.get_config_bool(Config::InboxWatch) {
info!(context, "INBOX-IDLE ended."); Some(IdlePollMode::Never)
} else {
match connect_to_inbox(context, &inbox) {
Err(Error::ImapConnectionFailed(param)) => {
warn!(context, "perform_imap_idle could not connect {:?}", param);
Some(IdlePollMode::Often)
}
Err(err) => {
warn!(context, "perform_imap_idle error: {}", err);
// anything else than a plain connection error
// hints at configuration issues.
Some(IdlePollMode::Never)
}
Ok(()) => {
info!(context, "INBOX-IDLE starting...");
let res = inbox.idle(context);
info!(context, "INBOX-IDLE ended.");
match res {
Ok(()) => None,
Err(Error::ImapConnectionFailed(param)) => {
warn!(
context,
"perform_imap_idle IDLE could not connect {:?}", param
);
Some(IdlePollMode::Often)
}
Err(err) => {
warn!(context, "perform_imap_idle IDLE error: {}", err);
Some(IdlePollMode::Never)
}
}
}
}
};
if let Some(poll_mode) = poll_mode {
inbox.fake_idle(context, poll_mode);
}
} }
pub fn perform_mvbox_fetch(context: &Context) { pub fn perform_mvbox_fetch(context: &Context) {
@@ -724,32 +760,31 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) {
params_probe params_probe
}; };
let jobs: Result<Vec<Job>, _> = context.sql.query_map( let jobs: Result<Vec<Job>, _> = context
query, .sql
params, .query_map(
|row| { query,
let job = Job { params,
job_id: row.get(0)?, |row| {
action: row.get(1)?, let job = Job {
foreign_id: row.get(2)?, job_id: row.get(0)?,
desired_timestamp: row.get(5)?, action: row.get(1)?,
added_timestamp: row.get(4)?, foreign_id: row.get(2)?,
tries: row.get(6)?, desired_timestamp: row.get(5)?,
param: row.get::<_, String>(3)?.parse().unwrap_or_default(), added_timestamp: row.get(4)?,
try_again: 0, tries: row.get(6)?,
pending_error: None, param: row.get::<_, String>(3)?.parse().unwrap_or_default(),
}; try_again: 0,
pending_error: None,
};
Ok(job) Ok(job)
}, },
|jobs| jobs.collect::<Result<Vec<Job>, _>>().map_err(Into::into), |jobs| jobs.collect::<Result<Vec<Job>, _>>().map_err(Into::into),
); )
match jobs { .map_err(|err| {
Ok(ref _res) => {} warn!(context, "query failed: {:?}", err);
Err(ref err) => { });
info!(context, "query failed: {:?}", err);
}
}
for mut job in jobs.unwrap_or_default() { for mut job in jobs.unwrap_or_default() {
info!( info!(
@@ -928,12 +963,10 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) {
} }
} }
pub fn connect_to_inbox(context: &Context, inbox: &Imap) -> libc::c_int { pub fn connect_to_inbox(context: &Context, imap: &Imap) -> Result<(), Error> {
let ret_connected = dc_connect_to_configured_imap(context, inbox); dc_connect_to_configured_imap(context, imap)?;
if 0 != ret_connected { imap.set_watch_folder("INBOX".into());
inbox.set_watch_folder("INBOX".into()); Ok(())
}
ret_connected
} }
fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> { fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> {

View File

@@ -2,7 +2,8 @@ use std::sync::{Arc, Condvar, Mutex};
use crate::configure::*; use crate::configure::*;
use crate::context::Context; use crate::context::Context;
use crate::imap::Imap; use crate::error::Error;
use crate::imap::{IdlePollMode, Imap};
#[derive(Debug)] #[derive(Debug)]
pub struct JobThread { pub struct JobThread {
@@ -86,52 +87,60 @@ impl JobThread {
if use_network { if use_network {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
if self.connect_to_imap(context) { let prefix = format!("{}-fetch", self.name);
info!(context, "{}-fetch started...", self.name); match self.connect_to_imap(context) {
self.imap.fetch(context); Ok(()) => {
info!(context, "{} started...", prefix);
if self.imap.should_reconnect() {
info!(context, "{}-fetch aborted, starting over...", self.name,);
self.imap.fetch(context); self.imap.fetch(context);
if self.imap.should_reconnect() {
info!(context, "{} aborted, starting over...", prefix);
self.imap.fetch(context);
}
info!(
context,
"{} done in {:.3} ms.",
prefix,
start.elapsed().as_millis(),
);
}
Err(err) => {
warn!(
context,
"{} skipped, could not connect to imap {:?}", prefix, err
);
} }
info!(
context,
"{}-fetch done in {:.3} ms.",
self.name,
start.elapsed().as_millis(),
);
} }
} }
self.state.0.lock().unwrap().using_handle = false; self.state.0.lock().unwrap().using_handle = false;
} }
fn connect_to_imap(&self, context: &Context) -> bool { fn connect_to_imap(&self, context: &Context) -> Result<(), Error> {
if async_std::task::block_on(async move { self.imap.is_connected().await }) { if async_std::task::block_on(async move { self.imap.is_connected().await }) {
return true; return Ok(());
} }
let watch_folder_name = match context.sql.get_raw_config(context, self.folder_config_name) { let watch_folder_name = match context.sql.get_raw_config(context, self.folder_config_name) {
Some(name) => name, Some(name) => name,
None => { None => {
return false; return Err(Error::WatchFolderNotFound(
self.folder_config_name.to_string(),
));
} }
}; };
let ret_connected = dc_connect_to_configured_imap(context, &self.imap) != 0; dc_connect_to_configured_imap(context, &self.imap)?;
if ret_connected { if context
if context .sql
.sql .get_raw_config_int(context, "folders_configured")
.get_raw_config_int(context, "folders_configured") .unwrap_or_default()
.unwrap_or_default() < 3
< 3 {
{ self.imap.configure_folders(context, 0x1);
self.imap.configure_folders(context, 0x1);
}
self.imap.set_watch_folder(watch_folder_name);
} }
self.imap.set_watch_folder(watch_folder_name);
ret_connected Ok(())
} }
pub fn idle(&self, context: &Context, use_network: bool) { pub fn idle(&self, context: &Context, use_network: bool) {
@@ -170,17 +179,30 @@ impl JobThread {
} }
} }
if self.connect_to_imap(context) { let poll_mode = match self.connect_to_imap(context) {
info!(context, "{}-IDLE started...", self.name,); Ok(()) => {
self.imap.idle(context); info!(context, "{}-IDLE started...", self.name,);
info!(context, "{}-IDLE ended.", self.name); let res = self.imap.idle(context);
} else { info!(context, "{}-IDLE ended.", self.name);
// It's probably wrong that the thread even runs match res {
// but let's call fake_idle and tell it to not try network at all. Ok(()) => None,
// (once we move to rust-managed threads this problem goes away) Err(Error::ImapConnectionFailed(err)) => {
info!(context, "{}-IDLE not connected, fake-idling", self.name); warn!(context, "idle connection failed: {}", err);
async_std::task::block_on(async move { self.imap.fake_idle(context, false).await }); Some(IdlePollMode::Often)
info!(context, "{}-IDLE fake-idling finished", self.name); }
Err(err) => {
warn!(context, "idle failed: {}", err);
Some(IdlePollMode::Never)
}
}
}
Err(err) => {
info!(context, "{}-IDLE fail: {:?}", self.name, err);
Some(IdlePollMode::Never)
}
};
if let Some(poll_mode) = poll_mode {
self.imap.fake_idle(context, poll_mode);
} }
self.state.0.lock().unwrap().using_handle = false; self.state.0.lock().unwrap().using_handle = false;