remove context.inbox in favour of a context.inbox_thread following the mvbox_thread and sentbox_thread patterns. Also some streamlining of shutdown logic.

This commit is contained in:
holger krekel
2019-11-19 11:02:39 +01:00
parent be533fa66a
commit eae9ad6f8b
11 changed files with 201 additions and 174 deletions

View File

@@ -461,7 +461,7 @@ pub unsafe extern "C" fn dc_perform_imap_jobs(context: *mut dc_context_t) {
} }
let ffi_context = &*context; let ffi_context = &*context;
ffi_context ffi_context
.with_inner(|ctx| job::perform_imap_jobs(ctx)) .with_inner(|ctx| job::perform_inbox_jobs(ctx))
.unwrap_or(()) .unwrap_or(())
} }
@@ -473,19 +473,20 @@ pub unsafe extern "C" fn dc_perform_imap_fetch(context: *mut dc_context_t) {
} }
let ffi_context = &*context; let ffi_context = &*context;
ffi_context ffi_context
.with_inner(|ctx| job::perform_imap_fetch(ctx)) .with_inner(|ctx| job::perform_inbox_fetch(ctx))
.unwrap_or(()) .unwrap_or(())
} }
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) { pub unsafe extern "C" fn dc_perform_imap_idle(context: *mut dc_context_t) {
// TODO rename function in co-ordination with UIs
if context.is_null() { if context.is_null() {
eprintln!("ignoring careless call to dc_perform_imap_idle()"); eprintln!("ignoring careless call to dc_perform_imap_idle()");
return; return;
} }
let ffi_context = &*context; let ffi_context = &*context;
ffi_context ffi_context
.with_inner(|ctx| job::perform_imap_idle(ctx)) .with_inner(|ctx| job::perform_inbox_idle(ctx))
.unwrap_or(()) .unwrap_or(())
} }
@@ -497,7 +498,7 @@ pub unsafe extern "C" fn dc_interrupt_imap_idle(context: *mut dc_context_t) {
} }
let ffi_context = &*context; let ffi_context = &*context;
ffi_context ffi_context
.with_inner(|ctx| job::interrupt_imap_idle(ctx)) .with_inner(|ctx| job::interrupt_inbox_idle(ctx, true))
.unwrap_or(()) .unwrap_or(())
} }

View File

@@ -573,8 +573,10 @@ class IOThreads:
self._log_event("py-bindings-info", 0, "INBOX THREAD START") self._log_event("py-bindings-info", 0, "INBOX THREAD START")
while not self._thread_quitflag: while not self._thread_quitflag:
lib.dc_perform_imap_jobs(self._dc_context) lib.dc_perform_imap_jobs(self._dc_context)
lib.dc_perform_imap_fetch(self._dc_context) if not self._thread_quitflag:
lib.dc_perform_imap_idle(self._dc_context) lib.dc_perform_imap_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_idle(self._dc_context)
self._log_event("py-bindings-info", 0, "INBOX THREAD FINISHED") self._log_event("py-bindings-info", 0, "INBOX THREAD FINISHED")
def mvbox_thread_run(self): def mvbox_thread_run(self):

View File

