From 05e1c00cd1d9a2c4a4729fe8b2d27a6764c84208 Mon Sep 17 00:00:00 2001 From: Hocuri Date: Fri, 5 Jun 2020 16:27:22 +0200 Subject: [PATCH] fix: update message ids correctly Fixes #1495 --- python/src/deltachat/account.py | 3 +- python/src/deltachat/direct_imap.py | 141 ++++++++++++++++++++++++++++ python/src/deltachat/testplugin.py | 21 ++++- python/tests/test_account.py | 79 ++++++++++++++++ src/config.rs | 13 ++- src/configure/mod.rs | 4 +- src/context.rs | 34 ++----- src/imap/idle.rs | 36 +++---- src/imap/mod.rs | 23 ++--- src/job.rs | 83 ++++++++++------ src/scheduler.rs | 134 +++++++++++--------------- 11 files changed, 401 insertions(+), 170 deletions(-) create mode 100644 python/src/deltachat/direct_imap.py diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 34f2e63ba..bf399c2de 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -28,7 +28,7 @@ class Account(object): """ MissingCredentials = MissingCredentials - def __init__(self, db_path, os_name=None, logging=True): + def __init__(self, db_path, os_name=None, logging=True, logid=None): """ initialize account object. :param db_path: a path to the account database. The database @@ -38,6 +38,7 @@ class Account(object): # initialize per-account plugin system self._pm = hookspec.PerAccount._make_plugin_manager() self._logging = logging + self.logid = logid self.add_account_plugin(self) diff --git a/python/src/deltachat/direct_imap.py b/python/src/deltachat/direct_imap.py new file mode 100644 index 000000000..8247b08cb --- /dev/null +++ b/python/src/deltachat/direct_imap.py @@ -0,0 +1,141 @@ +import imaplib +import pathlib +from . import Account + +INBOX = "Inbox" +SENT = "Sent" +MVBOX = "DeltaChat" +MVBOX_FALLBBACK = "INBOX/DeltaChat" +DC_CONSTANT_MSG_MOVESTATE_PENDING = 1 +DC_CONSTANT_MSG_MOVESTATE_STAY = 2 +DC_CONSTANT_MSG_MOVESTATE_MOVING = 3 + + +def db_folder_attr(name): + def fget(s): + return s.db_folder.get(name, 1) + + def fset(s, val): + s.db_folder[name] = val + return property(fget, fset, None, None) + + +class ImapConn(): + def __init__(self, foldername, conn_info): + self.foldername = foldername + host, user, pw = conn_info + + self.connection = imaplib.IMAP4_SSL(host) + self.connection.login(user, pw) + messages = self.reselect_folder() + try: + self.original_msg_count = int(messages[0]) + except IndexError: + self.original_msg_count = 0 + + def mark_all_read(self): + self.reselect_folder() +# result, data = self.connection.uid('search', None, "(UNSEEN)") + result, data = self.connection.search(None, 'UnSeen') + try: + mails_uid = data[0].split() + print("New mails") + +# self.connection.store(data[0].replace(' ',','),'+FLAGS','\Seen') + for e_id in mails_uid: + self.connection.store(e_id, '+FLAGS', '\\Seen') + print("marked:", e_id) + + return True + except IndexError: + print("No unread") + return False + + def get_unread_cnt(self): + self.reselect_folder() +# result, data = self.connection.uid('search', None, "(UNSEEN)") + result, data = self.connection.search(None, 'UnSeen') + try: + mails_uid = data[0].split() + + return len(mails_uid) + except IndexError: + return 0 + + def get_new_email_cnt(self): + messages = self.reselect_folder() + try: + return int(messages[0]) - self.original_msg_count + except IndexError: + return 0 + + def reselect_folder(self): + status, messages = self.connection.select(self.foldername) + if status != "OK": + print("Incorrect mail box " + status + str(messages)) + raise ConnectionError +# print("(Re-)Selected mailbox: " + status + " " + str(messages)) + return messages + + def __del__(self): + try: + self.connection.close() + except Exception: + pass + try: + self.connection.logout() + except Exception: + print("Could not logout direct_imap conn") + + +def make_direct_imap(account, folder): + conn_info = (account.get_config("configured_mail_server"), + account.get_config("addr"), account.get_config("mail_pw")) + # try: + # return ImapConn(folder, conn_info=conn_info) + # except ConnectionError as e: + # if folder == MVBOX: + # account.log("Selecting " + MVBOX_FALLBBACK + " not " + MVBOX + " because connecting to the latter failed") + # return ImapConn(MVBOX_FALLBBACK, conn_info=conn_info) + # else: + # raise e + if folder == MVBOX: + new_folder = account.get_config("configured_mvbox_folder") + else: + new_folder = folder + if new_folder != folder: + account.log("Making connection with " + new_folder + " not " + folder) + return ImapConn(new_folder, conn_info=conn_info) + + +def print_imap_structure(database, dir="."): + print_imap_structure_ac(Account(database), dir) + + +def print_imap_structure_ac(ac, dir="."): + acinfo = ac.logid + "-" + ac.get_config("addr") + print("================= ACCOUNT", acinfo, "=================") + print("----------------- CONFIG: -----------------") + print(ac.get_info()) + + for imapfolder in [INBOX, MVBOX, SENT, MVBOX_FALLBBACK]: + try: + imap = make_direct_imap(ac, imapfolder) + c = imap.connection + typ, data = c.search(None, 'ALL') + c._get_tagged_response + print("-----------------", imapfolder, "-----------------") + for num in data[0].split(): + typ, data = c.fetch(num, '(RFC822)') + body = data[0][1] + + typ, data = c.fetch(num, '(UID FLAGS)') + info = data[0] + + path = pathlib.Path(dir).joinpath("IMAP-MESSAGES", acinfo, imapfolder) + path.mkdir(parents=True, exist_ok=True) + file = path.joinpath(str(info).replace("b'", "").replace("'", "").replace("\\", "")) + file.write_bytes(body) + print("Message", info, "saved as", file) + except Exception: + pass diff --git a/python/src/deltachat/testplugin.py b/python/src/deltachat/testplugin.py index 8a1939b08..334db8cb5 100644 --- a/python/src/deltachat/testplugin.py +++ b/python/src/deltachat/testplugin.py @@ -12,7 +12,7 @@ import tempfile import pytest import requests -from . import Account, const +from . import Account, const, direct_imap from .capi import lib from .events import FFIEventLogger, FFIEventTracker from _pytest._code import Source @@ -228,7 +228,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): acc.disable_logging() def make_account(self, path, logid, quiet=False): - ac = Account(path, logging=self._logging) + ac = Account(path, logging=self._logging, logid=logid) ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac)) ac.addr = ac.get_self_contact().addr if not quiet: @@ -377,7 +377,10 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): am = AccountMaker() request.addfinalizer(am.finalize) - return am + yield am + if request.node.rep_call.failed: + for ac in am._accounts: + direct_imap.print_imap_structure_ac(ac, tmpdir) class BotProcess: @@ -446,3 +449,15 @@ def lp(): def step(self, msg): print("-" * 5, "step " + msg, "-" * 5) return Printer() + + +@pytest.hookimpl(tryfirst=True, hookwrapper=True) +def pytest_runtest_makereport(item, call): + # execute all other hooks to obtain the report object + outcome = yield + rep = outcome.get_result() + + # set a report attribute for each phase of a call, which can + # be "setup", "call", "teardown" + + setattr(item, "rep_" + rep.when, rep) diff --git a/python/tests/test_account.py b/python/tests/test_account.py index f0f935465..f8603f897 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -7,6 +7,8 @@ from deltachat import const, Account from deltachat.message import Message from deltachat.hookspec import account_hookimpl from datetime import datetime, timedelta +from deltachat import direct_imap +from deltachat.direct_imap import make_direct_imap @pytest.mark.parametrize("msgtext,res", [ @@ -635,6 +637,83 @@ class TestOnlineAccount: ev_msg = ac1_clone._evtracker.wait_next_messages_changed() assert ev_msg.text == msg_out.text + @pytest.mark.parametrize('i', range(30)) + def test_mark_read_on_server(self, acfactory, lp, i): + ac1 = acfactory.get_online_configuring_account() + ac2 = acfactory.get_online_configuring_account(mvbox=True, move=True) + + ac1.wait_configure_finish() + ac1.start_io() + ac2.wait_configure_finish() + ac2.start_io() + + imap2 = make_direct_imap(ac2, direct_imap.MVBOX) + # imap2.mark_all_read() + assert imap2.get_unread_cnt() == 0 + + chat = self.get_chat(ac1, ac2) + chat_on_ac2 = self.get_chat(ac2, ac1) + + chat.send_text("Text message") + + incoming_on_ac2 = ac2._evtracker.wait_next_incoming_message() + lp.sec("Incoming: "+incoming_on_ac2.text) + + assert list(ac2.get_fresh_messages()) + + for i in range(0, 20): + if imap2.get_unread_cnt() == 1: + break + time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core + assert imap2.get_unread_cnt() == 1 + + chat_on_ac2.mark_noticed() + incoming_on_ac2.mark_seen() + ac2._evtracker.wait_next_messages_changed() + + assert not list(ac2.get_fresh_messages()) + + # The new messages should be seen now. + for i in range(0, 20): + if imap2.get_unread_cnt() == 0: + break + time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core + assert imap2.get_unread_cnt() == 0 + + @pytest.mark.parametrize('i', range(30)) + def test_mark_bcc_read_on_server(self, acfactory, lp, i): + ac1 = acfactory.get_online_configuring_account(mvbox=True, move=True) + ac2 = acfactory.get_online_configuring_account() + + ac1.wait_configure_finish() + ac1.start_io() + ac2.wait_configure_finish() + ac2.start_io() + + imap1 = make_direct_imap(ac1, direct_imap.MVBOX) + imap1.mark_all_read() + assert imap1.get_unread_cnt() == 0 + + chat = self.get_chat(ac1, ac2) + + ac1.set_config("bcc_self", "1") + chat.send_text("Text message") + + ac1._evtracker.get_matching("DC_EVENT_SMTP_MESSAGE_SENT") + + for i in range(0, 20): + if imap1.get_new_email_cnt() == 1: + break + time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core + assert imap1.get_new_email_cnt() == 1 + + for i in range(0, 20): + if imap1.get_unread_cnt() == 0: + break + time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core + + assert imap1.get_unread_cnt() == 0 + def test_send_file_twice_unicode_filename_mangling(self, tmpdir, acfactory, lp): ac1, ac2 = acfactory.get_two_online_accounts() chat = self.get_chat(ac1, ac2) diff --git a/src/config.rs b/src/config.rs index 55c465155..644093a97 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,7 +11,7 @@ use crate::dc_tools::*; use crate::events::Event; use crate::message::MsgId; use crate::mimefactory::RECOMMENDED_FILE_SIZE; -use crate::stock::StockMessage; +use crate::{scheduler::InterruptInfo, stock::StockMessage}; /// The available configuration keys. #[derive( @@ -104,6 +104,9 @@ pub enum Config { ConfiguredServerFlags, ConfiguredSendSecurity, ConfiguredE2EEEnabled, + ConfiguredInboxFolder, + ConfiguredMvboxFolder, + ConfiguredSentboxFolder, Configured, #[strum(serialize = "sys.version")] @@ -137,6 +140,7 @@ impl Context { // Default values match key { Config::Selfstatus => Some(self.stock_str(StockMessage::StatusLine).await.into_owned()), + Config::ConfiguredInboxFolder => Some("INBOX".to_owned()), _ => key.get_str("default").map(|s| s.to_string()), } } @@ -199,17 +203,18 @@ impl Context { } Config::InboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_inbox(false).await; + self.interrupt_inbox(InterruptInfo::new(false, None)).await; ret } Config::SentboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_sentbox(false).await; + self.interrupt_sentbox(InterruptInfo::new(false, None)) + .await; ret } Config::MvboxWatch => { let ret = self.sql.set_raw_config(self, key, value).await; - self.interrupt_mvbox(false).await; + self.interrupt_mvbox(InterruptInfo::new(false, None)).await; ret } Config::Selfstatus => { diff --git a/src/configure/mod.rs b/src/configure/mod.rs index 1519739fe..b15a1a3d1 100644 --- a/src/configure/mod.rs +++ b/src/configure/mod.rs @@ -272,9 +272,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> { let create_mvbox = ctx.get_config_bool(Config::MvboxWatch).await || ctx.get_config_bool(Config::MvboxMove).await; - imap.configure_folders(ctx, create_mvbox) - .await - .context("configuring folders failed")?; + imap.configure_folders(ctx, create_mvbox).await?; imap.select_with_uidvalidity(ctx, "INBOX") .await diff --git a/src/context.rs b/src/context.rs index de431703f..f50343936 100644 --- a/src/context.rs +++ b/src/context.rs @@ -300,13 +300,11 @@ impl Context { .unwrap_or_default(); let configured_sentbox_folder = self - .sql - .get_raw_config(self, "configured_sentbox_folder") + .get_config(Config::ConfiguredSentboxFolder) .await .unwrap_or_else(|| "".to_string()); let configured_mvbox_folder = self - .sql - .get_raw_config(self, "configured_mvbox_folder") + .get_config(Config::ConfiguredMvboxFolder) .await .unwrap_or_else(|| "".to_string()); @@ -442,33 +440,19 @@ impl Context { .unwrap_or_default() } - pub fn is_inbox(&self, folder_name: impl AsRef) -> bool { - folder_name.as_ref() == "INBOX" + pub async fn is_inbox(&self, folder_name: impl AsRef) -> bool { + self.get_config(Config::ConfiguredInboxFolder).await + == Some(folder_name.as_ref().to_string()) } pub async fn is_sentbox(&self, folder_name: impl AsRef) -> bool { - let sentbox_name = self - .sql - .get_raw_config(self, "configured_sentbox_folder") - .await; - if let Some(name) = sentbox_name { - name == folder_name.as_ref() - } else { - false - } + self.get_config(Config::ConfiguredSentboxFolder).await + == Some(folder_name.as_ref().to_string()) } pub async fn is_mvbox(&self, folder_name: impl AsRef) -> bool { - let mvbox_name = self - .sql - .get_raw_config(self, "configured_mvbox_folder") - .await; - - if let Some(name) = mvbox_name { - name == folder_name.as_ref() - } else { - false - } + self.get_config(Config::ConfiguredMvboxFolder).await + == Some(folder_name.as_ref().to_string()) } pub async fn do_heuristics_moves(&self, folder: &str, msg_id: MsgId) { diff --git a/src/imap/idle.rs b/src/imap/idle.rs index e6b3ea621..a029f74e3 100644 --- a/src/imap/idle.rs +++ b/src/imap/idle.rs @@ -4,7 +4,7 @@ use async_imap::extensions::idle::IdleResponse; use async_std::prelude::*; use std::time::{Duration, SystemTime}; -use crate::context::Context; +use crate::{context::Context, scheduler::InterruptInfo}; use super::select_folder; use super::session::Session; @@ -34,7 +34,11 @@ impl Imap { self.config.can_idle } - pub async fn idle(&mut self, context: &Context, watch_folder: Option) -> Result { + pub async fn idle( + &mut self, + context: &Context, + watch_folder: Option, + ) -> Result { use futures::future::FutureExt; if !self.can_idle() { @@ -46,7 +50,7 @@ impl Imap { let session = self.session.take(); let timeout = Duration::from_secs(23 * 60); - let mut probe_network = false; + let mut info = Default::default(); if let Some(session) = session { let mut handle = session.idle(); @@ -58,7 +62,7 @@ impl Imap { enum Event { IdleResponse(IdleResponse), - Interrupt(bool), + Interrupt(InterruptInfo), } if self.skip_next_idle_wait { @@ -90,8 +94,8 @@ impl Imap { Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => { info!(context, "Idle wait was interrupted"); } - Ok(Event::Interrupt(probe)) => { - probe_network = probe; + Ok(Event::Interrupt(i)) => { + info = i; info!(context, "Idle wait was interrupted"); } Err(err) => { @@ -125,14 +129,14 @@ impl Imap { } } - Ok(probe_network) + Ok(info) } pub(crate) async fn fake_idle( &mut self, context: &Context, watch_folder: Option, - ) -> bool { + ) -> InterruptInfo { // Idle using polling. This is also needed if we're not yet configured - // in this case, we're waiting for a configure job (and an interrupt). @@ -144,7 +148,7 @@ impl Imap { return self.idle_interrupt.recv().await.unwrap_or_default(); } - let mut probe_network = false; + let mut info: InterruptInfo = Default::default(); if self.skip_next_idle_wait { // interrupt_idle has happened before we // provided self.interrupt @@ -157,10 +161,10 @@ impl Imap { enum Event { Tick, - Interrupt(bool), + Interrupt(InterruptInfo), } // loop until we are interrupted or if we fetched something - probe_network = + info = loop { use futures::future::FutureExt; match interval @@ -181,7 +185,7 @@ impl Imap { } if self.config.can_idle { // we only fake-idled because network was gone during IDLE, probably - break false; + break InterruptInfo::new(false, None); } info!(context, "fake_idle is connected"); // we are connected, let's see if fetching messages results @@ -194,7 +198,7 @@ impl Imap { Ok(res) => { info!(context, "fetch_new_messages returned {:?}", res); if res { - break false; + break InterruptInfo::new(false, None); } } Err(err) => { @@ -204,9 +208,9 @@ impl Imap { } } } - Event::Interrupt(probe_network) => { + Event::Interrupt(info) => { // Interrupt - break probe_network; + break info; } } }; @@ -222,6 +226,6 @@ impl Imap { / 1000., ); - probe_network + info } } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 372b78309..3bf9a706a 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -27,7 +27,7 @@ use crate::message::{self, update_server_uid}; use crate::mimeparser; use crate::oauth2::dc_get_oauth2_access_token; use crate::param::Params; -use crate::stock::StockMessage; +use crate::{scheduler::InterruptInfo, stock::StockMessage}; mod client; mod idle; @@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*"; #[derive(Debug)] pub struct Imap { - idle_interrupt: Receiver, + idle_interrupt: Receiver, config: ImapConfig, session: Option, connected: bool, @@ -181,7 +181,7 @@ impl Default for ImapConfig { } impl Imap { - pub fn new(idle_interrupt: Receiver) -> Self { + pub fn new(idle_interrupt: Receiver) -> Self { Imap { idle_interrupt, config: Default::default(), @@ -974,7 +974,7 @@ impl Imap { uid: u32, ) -> Option { if uid == 0 { - return Some(ImapActionResult::Failed); + return Some(ImapActionResult::RetryLater); } if !self.is_connected() { // currently jobs are only performed on the INBOX thread @@ -1223,19 +1223,16 @@ impl Imap { } } context - .sql - .set_raw_config(context, "configured_inbox_folder", Some("INBOX")) + .set_config(Config::ConfiguredInboxFolder, Some("INBOX")) .await?; if let Some(ref mvbox_folder) = mvbox_folder { context - .sql - .set_raw_config(context, "configured_mvbox_folder", Some(mvbox_folder)) + .set_config(Config::ConfiguredMvboxFolder, Some(mvbox_folder)) .await?; } if let Some(ref sentbox_folder) = sentbox_folder { context - .sql - .set_raw_config(context, "configured_sentbox_folder", Some(sentbox_folder)) + .set_config(Config::ConfiguredSentboxFolder, Some(sentbox_folder)) .await?; } context @@ -1393,7 +1390,11 @@ async fn precheck_imf( } if old_server_folder != server_folder || old_server_uid != server_uid { - update_server_uid(context, &rfc724_mid, server_folder, server_uid).await; + update_server_uid(context, rfc724_mid, server_folder, server_uid).await; + context + .interrupt_inbox(InterruptInfo::new(false, Some(msg_id))) + .await; + info!(context, "Updating server_uid and interrupting") } Ok(true) } else { diff --git a/src/job.rs b/src/job.rs index 7249cb0e5..7cffba035 100644 --- a/src/job.rs +++ b/src/job.rs @@ -31,7 +31,7 @@ use crate::message::{self, Message, MessageState}; use crate::mimefactory::MimeFactory; use crate::param::*; use crate::smtp::Smtp; -use crate::sql; +use crate::{scheduler::InterruptInfo, sql}; // results in ~3 weeks for the last backoff timespan const JOB_RETRIES: u32 = 17; @@ -504,10 +504,7 @@ impl Job { warn!(context, "could not configure folders: {:?}", err); return Status::RetryLater; } - let dest_folder = context - .sql - .get_raw_config(context, "configured_mvbox_folder") - .await; + let dest_folder = context.get_config(Config::ConfiguredMvboxFolder).await; if let Some(dest_folder) = dest_folder { let server_folder = msg.server_folder.as_ref().unwrap(); @@ -518,7 +515,7 @@ impl Job { { ImapActionResult::RetryLater => Status::RetryLater, ImapActionResult::Success => { - // XXX Rust-Imap provides no target uid on mv, so just set it to 0 + // Rust-Imap provides no target uid on mv, so just set it to 0, update again when precheck_imf() is called for the moved message message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, 0).await; Status::Finished(Ok(())) } @@ -612,11 +609,7 @@ impl Job { async fn empty_server(&mut self, context: &Context, imap: &mut Imap) -> Status { if self.foreign_id & DC_EMPTY_MVBOX > 0 { - if let Some(mvbox_folder) = context - .sql - .get_raw_config(context, "configured_mvbox_folder") - .await - { + if let Some(mvbox_folder) = &context.get_config(Config::ConfiguredMvboxFolder).await { imap.empty_folder(context, &mvbox_folder).await; } } @@ -1029,14 +1022,18 @@ pub async fn add(context: &Context, job: Job) { | Action::MarkseenMsgOnImap | Action::MoveMsg => { info!(context, "interrupt: imap"); - context.interrupt_inbox(false).await; + context + .interrupt_inbox(InterruptInfo::new(false, None)) + .await; } Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn | Action::SendMsgToSmtp => { info!(context, "interrupt: smtp"); - context.interrupt_smtp(false).await; + context + .interrupt_smtp(InterruptInfo::new(false, None)) + .await; } } } @@ -1051,38 +1048,49 @@ pub async fn add(context: &Context, job: Job) { pub(crate) async fn load_next( context: &Context, thread: Thread, - probe_network: bool, + info: &InterruptInfo, ) -> Option { info!(context, "loading job for {}-thread", thread); - let query = if !probe_network { + + let query; + let params; + let t = time(); + let m; + let thread_i = thread as i64; + + if let Some(msg_id) = info.msg_id { + query = r#" +SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries +FROM jobs +WHERE thread=? AND foreign_id=? +ORDER BY action DESC, added_timestamp +LIMIT 1; +"#; + m = msg_id; + params = paramsv![thread_i, m]; + } else if !info.probe_network { // processing for first-try and after backoff-timeouts: // process jobs in the order they were added. - r#" + query = r#" SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries FROM jobs WHERE thread=? AND desired_timestamp<=? ORDER BY action DESC, added_timestamp LIMIT 1; -"# +"#; + params = paramsv![thread_i, t]; } else { // processing after call to dc_maybe_network(): // process _all_ pending jobs that failed before // in the order of their backoff-times. - r#" + query = r#" SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries FROM jobs WHERE thread=? AND tries>0 ORDER BY desired_timestamp, action DESC LIMIT 1; -"# - }; - - let thread_i = thread as i64; - let t = time(); - let params = if !probe_network { - paramsv![thread_i, t] - } else { - paramsv![thread_i] +"#; + params = paramsv![thread_i]; }; let job = loop { @@ -1189,11 +1197,21 @@ mod tests { // all jobs. let t = dummy_context().await; insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct. - let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next( + &t.ctx, + Thread::from(Action::MoveMsg), + &InterruptInfo::new(false, None), + ) + .await; assert!(jobs.is_none()); insert_job(&t.ctx, 1).await; - let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next( + &t.ctx, + Thread::from(Action::MoveMsg), + &InterruptInfo::new(false, None), + ) + .await; assert!(jobs.is_some()); } @@ -1203,7 +1221,12 @@ mod tests { insert_job(&t.ctx, 1).await; - let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await; + let jobs = load_next( + &t.ctx, + Thread::from(Action::MoveMsg), + &InterruptInfo::new(false, None), + ) + .await; assert!(jobs.is_some()); } } diff --git a/src/scheduler.rs b/src/scheduler.rs index ea45197a2..fd685f9e7 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,7 +5,7 @@ use async_std::task; use crate::context::Context; use crate::imap::Imap; use crate::job::{self, Thread}; -use crate::smtp::Smtp; +use crate::{config::Config, message::MsgId, smtp::Smtp}; pub(crate) struct StopToken; @@ -32,36 +32,20 @@ impl Context { self.scheduler.read().await.maybe_network().await; } - pub(crate) async fn interrupt_inbox(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_inbox(probe_network) - .await; + pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_inbox(info).await; } - pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_sentbox(probe_network) - .await; + pub(crate) async fn interrupt_sentbox(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_sentbox(info).await; } - pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_mvbox(probe_network) - .await; + pub(crate) async fn interrupt_mvbox(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_mvbox(info).await; } - pub(crate) async fn interrupt_smtp(&self, probe_network: bool) { - self.scheduler - .read() - .await - .interrupt_smtp(probe_network) - .await; + pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) { + self.scheduler.read().await.interrupt_smtp(info).await; } } @@ -86,14 +70,14 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne started.send(()).await; // track number of continously executed jobs - let mut jobs_loaded = 0; - let mut probe_network = false; + let mut jobs_loaded: i32 = 0; + let mut info: InterruptInfo = Default::default(); loop { - match job::load_next(&ctx, Thread::Imap, probe_network).await { + match job::load_next(&ctx, Thread::Imap, &info).await { Some(job) if jobs_loaded <= 20 => { jobs_loaded += 1; job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await; - probe_network = false; + info = Default::default(); } Some(job) => { // Let the fetch run, but return back to the job afterwards. @@ -103,8 +87,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne } None => { jobs_loaded = 0; - probe_network = - fetch_idle(&ctx, &mut connection, "configured_inbox_folder").await; + info = fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await; } } } @@ -121,7 +104,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne } async fn fetch(ctx: &Context, connection: &mut Imap) { - match get_watch_folder(&ctx, "configured_inbox_folder").await { + match ctx.get_config(Config::ConfiguredInboxFolder).await { Some(watch_folder) => { // fetch if let Err(err) = connection.fetch(&ctx, &watch_folder).await { @@ -136,8 +119,8 @@ async fn fetch(ctx: &Context, connection: &mut Imap) { } } -async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool { - match get_watch_folder(&ctx, folder).await { +async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo { + match ctx.get_config(folder).await { Some(watch_folder) => { // fetch if let Err(err) = connection.fetch(&ctx, &watch_folder).await { @@ -153,7 +136,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool .unwrap_or_else(|err| { connection.trigger_reconnect(); error!(ctx, "{}", err); - false + InterruptInfo::new(false, None) }) } else { connection.fake_idle(&ctx, Some(watch_folder)).await @@ -170,7 +153,7 @@ async fn simple_imap_loop( ctx: Context, started: Sender<()>, inbox_handlers: ImapConnectionHandlers, - folder: impl AsRef, + folder: Config, ) { use futures::future::FutureExt; @@ -193,7 +176,7 @@ async fn simple_imap_loop( started.send(()).await; loop { - fetch_idle(&ctx, &mut connection, folder.as_ref()).await; + fetch_idle(&ctx, &mut connection, folder).await; } }; @@ -223,18 +206,18 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect started.send(()).await; let ctx = ctx1; - let mut probe_network = false; + let mut interrupt_info = Default::default(); loop { - match job::load_next(&ctx, Thread::Smtp, probe_network).await { + match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await { Some(job) => { info!(ctx, "executing smtp job"); job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await; - probe_network = false; + interrupt_info = Default::default(); } None => { // Fake Idle info!(ctx, "smtp fake idle - started"); - probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default(); + interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default(); info!(ctx, "smtp fake idle - interrupted") } } @@ -286,7 +269,7 @@ impl Scheduler { ctx1, mvbox_start_send, mvbox_handlers, - "configured_mvbox_folder", + Config::ConfiguredMvboxFolder, ) .await })); @@ -300,7 +283,7 @@ impl Scheduler { ctx1, sentbox_start_send, sentbox_handlers, - "configured_sentbox_folder", + Config::ConfiguredSentboxFolder, ) .await })); @@ -333,34 +316,34 @@ impl Scheduler { return; } - self.interrupt_inbox(true) - .join(self.interrupt_mvbox(true)) - .join(self.interrupt_sentbox(true)) - .join(self.interrupt_smtp(true)) + self.interrupt_inbox(InterruptInfo::new(true, None)) + .join(self.interrupt_mvbox(InterruptInfo::new(true, None))) + .join(self.interrupt_sentbox(InterruptInfo::new(true, None))) + .join(self.interrupt_smtp(InterruptInfo::new(true, None))) .await; } - async fn interrupt_inbox(&self, probe_network: bool) { + async fn interrupt_inbox(&self, info: InterruptInfo) { if let Scheduler::Running { ref inbox, .. } = self { - inbox.interrupt(probe_network).await; + inbox.interrupt(info).await; } } - async fn interrupt_mvbox(&self, probe_network: bool) { + async fn interrupt_mvbox(&self, info: InterruptInfo) { if let Scheduler::Running { ref mvbox, .. } = self { - mvbox.interrupt(probe_network).await; + mvbox.interrupt(info).await; } } - async fn interrupt_sentbox(&self, probe_network: bool) { + async fn interrupt_sentbox(&self, info: InterruptInfo) { if let Scheduler::Running { ref sentbox, .. } = self { - sentbox.interrupt(probe_network).await; + sentbox.interrupt(info).await; } } - async fn interrupt_smtp(&self, probe_network: bool) { + async fn interrupt_smtp(&self, info: InterruptInfo) { if let Scheduler::Running { ref smtp, .. } = self { - smtp.interrupt(probe_network).await; + smtp.interrupt(info).await; } } @@ -429,7 +412,7 @@ struct ConnectionState { /// Channel to interrupt the whole connection. stop_sender: Sender<()>, /// Channel to interrupt idle. - idle_interrupt_sender: Sender, + idle_interrupt_sender: Sender, } impl ConnectionState { @@ -441,9 +424,9 @@ impl ConnectionState { self.shutdown_receiver.recv().await.ok(); } - async fn interrupt(&self, probe_network: bool) { + async fn interrupt(&self, info: InterruptInfo) { // Use try_send to avoid blocking on interrupts. - self.idle_interrupt_sender.try_send(probe_network).ok(); + self.idle_interrupt_sender.try_send(info).ok(); } } @@ -477,8 +460,8 @@ impl SmtpConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self, probe_network: bool) { - self.state.interrupt(probe_network).await; + async fn interrupt(&self, info: InterruptInfo) { + self.state.interrupt(info).await; } /// Shutdown this connection completely. @@ -492,7 +475,7 @@ struct SmtpConnectionHandlers { connection: Smtp, stop_receiver: Receiver<()>, shutdown_sender: Sender<()>, - idle_interrupt_receiver: Receiver, + idle_interrupt_receiver: Receiver, } #[derive(Debug)] @@ -525,8 +508,8 @@ impl ImapConnectionState { } /// Interrupt any form of idle. - async fn interrupt(&self, probe_network: bool) { - self.state.interrupt(probe_network).await; + async fn interrupt(&self, info: InterruptInfo) { + self.state.interrupt(info).await; } /// Shutdown this connection completely. @@ -542,20 +525,17 @@ struct ImapConnectionHandlers { shutdown_sender: Sender<()>, } -async fn get_watch_folder(context: &Context, config_name: impl AsRef) -> Option { - match context - .sql - .get_raw_config(context, config_name.as_ref()) - .await - { - Some(name) => Some(name), - None => { - if config_name.as_ref() == "configured_inbox_folder" { - // initialized with old version, so has not set configured_inbox_folder - Some("INBOX".to_string()) - } else { - None - } +#[derive(Default, Debug)] +pub struct InterruptInfo { + pub probe_network: bool, + pub msg_id: Option, +} + +impl InterruptInfo { + pub fn new(probe_network: bool, msg_id: Option) -> Self { + Self { + probe_network, + msg_id, } } }