diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index c5362761a..2391ce6d9 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -461,7 +461,7 @@ pub unsafe extern "C" fn dc_perform_imap_jobs(context: *mut dc_context_t) { } let ffi_context = &*context; ffi_context - .with_inner(|ctx| job::perform_imap_jobs(ctx)) + .with_inner(|ctx| job::perform_inbox_jobs(ctx)) .unwrap_or(()) } @@ -473,19 +473,20 @@ pub unsafe extern "C" fn dc_perform_imap_fetch(context: *mut dc_context_t) { } let ffi_context = &*context; ffi_context - .with_inner(|ctx| job::perform_imap_fetch(ctx)) + .with_inner(|ctx| job::perform_inbox_fetch(ctx)) .unwrap_or(()) } #[no_mangle] pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) { + // TODO rename function in co-ordination with UIs if context.is_null() { eprintln!("ignoring careless call to dc_perform_imap_idle()"); return; } let ffi_context = &*context; ffi_context - .with_inner(|ctx| job::perform_imap_idle(ctx)) + .with_inner(|ctx| job::perform_inbox_idle(ctx)) .unwrap_or(()) } @@ -497,7 +498,7 @@ pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) { } let ffi_context = &*context; ffi_context - .with_inner(|ctx| job::interrupt_imap_idle(ctx)) + .with_inner(|ctx| job::interrupt_inbox_idle(ctx, true)) .unwrap_or(()) } diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 38135b196..29c1f5c66 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -573,8 +573,10 @@ class IOThreads: self._log_event("py-bindings-info", 0, "INBOX THREAD START") while not self._thread_quitflag: lib.dc_perform_imap_jobs(self._dc_context) - lib.dc_perform_imap_fetch(self._dc_context) - lib.dc_perform_imap_idle(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_imap_fetch(self._dc_context) + if not self._thread_quitflag: + lib.dc_perform_imap_idle(self._dc_context) self._log_event("py-bindings-info", 0, "INBOX THREAD FINISHED") def mvbox_thread_run(self): diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 7608292ae..d9070f54d 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -430,15 +430,25 @@ class TestOnlineAccount: assert self_addr not in ev[2] ev = ac1._evlogger.get_matching("DC_EVENT_DELETED_BLOB_FILE") - def test_mvbox_sentbox_threads(self, acfactory): + def test_mvbox_sentbox_threads(self, acfactory, lp): + lp.sec("ac1: start with mvbox/sentbox threads") ac1 = acfactory.get_online_configuring_account(mvbox=True, sentbox=True) + + lp.sec("ac2: start without mvbox/sentbox threads") ac2 = acfactory.get_online_configuring_account() + + lp.sec("ac2: waiting for configuration") wait_configuration_progress(ac2, 1000) + + lp.sec("ac1: waiting for configuration") wait_configuration_progress(ac1, 1000) + + lp.sec("ac1: send message and wait for ac2 to receive it") chat = self.get_chat(ac1, ac2) chat.send_text("message1") ev = ac2._evlogger.get_matching("DC_EVENT_INCOMING_MSG|DC_EVENT_MSGS_CHANGED") assert ev[2] > const.DC_CHAT_ID_LAST_SPECIAL + lp.sec("test finished") def test_move_works(self, acfactory): ac1 = acfactory.get_online_configuring_account() @@ -720,6 +730,7 @@ class TestOnlineAccount: ac2._evlogger.set_timeout(30) wait_configuration_progress(ac2, 1000) wait_configuration_progress(ac1, 1000) + lp.sec("trigger ac setup message but ignore") assert ac1.get_info()["fingerprint"] != ac2.get_info()["fingerprint"] ac1.initiate_key_transfer() @@ -731,6 +742,7 @@ class TestOnlineAccount: msg = ac2.get_message_by_id(ev[2]) assert msg.is_setup_message() assert msg.get_setupcodebegin() == setup_code2[:2] + lp.sec("process second setup message") msg.continue_key_transfer(setup_code2) assert ac1.get_info()["fingerprint"] == ac2.get_info()["fingerprint"] diff --git a/src/config.rs b/src/config.rs index ceaac5318..985f52d96 100644 --- a/src/config.rs +++ b/src/config.rs @@ -121,7 +121,7 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value); - interrupt_imap_idle(self); + interrupt_inbox_idle(self, true); ret } Config::SentboxWatch => { diff --git a/src/configure/mod.rs b/src/configure/mod.rs index d8a5ad728..47f95d99a 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -65,7 +65,12 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { let mut param_autoconfig: Option = None; - context.inbox.read().unwrap().disconnect(context); + context + .inbox_thread + .read() + .unwrap() + .imap + .disconnect(context); context .sentbox_thread .read() @@ -359,9 +364,10 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { 0 }; context - .inbox + .inbox_thread .read() .unwrap() + .imap .configure_folders(context, flags); true } @@ -401,7 +407,12 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) { } } if imap_connected_here { - context.inbox.read().unwrap().disconnect(context); + context + .inbox_thread + .read() + .unwrap() + .imap + .disconnect(context); } if smtp_connected_here { context.smtp.clone().lock().unwrap().disconnect(); @@ -497,7 +508,13 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option { param.mail_user, param.mail_server, param.mail_port, param.server_flags ); info!(context, "Trying: {}", inf); - if context.inbox.read().unwrap().connect(context, ¶m) { + if context + .inbox_thread + .read() + .unwrap() + .imap + .connect(context, ¶m) + { info!(context, "success: {}", inf); return Some(true); } diff --git a/src/context.rs b/src/context.rs index 7322aecf3..963e1e98e 100644 --- a/src/context.rs +++ b/src/context.rs @@ -44,9 +44,9 @@ pub struct Context { /// Blob directory path blobdir: PathBuf, pub sql: Sql, - pub inbox: Arc>, pub perform_inbox_jobs_needed: Arc>, pub probe_imap_network: Arc>, + pub inbox_thread: Arc>, pub sentbox_thread: Arc>, pub mvbox_thread: Arc>, pub smtp: Arc>, @@ -121,7 +121,6 @@ impl Context { let ctx = Context { blobdir, dbfile, - inbox: Arc::new(RwLock::new(Imap::new())), cb, os_name: Some(os_name), running_state: Arc::new(RwLock::new(Default::default())), @@ -132,6 +131,11 @@ impl Context { bob: Arc::new(RwLock::new(Default::default())), last_smeared_timestamp: RwLock::new(0), cmdline_sel_chat_id: Arc::new(RwLock::new(0)), + inbox_thread: Arc::new(RwLock::new(JobThread::new( + "INBOX", + "configured_inbox_folder", + Imap::new(), + ))), sentbox_thread: Arc::new(RwLock::new(JobThread::new( "SENTBOX", "configured_sentbox_folder", @@ -463,8 +467,8 @@ impl Context { impl Drop for Context { fn drop(&mut self) { - info!(self, "disconnecting INBOX-watch",); - self.inbox.read().unwrap().disconnect(self); + info!(self, "disconnecting inbox-thread",); + self.inbox_thread.read().unwrap().imap.disconnect(self); info!(self, "disconnecting sentbox-thread",); self.sentbox_thread.read().unwrap().imap.disconnect(self); info!(self, "disconnecting mvbox-thread",); diff --git a/src/error.rs b/src/error.rs index 5497251b1..74c4053b9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -42,8 +42,12 @@ pub enum Error { ImapMissesIdle, #[fail(display = "Imap IDLE protocol failed to init/complete")] ImapIdleProtocolFailed(String), + #[fail(display = "Imap IDLE failed to select folder {:?}", _0)] + ImapSelectFailed(String), #[fail(display = "Connect without configured params")] ConnectWithoutConfigure, + #[fail(display = "imap operation attempted while imap is torn down")] + ImapInTeardown, } pub type Result = std::result::Result; diff --git a/src/imap.rs b/src/imap.rs index 5ded6d751..bc78d24fd 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -16,7 +16,7 @@ use crate::dc_receive_imf::dc_receive_imf; use crate::error::Error; use crate::events::Event; use crate::imap_client::*; -use crate::job::{connect_to_inbox, job_add, Action}; +use crate::job::{job_add, Action}; use crate::login_param::{CertificateChecks, LoginParam}; use crate::message::{self, update_msg_move_state, update_server_uid}; use crate::oauth2::dc_get_oauth2_access_token; @@ -138,7 +138,7 @@ impl Imap { self.should_reconnect.load(Ordering::Relaxed) } - fn trigger_reconnect(&self) { + pub fn trigger_reconnect(&self) { self.should_reconnect.store(true, Ordering::Relaxed) } @@ -534,8 +534,16 @@ impl Imap { return 0; } - if mailbox.uid_validity.unwrap_or_default() != uid_validity { - // first time this folder is selected or UIDVALIDITY has changed, init lastseenuid and save it to config + if mailbox.uid_validity.unwrap() != uid_validity { + // First time this folder is selected or UIDVALIDITY has changed. + // Init lastseenuid and save it to config. + info!( + context, + "uid_validity={} local uid_validity={} lastseenuid={}", + mailbox.uid_validity.unwrap(), + uid_validity, + last_seen_uid + ); if mailbox.exists == 0 { info!(context, "Folder \"{}\" is empty.", folder.as_ref()); @@ -703,20 +711,23 @@ impl Imap { match session.uid_fetch(set, BODY_FLAGS).await { Ok(msgs) => msgs, Err(err) => { + // TODO maybe differentiate between IO and input/parsing problems + // so we don't reconnect if we have a (rare) input/output parsing problem? self.trigger_reconnect(); warn!( context, - "Error on fetching message #{} from folder \"{}\"; retry={}; error={}.", + "Error on fetching message #{} from folder \"{}\"; error={}.", server_uid, folder.as_ref(), - self.should_reconnect(), err ); return 0; } } } else { - return 1; + // we could not get a valid imap session, this should be retried + self.trigger_reconnect(); + return 0; }; if msgs.is_empty() { @@ -754,11 +765,6 @@ impl Imap { pub fn idle(&self, context: &Context) -> Result<(), Error> { task::block_on(async move { - ensure!( - self.config.read().await.selected_folder.is_some(), - "no folder selected, probably in teardown?" - ); - if !self.config.read().await.can_idle { return Err(Error::ImapMissesIdle); } @@ -766,11 +772,17 @@ impl Imap { self.setup_handle_if_needed(context); let watch_folder = self.config.read().await.watch_folder.clone(); + if watch_folder.is_none() { + return Err(Error::ImapInTeardown); + } match self.select_folder(context, watch_folder.as_ref()).await { ImapActionResult::Success | ImapActionResult::AlreadyDone => {} ImapActionResult::Failed | ImapActionResult::RetryLater => { - bail!("IMAP select failed for {:?}", watch_folder.as_ref()); + return Err(Error::ImapSelectFailed(format!( + "{:?}", + watch_folder.as_ref() + ))); } } @@ -782,16 +794,16 @@ impl Imap { // typically also need to change the Insecure branch. IdleHandle::Secure(mut handle) => { if let Err(err) = handle.init().await { - bail!("IDLE init failed: {}", err); + return Err(Error::ImapIdleProtocolFailed(format!("{}", err))); } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::Relaxed) { + if self.skip_next_idle_wait.load(Ordering::SeqCst) { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::Relaxed); + self.skip_next_idle_wait.store(false, Ordering::SeqCst); std::mem::drop(idle_wait); info!(context, "Idle wait was skipped"); } else { @@ -814,16 +826,16 @@ impl Imap { } IdleHandle::Insecure(mut handle) => { if let Err(err) = handle.init().await { - bail!("IDLE init failed: {}", err); + return Err(Error::ImapIdleProtocolFailed(format!("{}", err))); } let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); *self.interrupt.lock().await = Some(interrupt); - if self.skip_next_idle_wait.load(Ordering::Relaxed) { + if self.skip_next_idle_wait.load(Ordering::SeqCst) { // interrupt_idle has happened before we // provided self.interrupt - self.skip_next_idle_wait.store(false, Ordering::Relaxed); + self.skip_next_idle_wait.store(false, Ordering::SeqCst); std::mem::drop(idle_wait); info!(context, "Idle wait was skipped"); } else { @@ -918,12 +930,18 @@ impl Imap { pub fn interrupt_idle(&self) { task::block_on(async move { - if self.interrupt.lock().await.take().is_none() { + let mut interrupt: Option = self.interrupt.lock().await.take(); + if interrupt.is_none() { // idle wait is not running, signal it needs to skip - self.skip_next_idle_wait.store(true, Ordering::Relaxed); + self.skip_next_idle_wait.store(true, Ordering::SeqCst); - // meanwhile idle-wait may have produced the interrupter - let _ = self.interrupt.lock().await.take(); + // meanwhile idle-wait may have produced the StopSource + interrupt = self.interrupt.lock().await.take(); + } + // let's manually drop the StopSource + if interrupt.is_some() { + eprintln!("low-level: dropping stop-source to interrupt idle"); + std::mem::drop(interrupt) } }); } @@ -1055,7 +1073,12 @@ impl Imap { if uid == 0 { return Some(ImapActionResult::Failed); } else if !self.is_connected().await { - if let Err(err) = connect_to_inbox(context, &self) { + // currently jobs are only performed on the INBOX thread + // TODO: make INBOX/SENT/MVBOX perform the jobs on their + // respective folders to avoid select_folder network traffic + // and the involved error states + let inbox_thread = context.inbox_thread.read().unwrap(); + if let Err(err) = inbox_thread.connect_to_imap(context) { warn!(context, "prepare_imap_op failed: {}", err); return Some(ImapActionResult::RetryLater); } @@ -1239,10 +1262,9 @@ impl Imap { session.subscribe(mvbox).await.expect("failed to subscribe"); } } - context .sql - .set_raw_config_int(context, "folders_configured", 3) + .set_raw_config(context, "configured_inbox_folder", Some("INBOX")) .ok(); if let Some(ref mvbox_folder) = mvbox_folder { context @@ -1260,6 +1282,10 @@ impl Imap { ) .ok(); } + context + .sql + .set_raw_config_int(context, "folders_configured", 3) + .ok(); } }) } diff --git a/src/imex.rs b/src/imex.rs index 756fe15b2..b71369b3b 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -51,7 +51,7 @@ pub enum ImexMode { /// Import/export things. /// For this purpose, the function creates a job that is executed in the IMAP-thread then; -/// this requires to call dc_perform_imap_jobs() regularly. +/// this requires to call dc_perform_inbox_jobs() regularly. /// /// What to do is defined by the _what_ parameter. /// diff --git a/src/job.rs b/src/job.rs index fbb275215..207478640 100644 --- a/src/job.rs +++ b/src/job.rs @@ -203,7 +203,7 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let imap_inbox = &context.inbox_thread.read().unwrap().imap; if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if context @@ -212,7 +212,7 @@ impl Job { .unwrap_or_default() < 3 { - inbox.configure_folders(context, 0x1i32); + imap_inbox.configure_folders(context, 0x1i32); } let dest_folder = context .sql @@ -222,7 +222,7 @@ impl Job { let server_folder = msg.server_folder.as_ref().unwrap(); let mut dest_uid = 0; - match inbox.mv( + match imap_inbox.mv( context, server_folder, msg.server_uid, @@ -248,7 +248,7 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let imap_inbox = &context.inbox_thread.read().unwrap().imap; if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if !msg.rfc724_mid.is_empty() { @@ -263,7 +263,8 @@ impl Job { we delete the message from the server */ let mid = msg.rfc724_mid; let server_folder = msg.server_folder.as_ref().unwrap(); - let res = inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid); + let res = + imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid); if res == ImapActionResult::RetryLater { self.try_again_later(-1i32, None); return; @@ -276,27 +277,27 @@ impl Job { #[allow(non_snake_case)] fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let imap_inbox = &context.inbox_thread.read().unwrap().imap; if self.foreign_id & DC_EMPTY_MVBOX > 0 { if let Some(mvbox_folder) = context .sql .get_raw_config(context, "configured_mvbox_folder") { - inbox.empty_folder(context, &mvbox_folder); + imap_inbox.empty_folder(context, &mvbox_folder); } } if self.foreign_id & DC_EMPTY_INBOX > 0 { - inbox.empty_folder(context, "INBOX"); + imap_inbox.empty_folder(context, "INBOX"); } } #[allow(non_snake_case)] fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { - let inbox = context.inbox.read().unwrap(); + let imap_inbox = &context.inbox_thread.read().unwrap().imap; if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { let folder = msg.server_folder.as_ref().unwrap(); - match inbox.set_seen(context, folder, msg.server_uid) { + match imap_inbox.set_seen(context, folder, msg.server_uid) { ImapActionResult::RetryLater => { self.try_again_later(3i32, None); } @@ -326,8 +327,8 @@ impl Job { .unwrap_or_default() .to_string(); let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let inbox = context.inbox.read().unwrap(); - if inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { + let imap_inbox = &context.inbox_thread.read().unwrap().imap; + if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { self.try_again_later(3i32, None); return; } @@ -338,7 +339,7 @@ impl Job { .unwrap_or_default() < 3 { - inbox.configure_folders(context, 0x1i32); + imap_inbox.configure_folders(context, 0x1i32); } let dest_folder = context .sql @@ -346,7 +347,7 @@ impl Job { if let Some(dest_folder) = dest_folder { let mut dest_uid = 0; if ImapActionResult::RetryLater - == inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid) + == imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid) { self.try_again_later(3, None); } @@ -366,81 +367,14 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool { .is_ok() } -pub fn perform_imap_fetch(context: &Context) { - if !context.get_config_bool(Config::InboxWatch) { - info!(context, "INBOX-fetch skipped: INBOX-watch is disabled."); - return; - } - let inbox = context.inbox.read().unwrap(); - let start = std::time::Instant::now(); +pub fn perform_inbox_fetch(context: &Context) { + let use_network = context.get_config_bool(Config::InboxWatch); - if let Err(err) = connect_to_inbox(context, &inbox) { - warn!(context, "could not connect to inbox: {:?}", err); - return; - } - info!(context, "INBOX-fetch started...",); - inbox.fetch(context); - if inbox.should_reconnect() { - info!(context, "INBOX-fetch aborted, starting over...",); - inbox.fetch(context); - } - info!( - context, - "INBOX-fetch done in {:.4} ms.", - start.elapsed().as_nanos() as f64 / 1_000_000.0, - ); -} - -pub fn perform_imap_idle(context: &Context) { - if *context.perform_inbox_jobs_needed.clone().read().unwrap() { - info!( - context, - "INBOX-IDLE will not be started because of waiting jobs." - ); - return; - } - let inbox = context.inbox.read().unwrap(); - let poll_mode = if !context.get_config_bool(Config::InboxWatch) { - Some(IdlePollMode::Never) - } else { - match connect_to_inbox(context, &inbox) { - Err(Error::ImapConnectionFailed(param)) => { - warn!(context, "perform_imap_idle could not connect {:?}", param); - Some(IdlePollMode::Often) - } - Err(err) => { - warn!(context, "perform_imap_idle error: {}", err); - // anything else than a plain connection error - // hints at configuration issues. - Some(IdlePollMode::Never) - } - - Ok(()) => { - info!(context, "INBOX-IDLE starting..."); - let res = inbox.idle(context); - info!(context, "INBOX-IDLE ended."); - - match res { - Ok(()) => None, - Err(Error::ImapConnectionFailed(param)) => { - warn!( - context, - "perform_imap_idle IDLE could not connect {:?}", param - ); - Some(IdlePollMode::Often) - } - Err(err) => { - warn!(context, "perform_imap_idle IDLE error: {}", err); - Some(IdlePollMode::Never) - } - } - } - } - }; - - if let Some(poll_mode) = poll_mode { - inbox.fake_idle(context, poll_mode); - } + context + .inbox_thread + .write() + .unwrap() + .fetch(context, use_network); } pub fn perform_mvbox_fetch(context: &Context) { @@ -453,20 +387,6 @@ pub fn perform_mvbox_fetch(context: &Context) { .fetch(context, use_network); } -pub fn perform_mvbox_idle(context: &Context) { - let use_network = context.get_config_bool(Config::MvboxWatch); - - context - .mvbox_thread - .read() - .unwrap() - .idle(context, use_network); -} - -pub fn interrupt_mvbox_idle(context: &Context) { - context.mvbox_thread.read().unwrap().interrupt_idle(context); -} - pub fn perform_sentbox_fetch(context: &Context) { let use_network = context.get_config_bool(Config::SentboxWatch); @@ -477,6 +397,33 @@ pub fn perform_sentbox_fetch(context: &Context) { .fetch(context, use_network); } +pub fn perform_inbox_idle(context: &Context) { + if *context.perform_inbox_jobs_needed.clone().read().unwrap() { + info!( + context, + "INBOX-IDLE will not be started because of waiting jobs." + ); + return; + } + let use_network = context.get_config_bool(Config::InboxWatch); + + context + .inbox_thread + .read() + .unwrap() + .idle(context, use_network); +} + +pub fn perform_mvbox_idle(context: &Context) { + let use_network = context.get_config_bool(Config::MvboxWatch); + + context + .mvbox_thread + .read() + .unwrap() + .idle(context, use_network); +} + pub fn perform_sentbox_idle(context: &Context) { let use_network = context.get_config_bool(Config::SentboxWatch); @@ -487,6 +434,27 @@ pub fn perform_sentbox_idle(context: &Context) { .idle(context, use_network); } +pub fn interrupt_inbox_idle(context: &Context, block: bool) { + info!(context, "interrupt_inbox_idle called blocking={}", block); + if block { + context.inbox_thread.read().unwrap().interrupt_idle(context); + } else { + match context.inbox_thread.try_read() { + Ok(inbox_thread) => { + inbox_thread.interrupt_idle(context); + } + Err(err) => { + *context.perform_inbox_jobs_needed.write().unwrap() = true; + warn!(context, "could not interrupt idle: {}", err); + } + } + } +} + +pub fn interrupt_mvbox_idle(context: &Context) { + context.mvbox_thread.read().unwrap().interrupt_idle(context); +} + pub fn interrupt_sentbox_idle(context: &Context) { context .sentbox_thread @@ -587,7 +555,7 @@ pub fn maybe_network(context: &Context) { } interrupt_smtp_idle(context); - interrupt_imap_idle(context); + interrupt_inbox_idle(context, true); interrupt_mvbox_idle(context); interrupt_sentbox_idle(context); } @@ -719,15 +687,15 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<(), Error> { Ok(()) } -pub fn perform_imap_jobs(context: &Context) { - info!(context, "dc_perform_imap_jobs starting.",); +pub fn perform_inbox_jobs(context: &Context) { + info!(context, "dc_perform_inbox_jobs starting.",); let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); *context.probe_imap_network.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false; job_perform(context, Thread::Imap, probe_imap_network); - info!(context, "dc_perform_imap_jobs ended.",); + info!(context, "dc_perform_inbox_jobs ended.",); } pub fn perform_mvbox_jobs(context: &Context) { @@ -963,12 +931,6 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) { } } -pub fn connect_to_inbox(context: &Context, imap: &Imap) -> Result<(), Error> { - dc_connect_to_configured_imap(context, imap)?; - imap.set_watch_folder("INBOX".into()); - Ok(()) -} - fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> { let mut mimefactory = MimeFactory::load_mdn(context, msg_id)?; unsafe { mimefactory.render()? }; @@ -1039,7 +1001,7 @@ pub fn job_add( ).ok(); match thread { - Thread::Imap => interrupt_imap_idle(context), + Thread::Imap => interrupt_inbox_idle(context, false), Thread::Smtp => interrupt_smtp_idle(context), Thread::Unknown => {} } @@ -1055,11 +1017,3 @@ pub fn interrupt_smtp_idle(context: &Context) { state.idle = true; cvar.notify_one(); } - -pub fn interrupt_imap_idle(context: &Context) { - info!(context, "Interrupting INBOX-IDLE...",); - - *context.perform_inbox_jobs_needed.write().unwrap() = true; - - context.inbox.read().unwrap().interrupt_idle(); -} diff --git a/src/job_thread.rs b/src/job_thread.rs index 67522936a..981aa3e8b 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Condvar, Mutex}; use crate::configure::*; use crate::context::Context; -use crate::error::Error; +use crate::error::{Error, Result}; use crate::imap::{IdlePollMode, Imap}; #[derive(Debug)] @@ -86,10 +86,10 @@ impl JobThread { } if use_network { - let start = std::time::Instant::now(); let prefix = format!("{}-fetch", self.name); match self.connect_to_imap(context) { Ok(()) => { + let start = std::time::Instant::now(); info!(context, "{} started...", prefix); self.imap.fetch(context); @@ -116,7 +116,7 @@ impl JobThread { self.state.0.lock().unwrap().using_handle = false; } - fn connect_to_imap(&self, context: &Context) -> Result<(), Error> { + pub fn connect_to_imap(&self, context: &Context) -> Result<()> { if async_std::task::block_on(async move { self.imap.is_connected().await }) { return Ok(()); } @@ -179,26 +179,33 @@ impl JobThread { } } + let prefix = format!("{}-IDLE", self.name); let poll_mode = match self.connect_to_imap(context) { Ok(()) => { - info!(context, "{}-IDLE started...", self.name,); + info!(context, "{} started...", prefix); let res = self.imap.idle(context); - info!(context, "{}-IDLE ended.", self.name); + info!(context, "{} ended...", prefix); match res { Ok(()) => None, - Err(Error::ImapConnectionFailed(err)) => { - warn!(context, "idle connection failed: {}", err); + Err(Error::ImapConnectionFailed(err)) + | Err(Error::ImapIdleProtocolFailed(err)) => { + self.imap.trigger_reconnect(); + warn!(context, "{} failed: {}, reconnecting", prefix, err); Some(IdlePollMode::Often) } + Err(Error::ImapInTeardown) => { + warn!(context, "{} aborting as imap is in teardown", prefix); + None + } Err(err) => { - warn!(context, "idle failed: {}", err); + warn!(context, "{} failed fundamentally: {}", prefix, err); Some(IdlePollMode::Never) } } } Err(err) => { - info!(context, "{}-IDLE fail: {:?}", self.name, err); - Some(IdlePollMode::Never) + info!(context, "{}-IDLE connection fail: {:?}", self.name, err); + Some(IdlePollMode::Often) } }; if let Some(poll_mode) = poll_mode {