@@ -430,15 +430,25 @@ class TestOnlineAccount:
assert self_addr not in ev[2] assert self_addr not in ev[2]
ev = ac1._evlogger.get_matching("DC_EVENT_DELETED_BLOB_FILE") ev = ac1._evlogger.get_matching("DC_EVENT_DELETED_BLOB_FILE")
def test_mvbox_sentbox_threads(self, acfactory): def test_mvbox_sentbox_threads(self, acfactory, lp):
lp.sec("ac1: start with mvbox/sentbox threads")
ac1 = acfactory.get_online_configuring_account(mvbox=True, sentbox=True) ac1 = acfactory.get_online_configuring_account(mvbox=True, sentbox=True)
lp.sec("ac2: start without mvbox/sentbox threads")
ac2 = acfactory.get_online_configuring_account() ac2 = acfactory.get_online_configuring_account()
lp.sec("ac2: waiting for configuration")
wait_configuration_progress(ac2, 1000) wait_configuration_progress(ac2, 1000)
lp.sec("ac1: waiting for configuration")
wait_configuration_progress(ac1, 1000) wait_configuration_progress(ac1, 1000)
lp.sec("ac1: send message and wait for ac2 to receive it")
chat = self.get_chat(ac1, ac2) chat = self.get_chat(ac1, ac2)
chat.send_text("message1") chat.send_text("message1")
ev = ac2._evlogger.get_matching("DC_EVENT_INCOMING_MSG|DC_EVENT_MSGS_CHANGED") ev = ac2._evlogger.get_matching("DC_EVENT_INCOMING_MSG|DC_EVENT_MSGS_CHANGED")
assert ev[2] > const.DC_CHAT_ID_LAST_SPECIAL assert ev[2] > const.DC_CHAT_ID_LAST_SPECIAL
lp.sec("test finished")
def test_move_works(self, acfactory): def test_move_works(self, acfactory):
ac1 = acfactory.get_online_configuring_account() ac1 = acfactory.get_online_configuring_account()
@@ -720,6 +730,7 @@ class TestOnlineAccount:
ac2._evlogger.set_timeout(30) ac2._evlogger.set_timeout(30)
wait_configuration_progress(ac2, 1000) wait_configuration_progress(ac2, 1000)
wait_configuration_progress(ac1, 1000) wait_configuration_progress(ac1, 1000)
lp.sec("trigger ac setup message but ignore") lp.sec("trigger ac setup message but ignore")
assert ac1.get_info()["fingerprint"] != ac2.get_info()["fingerprint"] assert ac1.get_info()["fingerprint"] != ac2.get_info()["fingerprint"]
ac1.initiate_key_transfer() ac1.initiate_key_transfer()
@@ -731,6 +742,7 @@ class TestOnlineAccount:
msg = ac2.get_message_by_id(ev[2]) msg = ac2.get_message_by_id(ev[2])
assert msg.is_setup_message() assert msg.is_setup_message()
assert msg.get_setupcodebegin() == setup_code2[:2] assert msg.get_setupcodebegin() == setup_code2[:2]
lp.sec("process second setup message") lp.sec("process second setup message")
msg.continue_key_transfer(setup_code2) msg.continue_key_transfer(setup_code2)
assert ac1.get_info()["fingerprint"] == ac2.get_info()["fingerprint"] assert ac1.get_info()["fingerprint"] == ac2.get_info()["fingerprint"]

View File

