diff --git a/src/job.rs b/src/job.rs index 5ddd5cd54..03fad5d2b 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,3 +1,4 @@ +use std::sync::{Arc, RwLock}; use std::time::Duration; use deltachat_derive::{FromSql, ToSql}; @@ -14,6 +15,7 @@ use crate::error::Error; use crate::events::Event; use crate::imap::*; use crate::imex::*; +use crate::job_thread::JobThread; use crate::location; use crate::login_param::LoginParam; use crate::message::MsgId; @@ -31,6 +33,14 @@ enum Thread { Smtp = 5000, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum ThreadExecutor { + Inbox, + Mvbox, + Sentbox, + Smtp, +} + #[derive(Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive)] enum TryAgain { Dont, @@ -209,8 +219,9 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context) { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + fn do_DC_JOB_MOVE_MSG(&mut self, context: &Context, thread: ThreadExecutor) { + let t = get_inbox_for_executor(context, thread); + let imap_inbox = &t.read().unwrap().imap; if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if context @@ -254,8 +265,9 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context) { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + fn do_DC_JOB_DELETE_MSG_ON_IMAP(&mut self, context: &Context, thread: ThreadExecutor) { + let t = get_inbox_for_executor(context, thread); + let imap_inbox = &t.read().unwrap().imap; if let Ok(mut msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { if !msg.rfc724_mid.is_empty() { @@ -283,24 +295,26 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context) { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; - if self.foreign_id & DC_EMPTY_MVBOX > 0 { + fn do_DC_JOB_EMPTY_SERVER(&mut self, context: &Context, thread: ThreadExecutor) { + let t = get_inbox_for_executor(context, thread); + let imap_inbox = &t.read().unwrap().imap; + + if thread == ThreadExecutor::Mvbox && DC_EMPTY_MVBOX > 0 { if let Some(mvbox_folder) = context .sql .get_raw_config(context, "configured_mvbox_folder") { imap_inbox.empty_folder(context, &mvbox_folder); } - } - if self.foreign_id & DC_EMPTY_INBOX > 0 { + } else if thread == ThreadExecutor::Inbox && DC_EMPTY_INBOX > 0 { imap_inbox.empty_folder(context, "INBOX"); } } #[allow(non_snake_case)] - fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context) { - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + fn do_DC_JOB_MARKSEEN_MSG_ON_IMAP(&mut self, context: &Context, thread: ThreadExecutor) { + let t = get_inbox_for_executor(context, thread); + let imap_inbox = &t.read().unwrap().imap; if let Ok(msg) = Message::load_from_db(context, MsgId::new(self.foreign_id)) { let folder = msg.server_folder.as_ref().unwrap(); @@ -327,14 +341,17 @@ impl Job { } #[allow(non_snake_case)] - fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context) { + fn do_DC_JOB_MARKSEEN_MDN_ON_IMAP(&mut self, context: &Context, thread: ThreadExecutor) { let folder = self .param .get(Param::ServerFolder) .unwrap_or_default() .to_string(); let uid = self.param.get_int(Param::ServerUid).unwrap_or_default() as u32; - let imap_inbox = &context.inbox_thread.read().unwrap().imap; + + let t = get_inbox_for_executor(context, thread); + let imap_inbox = &t.read().unwrap().imap; + if imap_inbox.set_seen(context, &folder, uid) == ImapActionResult::RetryLater { self.try_again_later(TryAgain::StandardDelay, None); return; @@ -488,7 +505,7 @@ pub fn perform_smtp_jobs(context: &Context) { }; info!(context, "SMTP-jobs started...",); - job_perform(context, Thread::Smtp, probe_smtp_network); + job_perform(context, ThreadExecutor::Smtp, probe_smtp_network); info!(context, "SMTP-jobs ended."); { @@ -701,40 +718,62 @@ pub fn perform_inbox_jobs(context: &Context) { *context.probe_imap_network.write().unwrap() = false; *context.perform_inbox_jobs_needed.write().unwrap() = false; - job_perform(context, Thread::Imap, probe_imap_network); + job_perform(context, ThreadExecutor::Inbox, probe_imap_network); info!(context, "dc_perform_inbox_jobs ended.",); } pub fn perform_mvbox_jobs(context: &Context) { - info!(context, "dc_perform_mbox_jobs EMPTY (for now).",); + info!(context, "dc_perform_mbox_jobs starting.",); + + let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); + *context.probe_imap_network.write().unwrap() = false; + *context.perform_inbox_jobs_needed.write().unwrap() = false; + + job_perform(context, ThreadExecutor::Mvbox, probe_imap_network); + info!(context, "dc_perform_mbox_jobs ended.",); } pub fn perform_sentbox_jobs(context: &Context) { - info!(context, "dc_perform_sentbox_jobs EMPTY (for now).",); + info!(context, "dc_perform_sentbox_jobs starting.",); + + let probe_imap_network = *context.probe_imap_network.clone().read().unwrap(); + *context.probe_imap_network.write().unwrap() = false; + *context.perform_inbox_jobs_needed.write().unwrap() = false; + + job_perform(context, ThreadExecutor::Sentbox, probe_imap_network); + info!(context, "dc_perform_sentbox_jobs ended.",); } -fn job_perform(context: &Context, thread: Thread, probe_network: bool) { +fn job_perform(context: &Context, thread: ThreadExecutor, probe_network: bool) { + let folder = get_folder_for_executor(context, thread); + let query = if !probe_network { // processing for first-try and after backoff-timeouts: // process jobs in the order they were added. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp;" + "SELECT j.id, j.action, j.foreign_id, j.param, j.added_timestamp, j.desired_timestamp, j.tries \ + FROM jobs j \ + LEFT JOIN msgs m\ + WHERE j.thread=? AND j.desired_timestamp<=? AND m.server_folder=? \ + ORDER BY j.action DESC, j.added_timestamp;" } else { // processing after call to dc_maybe_network(): // process _all_ pending jobs that failed before // in the order of their backoff-times. - "SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries \ - FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC;" + "SELECT \ + j.id, j.action, j.foreign_id, j.param, j.added_timestamp, j.desired_timestamp, j.tries \ + FROM jobs j \ + LEFT JOIN msgs m \ + WHERE j.thread=? AND j.tries>0 AND m.server_folder=? \ + ORDER BY j.desired_timestamp, j.action DESC;" }; - let params_no_probe = params![thread as i64, time()]; - let params_probe = params![thread as i64]; + let params_no_probe = params![thread as i64, time(), folder]; + let params_probe = params![thread as i64, folder]; let params: &[&dyn rusqlite::ToSql] = if !probe_network { params_no_probe } else { params_probe }; - let jobs: Result, _> = context .sql .query_map( @@ -764,14 +803,7 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { for mut job in jobs.unwrap_or_default() { info!( context, - "{}-job #{}, action {} started...", - if thread == Thread::Imap { - "INBOX" - } else { - "SMTP" - }, - job.job_id, - job.action, + "{:?}-job #{}, action {} started...", thread, job.job_id, job.action, ); // some configuration jobs are "exclusive": @@ -805,11 +837,11 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { warn!(context, "Unknown job id found"); } Action::SendMsgToSmtp => job.do_DC_JOB_SEND(context), - Action::EmptyServer => job.do_DC_JOB_EMPTY_SERVER(context), - Action::DeleteMsgOnImap => job.do_DC_JOB_DELETE_MSG_ON_IMAP(context), - Action::MarkseenMsgOnImap => job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context), - Action::MarkseenMdnOnImap => job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context), - Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context), + Action::EmptyServer => job.do_DC_JOB_EMPTY_SERVER(context, thread), + Action::DeleteMsgOnImap => job.do_DC_JOB_DELETE_MSG_ON_IMAP(context, thread), + Action::MarkseenMsgOnImap => job.do_DC_JOB_MARKSEEN_MSG_ON_IMAP(context, thread), + Action::MarkseenMdnOnImap => job.do_DC_JOB_MARKSEEN_MDN_ON_IMAP(context, thread), + Action::MoveMsg => job.do_DC_JOB_MOVE_MSG(context, thread), Action::SendMdn => job.do_DC_JOB_SEND(context), Action::ConfigureImap => dc_job_do_DC_JOB_CONFIGURE_IMAP(context), Action::ImexImap => match job_do_DC_JOB_IMEX_IMAP(context, &job) { @@ -857,18 +889,14 @@ fn job_perform(context: &Context, thread: Thread, probe_network: bool) { job.update(context); info!( context, - "{}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", - if thread == Thread::Imap { - "INBOX" - } else { - "SMTP" - }, + "{:?}-job #{} not succeeded on try #{}, retry in ADD_TIME+{} (in {} seconds).", + thread, job.job_id as u32, tries, time_offset, job.added_timestamp + time_offset - time() ); - if thread == Thread::Smtp && tries < 17 - 1 { + if thread == ThreadExecutor::Smtp && tries < 17 - 1 { context .smtp_state .clone() @@ -996,12 +1024,69 @@ pub fn job_add( ).ok(); match thread { - Thread::Imap => interrupt_inbox_idle(context, false), + Thread::Imap => { + let folder: String = context + .sql + .query_get_value( + context, + "SELECT server_folder FROM msgs WHERE id=?", + params![foreign_id], + ) + .unwrap_or_default(); + + let mbox_thread = get_matching_inbox(context, &folder); + let res = mbox_thread.try_read(); + match res { + Ok(mbox_thread) => { + mbox_thread.interrupt_idle(context); + } + Err(err) => { + *context.perform_inbox_jobs_needed.write().unwrap() = true; + warn!(context, "could not interrupt idle: {}", err); + } + } + } Thread::Smtp => interrupt_smtp_idle(context), Thread::Unknown => {} } } +fn get_folder_for_executor(context: &Context, thread: ThreadExecutor) -> String { + let t = get_inbox_for_executor(context, thread); + let b = t.read().unwrap(); + b.get_watch_folder(context).unwrap_or_default().to_string() +} + +fn get_inbox_for_executor(context: &Context, thread: ThreadExecutor) -> Arc> { + match thread { + ThreadExecutor::Inbox => context.inbox_thread.clone(), + ThreadExecutor::Mvbox => context.mvbox_thread.clone(), + ThreadExecutor::Sentbox => context.sentbox_thread.clone(), + ThreadExecutor::Smtp => panic!("do not use for smtp"), + } +} + +fn get_matching_inbox(context: &Context, folder: &String) -> Arc> { + let mvbox_folder = context + .mvbox_thread + .read() + .unwrap() + .get_watch_folder(context); + let sentbox_folder = context + .sentbox_thread + .read() + .unwrap() + .get_watch_folder(context); + + if mvbox_folder.is_some() && folder == mvbox_folder.as_ref().unwrap() { + context.mvbox_thread.clone() + } else if sentbox_folder.is_some() && folder == sentbox_folder.as_ref().unwrap() { + context.sentbox_thread.clone() + } else { + context.inbox_thread.clone() + } +} + pub fn interrupt_smtp_idle(context: &Context) { info!(context, "Interrupting SMTP-idle...",); diff --git a/src/job_thread.rs b/src/job_thread.rs index c5f5aabb5..d978888d3 100644 --- a/src/job_thread.rs +++ b/src/job_thread.rs @@ -116,7 +116,7 @@ impl JobThread { } } - fn get_watch_folder(&self, context: &Context) -> Option { + pub(crate) fn get_watch_folder(&self, context: &Context) -> Option { match context.sql.get_raw_config(context, self.folder_config_name) { Some(name) => Some(name), None => {