@@ -121,7 +121,7 @@ impl Context {
} }
Config::InboxWatch => { Config::InboxWatch => {
let ret = self.sql.set_raw_config(self, key, value); let ret = self.sql.set_raw_config(self, key, value);
interrupt_imap_idle(self); interrupt_inbox_idle(self, true);
ret ret
} }
Config::SentboxWatch => { Config::SentboxWatch => {

View File

@@ -65,7 +65,12 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
let mut param_autoconfig: Option<LoginParam> = None; let mut param_autoconfig: Option<LoginParam> = None;
context.inbox.read().unwrap().disconnect(context); context
.inbox_thread
.read()
.unwrap()
.imap
.disconnect(context);
context context
.sentbox_thread .sentbox_thread
.read() .read()
@@ -359,9 +364,10 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
0 0
}; };
context context
.inbox .inbox_thread
.read() .read()
.unwrap() .unwrap()
.imap
.configure_folders(context, flags); .configure_folders(context, flags);
true true
} }
@@ -401,7 +407,12 @@ pub fn dc_job_do_DC_JOB_CONFIGURE_IMAP(context: &Context) {
} }
} }
if imap_connected_here { if imap_connected_here {
context.inbox.read().unwrap().disconnect(context); context
.inbox_thread
.read()
.unwrap()
.imap
.disconnect(context);
} }
if smtp_connected_here { if smtp_connected_here {
context.smtp.clone().lock().unwrap().disconnect(); context.smtp.clone().lock().unwrap().disconnect();
@@ -497,7 +508,13 @@ fn try_imap_one_param(context: &Context, param: &LoginParam) -> Option<bool> {
param.mail_user, param.mail_server, param.mail_port, param.server_flags param.mail_user, param.mail_server, param.mail_port, param.server_flags
); );
info!(context, "Trying: {}", inf); info!(context, "Trying: {}", inf);
if context.inbox.read().unwrap().connect(context, &param) { if context
.inbox_thread
.read()
.unwrap()
.imap
.connect(context, &param)
{
info!(context, "success: {}", inf); info!(context, "success: {}", inf);
return Some(true); return Some(true);
} }

View File

@@ -44,9 +44,9 @@ pub struct Context {
/// Blob directory path /// Blob directory path
blobdir: PathBuf, blobdir: PathBuf,
pub sql: Sql, pub sql: Sql,
pub inbox: Arc<RwLock<Imap>>,
pub perform_inbox_jobs_needed: Arc<RwLock<bool>>, pub perform_inbox_jobs_needed: Arc<RwLock<bool>>,
pub probe_imap_network: Arc<RwLock<bool>>, pub probe_imap_network: Arc<RwLock<bool>>,
pub inbox_thread: Arc<RwLock<JobThread>>,
pub sentbox_thread: Arc<RwLock<JobThread>>, pub sentbox_thread: Arc<RwLock<JobThread>>,
pub mvbox_thread: Arc<RwLock<JobThread>>, pub mvbox_thread: Arc<RwLock<JobThread>>,
pub smtp: Arc<Mutex<Smtp>>, pub smtp: Arc<Mutex<Smtp>>,
@@ -121,7 +121,6 @@ impl Context {
let ctx = Context { let ctx = Context {
blobdir, blobdir,
dbfile, dbfile,
inbox: Arc::new(RwLock::new(Imap::new())),
cb, cb,
os_name: Some(os_name), os_name: Some(os_name),
running_state: Arc::new(RwLock::new(Default::default())), running_state: Arc::new(RwLock::new(Default::default())),
@@ -132,6 +131,11 @@ impl Context {
bob: Arc::new(RwLock::new(Default::default())), bob: Arc::new(RwLock::new(Default::default())),
last_smeared_timestamp: RwLock::new(0), last_smeared_timestamp: RwLock::new(0),
cmdline_sel_chat_id: Arc::new(RwLock::new(0)), cmdline_sel_chat_id: Arc::new(RwLock::new(0)),
inbox_thread: Arc::new(RwLock::new(JobThread::new(
"INBOX",
"configured_inbox_folder",
Imap::new(),
))),
sentbox_thread: Arc::new(RwLock::new(JobThread::new( sentbox_thread: Arc::new(RwLock::new(JobThread::new(
"SENTBOX", "SENTBOX",
"configured_sentbox_folder", "configured_sentbox_folder",
@@ -463,8 +467,8 @@ impl Context {
impl Drop for Context { impl Drop for Context {
fn drop(&mut self) { fn drop(&mut self) {
info!(self, "disconnecting INBOX-watch",); info!(self, "disconnecting inbox-thread",);
self.inbox.read().unwrap().disconnect(self); self.inbox_thread.read().unwrap().imap.disconnect(self);
info!(self, "disconnecting sentbox-thread",); info!(self, "disconnecting sentbox-thread",);
self.sentbox_thread.read().unwrap().imap.disconnect(self); self.sentbox_thread.read().unwrap().imap.disconnect(self);
info!(self, "disconnecting mvbox-thread",); info!(self, "disconnecting mvbox-thread",);

View File

@@ -42,8 +42,12 @@ pub enum Error {
ImapMissesIdle, ImapMissesIdle,
#[fail(display = "Imap IDLE protocol failed to init/complete")] #[fail(display = "Imap IDLE protocol failed to init/complete")]
ImapIdleProtocolFailed(String), ImapIdleProtocolFailed(String),
#[fail(display = "Imap IDLE failed to select folder {:?}", _0)]
ImapSelectFailed(String),
#[fail(display = "Connect without configured params")] #[fail(display = "Connect without configured params")]
ConnectWithoutConfigure, ConnectWithoutConfigure,
#[fail(display = "imap operation attempted while imap is torn down")]
ImapInTeardown,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -16,7 +16,7 @@ use crate::dc_receive_imf::dc_receive_imf;
use crate::error::Error; use crate::error::Error;
use crate::events::Event; use crate::events::Event;
use crate::imap_client::*; use crate::imap_client::*;
use crate::job::{connect_to_inbox, job_add, Action}; use crate::job::{job_add, Action};
use crate::login_param::{CertificateChecks, LoginParam}; use crate::login_param::{CertificateChecks, LoginParam};
use crate::message::{self, update_msg_move_state, update_server_uid}; use crate::message::{self, update_msg_move_state, update_server_uid};
use crate::oauth2::dc_get_oauth2_access_token; use crate::oauth2::dc_get_oauth2_access_token;
@@ -138,7 +138,7 @@ impl Imap {
self.should_reconnect.load(Ordering::Relaxed) self.should_reconnect.load(Ordering::Relaxed)
} }
fn trigger_reconnect(&self) { pub fn trigger_reconnect(&self) {
self.should_reconnect.store(true, Ordering::Relaxed) self.should_reconnect.store(true, Ordering::Relaxed)
} }
@@ -534,8 +534,16 @@ impl Imap {
return 0; return 0;
} }
if mailbox.uid_validity.unwrap_or_default() != uid_validity { if mailbox.uid_validity.unwrap() != uid_validity {
// first time this folder is selected or UIDVALIDITY has changed, init lastseenuid and save it to config // First time this folder is selected or UIDVALIDITY has changed.
// Init lastseenuid and save it to config.
info!(
context,
"uid_validity={} local uid_validity={} lastseenuid={}",
mailbox.uid_validity.unwrap(),
uid_validity,
last_seen_uid
);
if mailbox.exists == 0 { if mailbox.exists == 0 {
info!(context, "Folder \"{}\" is empty.", folder.as_ref()); info!(context, "Folder \"{}\" is empty.", folder.as_ref());
@@ -703,20 +711,23 @@ impl Imap {
match session.uid_fetch(set, BODY_FLAGS).await { match session.uid_fetch(set, BODY_FLAGS).await {
Ok(msgs) => msgs, Ok(msgs) => msgs,
Err(err) => { Err(err) => {
// TODO maybe differentiate between IO and input/parsing problems
// so we don't reconnect if we have a (rare) input/output parsing problem?
self.trigger_reconnect(); self.trigger_reconnect();
warn!( warn!(
context, context,
"Error on fetching message #{} from folder \"{}\"; retry={}; error={}.", "Error on fetching message #{} from folder \"{}\"; error={}.",
server_uid, server_uid,
folder.as_ref(), folder.as_ref(),
self.should_reconnect(),
err err
); );
return 0; return 0;
} }
} }
} else { } else {
return 1; // we could not get a valid imap session, this should be retried
self.trigger_reconnect();
return 0;
}; };
if msgs.is_empty() { if msgs.is_empty() {
@@ -754,11 +765,6 @@ impl Imap {
pub fn idle(&self, context: &Context) -> Result<(), Error> { pub fn idle(&self, context: &Context) -> Result<(), Error> {
task::block_on(async move { task::block_on(async move {
ensure!(
self.config.read().await.selected_folder.is_some(),
"no folder selected, probably in teardown?"
);
if !self.config.read().await.can_idle { if !self.config.read().await.can_idle {
return Err(Error::ImapMissesIdle); return Err(Error::ImapMissesIdle);
} }
@@ -766,11 +772,17 @@ impl Imap {
self.setup_handle_if_needed(context); self.setup_handle_if_needed(context);
let watch_folder = self.config.read().await.watch_folder.clone(); let watch_folder = self.config.read().await.watch_folder.clone();
if watch_folder.is_none() {
return Err(Error::ImapInTeardown);
}
match self.select_folder(context, watch_folder.as_ref()).await { match self.select_folder(context, watch_folder.as_ref()).await {
ImapActionResult::Success | ImapActionResult::AlreadyDone => {} ImapActionResult::Success | ImapActionResult::AlreadyDone => {}
ImapActionResult::Failed | ImapActionResult::RetryLater => { ImapActionResult::Failed | ImapActionResult::RetryLater => {
bail!("IMAP select failed for {:?}", watch_folder.as_ref()); return Err(Error::ImapSelectFailed(format!(
"{:?}",
watch_folder.as_ref()
)));
} }
} }
@@ -782,16 +794,16 @@ impl Imap {
// typically also need to change the Insecure branch. // typically also need to change the Insecure branch.
IdleHandle::Secure(mut handle) => { IdleHandle::Secure(mut handle) => {
if let Err(err) = handle.init().await { if let Err(err) = handle.init().await {
bail!("IDLE init failed: {}", err); return Err(Error::ImapIdleProtocolFailed(format!("{}", err)));
} }
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::Relaxed) { if self.skip_next_idle_wait.load(Ordering::SeqCst) {
// interrupt_idle has happened before we // interrupt_idle has happened before we
// provided self.interrupt // provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::Relaxed); self.skip_next_idle_wait.store(false, Ordering::SeqCst);
std::mem::drop(idle_wait); std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped"); info!(context, "Idle wait was skipped");
} else { } else {
@@ -814,16 +826,16 @@ impl Imap {
} }
IdleHandle::Insecure(mut handle) => { IdleHandle::Insecure(mut handle) => {
if let Err(err) = handle.init().await { if let Err(err) = handle.init().await {
bail!("IDLE init failed: {}", err); return Err(Error::ImapIdleProtocolFailed(format!("{}", err)));
} }
let (idle_wait, interrupt) = handle.wait_with_timeout(timeout); let (idle_wait, interrupt) = handle.wait_with_timeout(timeout);
*self.interrupt.lock().await = Some(interrupt); *self.interrupt.lock().await = Some(interrupt);
if self.skip_next_idle_wait.load(Ordering::Relaxed) { if self.skip_next_idle_wait.load(Ordering::SeqCst) {
// interrupt_idle has happened before we // interrupt_idle has happened before we
// provided self.interrupt // provided self.interrupt
self.skip_next_idle_wait.store(false, Ordering::Relaxed); self.skip_next_idle_wait.store(false, Ordering::SeqCst);
std::mem::drop(idle_wait); std::mem::drop(idle_wait);
info!(context, "Idle wait was skipped"); info!(context, "Idle wait was skipped");
} else { } else {
@@ -918,12 +930,18 @@ impl Imap {
pub fn interrupt_idle(&self) { pub fn interrupt_idle(&self) {
task::block_on(async move { task::block_on(async move {
if self.interrupt.lock().await.take().is_none() { let mut interrupt: Option<stop_token::StopSource> = self.interrupt.lock().await.take();
if interrupt.is_none() {
// idle wait is not running, signal it needs to skip // idle wait is not running, signal it needs to skip
self.skip_next_idle_wait.store(true, Ordering::Relaxed); self.skip_next_idle_wait.store(true, Ordering::SeqCst);
// meanwhile idle-wait may have produced the interrupter // meanwhile idle-wait may have produced the StopSource
let _ = self.interrupt.lock().await.take(); interrupt = self.interrupt.lock().await.take();
}
// let's manually drop the StopSource
if interrupt.is_some() {
eprintln!("low-level: dropping stop-source to interrupt idle");
std::mem::drop(interrupt)
} }
}); });
} }
@@ -1055,7 +1073,12 @@ impl Imap {
if uid == 0 { if uid == 0 {
return Some(ImapActionResult::Failed); return Some(ImapActionResult::Failed);
} else if !self.is_connected().await { } else if !self.is_connected().await {
if let Err(err) = connect_to_inbox(context, &self) { // currently jobs are only performed on the INBOX thread
// TODO: make INBOX/SENT/MVBOX perform the jobs on their
// respective folders to avoid select_folder network traffic
// and the involved error states
let inbox_thread = context.inbox_thread.read().unwrap();
if let Err(err) = inbox_thread.connect_to_imap(context) {
warn!(context, "prepare_imap_op failed: {}", err); warn!(context, "prepare_imap_op failed: {}", err);
return Some(ImapActionResult::RetryLater); return Some(ImapActionResult::RetryLater);
} }
@@ -1239,10 +1262,9 @@ impl Imap {
session.subscribe(mvbox).await.expect("failed to subscribe"); session.subscribe(mvbox).await.expect("failed to subscribe");
} }
} }
context context
.sql .sql
.set_raw_config_int(context, "folders_configured", 3) .set_raw_config(context, "configured_inbox_folder", Some("INBOX"))
.ok(); .ok();
if let Some(ref mvbox_folder) = mvbox_folder { if let Some(ref mvbox_folder) = mvbox_folder {
context context
@@ -1260,6 +1282,10 @@ impl Imap {
) )
.ok(); .ok();
} }
context
.sql
.set_raw_config_int(context, "folders_configured", 3)
.ok();
} }
}) })
} }

View File

@@ -51,7 +51,7 @@ pub enum ImexMode {
/// Import/export things. /// Import/export things.
/// For this purpose, the function creates a job that is executed in the IMAP-thread then; /// For this purpose, the function creates a job that is executed in the IMAP-thread then;
/// this requires to call dc_perform_imap_jobs() regularly. /// this requires to call dc_perform_inbox_jobs() regularly.
/// ///
/// What to do is defined by the _what_ parameter. /// What to do is defined by the _what_ parameter.
/// ///

View File

@@ -203,7 +203,7 @@ impl Job {
#[allow(non_snake_case)] #[allow(non_snake_case)]
fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap(); let imap_inbox = &context.inbox_thread.read().unwrap().imap;
if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
if context if context
@@ -212,7 +212,7 @@ impl Job {
.unwrap_or_default() .unwrap_or_default()
< 3 < 3
{ {
inbox.configure_folders(context, 0x1i32); imap_inbox.configure_folders(context, 0x1i32);
} }
let dest_folder = context let dest_folder = context
.sql .sql
@@ -222,7 +222,7 @@ impl Job {
let server_folder = msg.server_folder.as_ref().unwrap(); let server_folder = msg.server_folder.as_ref().unwrap();
let mut dest_uid = 0; let mut dest_uid = 0;
match inbox.mv( match imap_inbox.mv(
context, context,
server_folder, server_folder,
msg.server_uid, msg.server_uid,
@@ -248,7 +248,7 @@ impl Job {
#[allow(non_snake_case)] #[allow(non_snake_case)]
fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap(); let imap_inbox = &context.inbox_thread.read().unwrap().imap;
if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
if !msg.rfc724_mid.is_empty() { if !msg.rfc724_mid.is_empty() {
@@ -263,7 +263,8 @@ impl Job {
we delete the message from the server */ we delete the message from the server */
let mid = msg.rfc724_mid; let mid = msg.rfc724_mid;
let server_folder = msg.server_folder.as_ref().unwrap(); let server_folder = msg.server_folder.as_ref().unwrap();
let res = inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid); let res =
imap_inbox.delete_msg(context, &mid, server_folder, &mut msg.server_uid);
if res == ImapActionResult::RetryLater { if res == ImapActionResult::RetryLater {
self.try_again_later(-1i32, None); self.try_again_later(-1i32, None);
return; return;
@@ -276,27 +277,27 @@ impl Job {
#[allow(non_snake_case)] #[allow(non_snake_case)]
fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) { fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap(); let imap_inbox = &context.inbox_thread.read().unwrap().imap;
if self.foreign_id & DC_EMPTY_MVBOX > 0 { if self.foreign_id & DC_EMPTY_MVBOX > 0 {
if let Some(mvbox_folder) = context if let Some(mvbox_folder) = context
.sql .sql
.get_raw_config(context, "configured_mvbox_folder") .get_raw_config(context, "configured_mvbox_folder")
{ {
inbox.empty_folder(context, &mvbox_folder); imap_inbox.empty_folder(context, &mvbox_folder);
} }
} }
if self.foreign_id & DC_EMPTY_INBOX > 0 { if self.foreign_id & DC_EMPTY_INBOX > 0 {
inbox.empty_folder(context, "INBOX"); imap_inbox.empty_folder(context, "INBOX");
} }
} }
#[allow(non_snake_case)] #[allow(non_snake_case)]
fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) {
let inbox = context.inbox.read().unwrap(); let imap_inbox = &context.inbox_thread.read().unwrap().imap;
if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) {
let folder = msg.server_folder.as_ref().unwrap(); let folder = msg.server_folder.as_ref().unwrap();
match inbox.set_seen(context, folder, msg.server_uid) { match imap_inbox.set_seen(context, folder, msg.server_uid) {
ImapActionResult::RetryLater => { ImapActionResult::RetryLater => {
self.try_again_later(3i32, None); self.try_again_later(3i32, None);
} }
@@ -326,8 +327,8 @@ impl Job {
.unwrap_or_default() .unwrap_or_default()
.to_string(); .to_string();
let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32;
let inbox = context.inbox.read().unwrap(); let imap_inbox = &context.inbox_thread.read().unwrap().imap;
if inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater {
self.try_again_later(3i32, None); self.try_again_later(3i32, None);
return; return;
} }
@@ -338,7 +339,7 @@ impl Job {
.unwrap_or_default() .unwrap_or_default()
< 3 < 3
{ {
inbox.configure_folders(context, 0x1i32); imap_inbox.configure_folders(context, 0x1i32);
} }
let dest_folder = context let dest_folder = context
.sql .sql
@@ -346,7 +347,7 @@ impl Job {
if let Some(dest_folder) = dest_folder { if let Some(dest_folder) = dest_folder {
let mut dest_uid = 0; let mut dest_uid = 0;
if ImapActionResult::RetryLater if ImapActionResult::RetryLater
== inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid) == imap_inbox.mv(context, &folder, uid, &dest_folder, &mut dest_uid)
{ {
self.try_again_later(3, None); self.try_again_later(3, None);
} }
@@ -366,81 +367,14 @@ pub fn job_kill_action(context: &Context, action: Action) -> bool {
.is_ok() .is_ok()
} }
pub fn perform_imap_fetch(context: &Context) { pub fn perform_inbox_fetch(context: &Context) {
if !context.get_config_bool(Config::InboxWatch) { let use_network = context.get_config_bool(Config::InboxWatch);
info!(context, "INBOX-fetch skipped: INBOX-watch is disabled.");
return;
}
let inbox = context.inbox.read().unwrap();
let start = std::time::Instant::now();
if let Err(err) = connect_to_inbox(context, &inbox) { context
warn!(context, "could not connect to inbox: {:?}", err); .inbox_thread
return; .write()
} .unwrap()
info!(context, "INBOX-fetch started...",); .fetch(context, use_network);
inbox.fetch(context);
if inbox.should_reconnect() {
info!(context, "INBOX-fetch aborted, starting over...",);
inbox.fetch(context);
}
info!(
context,
"INBOX-fetch done in {:.4} ms.",
start.elapsed().as_nanos() as f64 / 1_000_000.0,
);
}
pub fn perform_imap_idle(context: &Context) {
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!(
context,
"INBOX-IDLE will not be started because of waiting jobs."
);
return;
}
let inbox = context.inbox.read().unwrap();
let poll_mode = if !context.get_config_bool(Config::InboxWatch) {
Some(IdlePollMode::Never)
} else {
match connect_to_inbox(context, &inbox) {
Err(Error::ImapConnectionFailed(param)) => {
warn!(context, "perform_imap_idle could not connect {:?}", param);
Some(IdlePollMode::Often)
}
Err(err) => {
warn!(context, "perform_imap_idle error: {}", err);
// anything else than a plain connection error
// hints at configuration issues.
Some(IdlePollMode::Never)
}
Ok(()) => {
info!(context, "INBOX-IDLE starting...");
let res = inbox.idle(context);
info!(context, "INBOX-IDLE ended.");
match res {
Ok(()) => None,
Err(Error::ImapConnectionFailed(param)) => {
warn!(
context,
"perform_imap_idle IDLE could not connect {:?}", param
);
Some(IdlePollMode::Often)
}
Err(err) => {
warn!(context, "perform_imap_idle IDLE error: {}", err);
Some(IdlePollMode::Never)
}
}
}
}
};
if let Some(poll_mode) = poll_mode {
inbox.fake_idle(context, poll_mode);
}
} }
pub fn perform_mvbox_fetch(context: &Context) { pub fn perform_mvbox_fetch(context: &Context) {
@@ -453,20 +387,6 @@ pub fn perform_mvbox_fetch(context: &Context) {
.fetch(context, use_network); .fetch(context, use_network);
} }
pub fn perform_mvbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
context
.mvbox_thread
.read()
.unwrap()
.idle(context, use_network);
}
pub fn interrupt_mvbox_idle(context: &Context) {
context.mvbox_thread.read().unwrap().interrupt_idle(context);
}
pub fn perform_sentbox_fetch(context: &Context) { pub fn perform_sentbox_fetch(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch); let use_network = context.get_config_bool(Config::SentboxWatch);
@@ -477,6 +397,33 @@ pub fn perform_sentbox_fetch(context: &Context) {
.fetch(context, use_network); .fetch(context, use_network);
} }
pub fn perform_inbox_idle(context: &Context) {
if *context.perform_inbox_jobs_needed.clone().read().unwrap() {
info!(
context,
"INBOX-IDLE will not be started because of waiting jobs."
);
return;
}
let use_network = context.get_config_bool(Config::InboxWatch);
context
.inbox_thread
.read()
.unwrap()
.idle(context, use_network);
}
pub fn perform_mvbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::MvboxWatch);
context
.mvbox_thread
.read()
.unwrap()
.idle(context, use_network);
}
pub fn perform_sentbox_idle(context: &Context) { pub fn perform_sentbox_idle(context: &Context) {
let use_network = context.get_config_bool(Config::SentboxWatch); let use_network = context.get_config_bool(Config::SentboxWatch);
@@ -487,6 +434,27 @@ pub fn perform_sentbox_idle(context: &Context) {
.idle(context, use_network); .idle(context, use_network);
} }
pub fn interrupt_inbox_idle(context: &Context, block: bool) {
info!(context, "interrupt_inbox_idle called blocking={}", block);
if block {
context.inbox_thread.read().unwrap().interrupt_idle(context);
} else {
match context.inbox_thread.try_read() {
Ok(inbox_thread) => {
inbox_thread.interrupt_idle(context);
}
Err(err) => {
*context.perform_inbox_jobs_needed.write().unwrap() = true;
warn!(context, "could not interrupt idle: {}", err);
}
}
}
}
pub fn interrupt_mvbox_idle(context: &Context) {
context.mvbox_thread.read().unwrap().interrupt_idle(context);
}
pub fn interrupt_sentbox_idle(context: &Context) { pub fn interrupt_sentbox_idle(context: &Context) {
context context
.sentbox_thread .sentbox_thread
@@ -587,7 +555,7 @@ pub fn maybe_network(context: &Context) {
} }
interrupt_smtp_idle(context); interrupt_smtp_idle(context);
interrupt_imap_idle(context); interrupt_inbox_idle(context, true);
interrupt_mvbox_idle(context); interrupt_mvbox_idle(context);
interrupt_sentbox_idle(context); interrupt_sentbox_idle(context);
} }
@@ -719,15 +687,15 @@ pub fn job_send_msg(context: &Context, msg_id: MsgId) -> Result<(), Error> {
Ok(()) Ok(())
} }
pub fn perform_imap_jobs(context: &Context) { pub fn perform_inbox_jobs(context: &Context) {
info!(context, "dc_perform_imap_jobs starting.",); info!(context, "dc_perform_inbox_jobs starting.",);
let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); let probe_imap_network = *context.probe_imap_network.clone().read().unwrap();
*context.probe_imap_network.write().unwrap() = false; *context.probe_imap_network.write().unwrap() = false;
*context.perform_inbox_jobs_needed.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false;
job_perform(context, Thread::Imap, probe_imap_network); job_perform(context, Thread::Imap, probe_imap_network);
info!(context, "dc_perform_imap_jobs ended.",); info!(context, "dc_perform_inbox_jobs ended.",);
} }
pub fn perform_mvbox_jobs(context: &Context) { pub fn perform_mvbox_jobs(context: &Context) {
@@ -963,12 +931,6 @@ fn suspend_smtp_thread(context: &Context, suspend: bool) {
} }
} }
pub fn connect_to_inbox(context: &Context, imap: &Imap) -> Result<(), Error> {
dc_connect_to_configured_imap(context, imap)?;
imap.set_watch_folder("INBOX".into());
Ok(())
}
fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> { fn send_mdn(context: &Context, msg_id: MsgId) -> Result<(), Error> {
let mut mimefactory = MimeFactory::load_mdn(context, msg_id)?; let mut mimefactory = MimeFactory::load_mdn(context, msg_id)?;
unsafe { mimefactory.render()? }; unsafe { mimefactory.render()? };
@@ -1039,7 +1001,7 @@ pub fn job_add(
).ok(); ).ok();
match thread { match thread {
Thread::Imap => interrupt_imap_idle(context), Thread::Imap => interrupt_inbox_idle(context, false),
Thread::Smtp => interrupt_smtp_idle(context), Thread::Smtp => interrupt_smtp_idle(context),
Thread::Unknown => {} Thread::Unknown => {}
} }
@@ -1055,11 +1017,3 @@ pub fn interrupt_smtp_idle(context: &Context) {
state.idle = true; state.idle = true;
cvar.notify_one(); cvar.notify_one();
} }
pub fn interrupt_imap_idle(context: &Context) {
info!(context, "Interrupting INBOX-IDLE...",);
*context.perform_inbox_jobs_needed.write().unwrap() = true;
context.inbox.read().unwrap().interrupt_idle();
}

View File

@@ -2,7 +2,7 @@ use std::sync::{Arc, Condvar, Mutex};
use crate::configure::*; use crate::configure::*;
use crate::context::Context; use crate::context::Context;
use crate::error::Error; use crate::error::{Error, Result};
use crate::imap::{IdlePollMode, Imap}; use crate::imap::{IdlePollMode, Imap};
#[derive(Debug)] #[derive(Debug)]
@@ -86,10 +86,10 @@ impl JobThread {
} }
if use_network { if use_network {
let start = std::time::Instant::now();
let prefix = format!("{}-fetch", self.name); let prefix = format!("{}-fetch", self.name);
match self.connect_to_imap(context) { match self.connect_to_imap(context) {
Ok(()) => { Ok(()) => {
let start = std::time::Instant::now();
info!(context, "{} started...", prefix); info!(context, "{} started...", prefix);
self.imap.fetch(context); self.imap.fetch(context);
@@ -116,7 +116,7 @@ impl JobThread {
self.state.0.lock().unwrap().using_handle = false; self.state.0.lock().unwrap().using_handle = false;
} }
fn connect_to_imap(&self, context: &Context) -> Result<(), Error> { pub fn connect_to_imap(&self, context: &Context) -> Result<()> {
if async_std::task::block_on(async move { self.imap.is_connected().await }) { if async_std::task::block_on(async move { self.imap.is_connected().await }) {
return Ok(()); return Ok(());
} }
@@ -179,26 +179,33 @@ impl JobThread {
} }
} }
let prefix = format!("{}-IDLE", self.name);
let poll_mode = match self.connect_to_imap(context) { let poll_mode = match self.connect_to_imap(context) {
Ok(()) => { Ok(()) => {
info!(context, "{}-IDLE started...", self.name,); info!(context, "{} started...", prefix);
let res = self.imap.idle(context); let res = self.imap.idle(context);
info!(context, "{}-IDLE ended.", self.name); info!(context, "{} ended...", prefix);
match res { match res {
Ok(()) => None, Ok(()) => None,
Err(Error::ImapConnectionFailed(err)) => { Err(Error::ImapConnectionFailed(err))
warn!(context, "idle connection failed: {}", err); | Err(Error::ImapIdleProtocolFailed(err)) => {
self.imap.trigger_reconnect();
warn!(context, "{} failed: {}, reconnecting", prefix, err);
Some(IdlePollMode::Often) Some(IdlePollMode::Often)
} }
Err(Error::ImapInTeardown) => {
warn!(context, "{} aborting as imap is in teardown", prefix);
None
}
Err(err) => { Err(err) => {
warn!(context, "idle failed: {}", err); warn!(context, "{} failed fundamentally: {}", prefix, err);
Some(IdlePollMode::Never) Some(IdlePollMode::Never)
} }
} }
} }
Err(err) => { Err(err) => {
info!(context, "{}-IDLE fail: {:?}", self.name, err); info!(context, "{}-IDLE connection fail: {:?}", self.name, err);
Some(IdlePollMode::Never) Some(IdlePollMode::Often)
} }
}; };
if let Some(poll_mode) = poll_mode { if let Some(poll_mode) = poll_mode {