Compare commits

...

25 Commits

Author SHA1 Message Date
holger krekel
186512b356 mark 30-times runs tests as ignored by default 2020-06-05 16:43:21 +02:00
Hocuri
be573767f4 Comments 2020-06-05 12:22:39 +02:00
Hocuri
d40754273d Small fixes 2020-06-05 12:22:39 +02:00
Hocuri
6e05039a10 Try to fix it. I dont like the code and it does not work. 2020-06-05 12:22:39 +02:00
Hocuri
76f4b9baa4 Let the python tast recognize when the mvbox is Inbox/DeltaChat instead of DeltaChat 2020-06-05 12:22:38 +02:00
Hocuri
1c936260a0 Rm some debug logs 2020-06-05 12:22:38 +02:00
Hocuri
676ff127c1 typo 2020-06-05 12:22:38 +02:00
Hocuri
a704947546 Next try to reliably mark read 2020-06-05 12:22:37 +02:00
Hocuri
a862484927 Print more info 2020-06-05 12:22:37 +02:00
Hocuri
d66b6acde4 Lint 2020-06-05 12:22:37 +02:00
Hocuri
5677bf3fe5 Put emails into the tmp folder 2020-06-05 12:22:36 +02:00
Hocuri
85a4637b9f update_server_uid() when we see that a messhge was moved 2020-06-05 12:22:36 +02:00
Hocuri
232f1d4f86 Always call print_imap_structure when test fails 2020-06-05 12:22:36 +02:00
Hocuri
3cc5aef016 Add test_mark_bcc_read_on_server 2020-06-05 12:22:36 +02:00
Hocuri
79dac9513f Add print_imap_structure 2020-06-05 12:22:35 +02:00
Hocuri
a8d53301e3 Adapt to new refactorings 2020-06-05 12:22:35 +02:00
Hocuri
a4def9c39c Don't catch 2020-06-05 12:22:35 +02:00
Hocuri
bd59f4914c Add hint where the error comes from 2020-06-05 12:22:35 +02:00
Hocuri
d5e6b6231d Lint 2020-06-05 12:22:34 +02:00
Hocuri
89341c1ceb Add test_mark_read_on_server that reproduces https://github.com/deltachat/deltachat-core-rust/issues/1495 2020-06-05 12:22:34 +02:00
Hocuri
c0003c6726 Use Imaplib instead of ImapClient 2020-06-05 12:22:34 +02:00
Hocuri
1290c191b3 Move direct_imap to deltachat folder 2020-06-05 12:22:33 +02:00
Hocuri
69a28df74d rm unused import 2020-06-05 12:22:33 +02:00
Hocuri
303417f095 Add make_direct_imap() fn. I had to disable PersistentDict because I had problems with importing. 2020-06-05 12:22:33 +02:00
Hocuri
8483b3e1d3 Add holger's ImapConn 2020-06-05 12:22:33 +02:00
12 changed files with 405 additions and 173 deletions

View File

@@ -28,7 +28,7 @@ class Account(object):
"""
MissingCredentials = MissingCredentials
def __init__(self, db_path, os_name=None, logging=True):
def __init__(self, db_path, os_name=None, logging=True, logid=None):
""" initialize account object.
:param db_path: a path to the account database. The database
@@ -38,6 +38,7 @@ class Account(object):
# initialize per-account plugin system
self._pm = hookspec.PerAccount._make_plugin_manager()
self._logging = logging
self.logid = logid
self.add_account_plugin(self)

View File

@@ -0,0 +1,141 @@
import imaplib
import pathlib
from . import Account
INBOX = "Inbox"
SENT = "Sent"
MVBOX = "DeltaChat"
MVBOX_FALLBBACK = "INBOX/DeltaChat"
DC_CONSTANT_MSG_MOVESTATE_PENDING = 1
DC_CONSTANT_MSG_MOVESTATE_STAY = 2
DC_CONSTANT_MSG_MOVESTATE_MOVING = 3
def db_folder_attr(name):
def fget(s):
return s.db_folder.get(name, 1)
def fset(s, val):
s.db_folder[name] = val
return property(fget, fset, None, None)
class ImapConn():
def __init__(self, foldername, conn_info):
self.foldername = foldername
host, user, pw = conn_info
self.connection = imaplib.IMAP4_SSL(host)
self.connection.login(user, pw)
messages = self.reselect_folder()
try:
self.original_msg_count = int(messages[0])
except IndexError:
self.original_msg_count = 0
def mark_all_read(self):
self.reselect_folder()
# result, data = self.connection.uid('search', None, "(UNSEEN)")
result, data = self.connection.search(None, 'UnSeen')
try:
mails_uid = data[0].split()
print("New mails")
# self.connection.store(data[0].replace(' ',','),'+FLAGS','\Seen')
for e_id in mails_uid:
self.connection.store(e_id, '+FLAGS', '\\Seen')
print("marked:", e_id)
return True
except IndexError:
print("No unread")
return False
def get_unread_cnt(self):
self.reselect_folder()
# result, data = self.connection.uid('search', None, "(UNSEEN)")
result, data = self.connection.search(None, 'UnSeen')
try:
mails_uid = data[0].split()
return len(mails_uid)
except IndexError:
return 0
def get_new_email_cnt(self):
messages = self.reselect_folder()
try:
return int(messages[0]) - self.original_msg_count
except IndexError:
return 0
def reselect_folder(self):
status, messages = self.connection.select(self.foldername)
if status != "OK":
print("Incorrect mail box " + status + str(messages))
raise ConnectionError
# print("(Re-)Selected mailbox: " + status + " " + str(messages))
return messages
def __del__(self):
try:
self.connection.close()
except Exception:
pass
try:
self.connection.logout()
except Exception:
print("Could not logout direct_imap conn")
def make_direct_imap(account, folder):
conn_info = (account.get_config("configured_mail_server"),
account.get_config("addr"), account.get_config("mail_pw"))
# try:
# return ImapConn(folder, conn_info=conn_info)
# except ConnectionError as e:
# if folder == MVBOX:
# account.log("Selecting " + MVBOX_FALLBBACK + " not " + MVBOX + " because connecting to the latter failed")
# return ImapConn(MVBOX_FALLBBACK, conn_info=conn_info)
# else:
# raise e
if folder == MVBOX:
new_folder = account.get_config("configured_mvbox_folder")
else:
new_folder = folder
if new_folder != folder:
account.log("Making connection with " + new_folder + " not " + folder)
return ImapConn(new_folder, conn_info=conn_info)
def print_imap_structure(database, dir="."):
print_imap_structure_ac(Account(database), dir)
def print_imap_structure_ac(ac, dir="."):
acinfo = ac.logid + "-" + ac.get_config("addr")
print("================= ACCOUNT", acinfo, "=================")
print("----------------- CONFIG: -----------------")
print(ac.get_info())
for imapfolder in [INBOX, MVBOX, SENT, MVBOX_FALLBBACK]:
try:
imap = make_direct_imap(ac, imapfolder)
c = imap.connection
typ, data = c.search(None, 'ALL')
c._get_tagged_response
print("-----------------", imapfolder, "-----------------")
for num in data[0].split():
typ, data = c.fetch(num, '(RFC822)')
body = data[0][1]
typ, data = c.fetch(num, '(UID FLAGS)')
info = data[0]
path = pathlib.Path(dir).joinpath("IMAP-MESSAGES", acinfo, imapfolder)
path.mkdir(parents=True, exist_ok=True)
file = path.joinpath(str(info).replace("b'", "").replace("'", "").replace("\\", ""))
file.write_bytes(body)
print("Message", info, "saved as", file)
except Exception:
pass

View File

@@ -12,7 +12,7 @@ import tempfile
import pytest
import requests
from . import Account, const
from . import Account, const, direct_imap
from .capi import lib
from .events import FFIEventLogger, FFIEventTracker
from _pytest._code import Source
@@ -33,9 +33,6 @@ def pytest_addoption(parser):
def pytest_configure(config):
config.addinivalue_line(
"markers", "ignored: Mark test as bing slow, skipped unless --ignored is used."
)
cfg = config.getoption('--liveconfig')
if not cfg:
cfg = os.getenv('DCC_NEW_TMP_EMAIL')
@@ -228,7 +225,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
acc.disable_logging()
def make_account(self, path, logid, quiet=False):
ac = Account(path, logging=self._logging)
ac = Account(path, logging=self._logging, logid=logid)
ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
ac.addr = ac.get_self_contact().addr
if not quiet:
@@ -377,7 +374,10 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
am = AccountMaker()
request.addfinalizer(am.finalize)
return am
yield am
if request.node.rep_call.failed:
for ac in am._accounts:
direct_imap.print_imap_structure_ac(ac, tmpdir)
class BotProcess:
@@ -446,3 +446,15 @@ def lp():
def step(self, msg):
print("-" * 5, "step " + msg, "-" * 5)
return Printer()
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
# execute all other hooks to obtain the report object
outcome = yield
rep = outcome.get_result()
# set a report attribute for each phase of a call, which can
# be "setup", "call", "teardown"
setattr(item, "rep_" + rep.when, rep)

View File

@@ -7,6 +7,8 @@ from deltachat import const, Account
from deltachat.message import Message
from deltachat.hookspec import account_hookimpl
from datetime import datetime, timedelta
from deltachat import direct_imap
from deltachat.direct_imap import make_direct_imap
@pytest.mark.parametrize("msgtext,res", [
@@ -635,6 +637,85 @@ class TestOnlineAccount:
ev_msg = ac1_clone._evtracker.wait_next_messages_changed()
assert ev_msg.text == msg_out.text
@pytest.mark.ignored
@pytest.mark.parametrize('i', range(30))
def test_mark_read_on_server(self, acfactory, lp, i):
ac1 = acfactory.get_online_configuring_account()
ac2 = acfactory.get_online_configuring_account(mvbox=True, move=True)
ac1.wait_configure_finish()
ac1.start_io()
ac2.wait_configure_finish()
ac2.start_io()
imap2 = make_direct_imap(ac2, direct_imap.MVBOX)
# imap2.mark_all_read()
assert imap2.get_unread_cnt() == 0
chat = self.get_chat(ac1, ac2)
chat_on_ac2 = self.get_chat(ac2, ac1)
chat.send_text("Text message")
incoming_on_ac2 = ac2._evtracker.wait_next_incoming_message()
lp.sec("Incoming: "+incoming_on_ac2.text)
assert list(ac2.get_fresh_messages())
for i in range(0, 20):
if imap2.get_unread_cnt() == 1:
break
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
assert imap2.get_unread_cnt() == 1
chat_on_ac2.mark_noticed()
incoming_on_ac2.mark_seen()
ac2._evtracker.wait_next_messages_changed()
assert not list(ac2.get_fresh_messages())
# The new messages should be seen now.
for i in range(0, 20):
if imap2.get_unread_cnt() == 0:
break
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
assert imap2.get_unread_cnt() == 0
@pytest.mark.ignored
@pytest.mark.parametrize('i', range(30))
def test_mark_bcc_read_on_server(self, acfactory, lp, i):
ac1 = acfactory.get_online_configuring_account(mvbox=True, move=True)
ac2 = acfactory.get_online_configuring_account()
ac1.wait_configure_finish()
ac1.start_io()
ac2.wait_configure_finish()
ac2.start_io()
imap1 = make_direct_imap(ac1, direct_imap.MVBOX)
imap1.mark_all_read()
assert imap1.get_unread_cnt() == 0
chat = self.get_chat(ac1, ac2)
ac1.set_config("bcc_self", "1")
chat.send_text("Text message")
ac1._evtracker.get_matching("DC_EVENT_SMTP_MESSAGE_SENT")
for i in range(0, 20):
if imap1.get_new_email_cnt() == 1:
break
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
assert imap1.get_new_email_cnt() == 1
for i in range(0, 20):
if imap1.get_unread_cnt() == 0:
break
time.sleep(1) # We might need to wait because Imaplib is slower than DC-Core
assert imap1.get_unread_cnt() == 0
def test_send_file_twice_unicode_filename_mangling(self, tmpdir, acfactory, lp):
ac1, ac2 = acfactory.get_two_online_accounts()
chat = self.get_chat(ac1, ac2)

View File

@@ -71,6 +71,8 @@ norecursedirs = .tox
xfail_strict=true
timeout = 90
timeout_method = thread
markers =
ignored: ignore this test in default test runs, use --ignored to run.
[flake8]
max-line-length = 120

View File

@@ -11,7 +11,7 @@ use crate::dc_tools::*;
use crate::events::Event;
use crate::message::MsgId;
use crate::mimefactory::RECOMMENDED_FILE_SIZE;
use crate::stock::StockMessage;
use crate::{scheduler::InterruptInfo, stock::StockMessage};
/// The available configuration keys.
#[derive(
@@ -104,6 +104,9 @@ pub enum Config {
ConfiguredServerFlags,
ConfiguredSendSecurity,
ConfiguredE2EEEnabled,
ConfiguredInboxFolder,
ConfiguredMvboxFolder,
ConfiguredSentboxFolder,
Configured,
#[strum(serialize = "sys.version")]
@@ -137,6 +140,7 @@ impl Context {
// Default values
match key {
Config::Selfstatus => Some(self.stock_str(StockMessage::StatusLine).await.into_owned()),
Config::ConfiguredInboxFolder => Some("INBOX".to_owned()),
_ => key.get_str("default").map(|s| s.to_string()),
}
}
@@ -199,17 +203,18 @@ impl Context {
}
Config::InboxWatch => {
let ret = self.sql.set_raw_config(self, key, value).await;
self.interrupt_inbox(false).await;
self.interrupt_inbox(InterruptInfo::new(false, None)).await;
ret
}
Config::SentboxWatch => {
let ret = self.sql.set_raw_config(self, key, value).await;
self.interrupt_sentbox(false).await;
self.interrupt_sentbox(InterruptInfo::new(false, None))
.await;
ret
}
Config::MvboxWatch => {
let ret = self.sql.set_raw_config(self, key, value).await;
self.interrupt_mvbox(false).await;
self.interrupt_mvbox(InterruptInfo::new(false, None)).await;
ret
}
Config::Selfstatus => {

View File

@@ -272,9 +272,7 @@ async fn configure(ctx: &Context, param: &mut LoginParam) -> Result<()> {
let create_mvbox = ctx.get_config_bool(Config::MvboxWatch).await
|| ctx.get_config_bool(Config::MvboxMove).await;
imap.configure_folders(ctx, create_mvbox)
.await
.context("configuring folders failed")?;
imap.configure_folders(ctx, create_mvbox).await?;
imap.select_with_uidvalidity(ctx, "INBOX")
.await

View File

@@ -300,13 +300,11 @@ impl Context {
.unwrap_or_default();
let configured_sentbox_folder = self
.sql
.get_raw_config(self, "configured_sentbox_folder")
.get_config(Config::ConfiguredSentboxFolder)
.await
.unwrap_or_else(|| "<unset>".to_string());
let configured_mvbox_folder = self
.sql
.get_raw_config(self, "configured_mvbox_folder")
.get_config(Config::ConfiguredMvboxFolder)
.await
.unwrap_or_else(|| "<unset>".to_string());
@@ -442,33 +440,19 @@ impl Context {
.unwrap_or_default()
}
pub fn is_inbox(&self, folder_name: impl AsRef<str>) -> bool {
folder_name.as_ref() == "INBOX"
pub async fn is_inbox(&self, folder_name: impl AsRef<str>) -> bool {
self.get_config(Config::ConfiguredInboxFolder).await
== Some(folder_name.as_ref().to_string())
}
pub async fn is_sentbox(&self, folder_name: impl AsRef<str>) -> bool {
let sentbox_name = self
.sql
.get_raw_config(self, "configured_sentbox_folder")
.await;
if let Some(name) = sentbox_name {
name == folder_name.as_ref()
} else {
false
}
self.get_config(Config::ConfiguredSentboxFolder).await
== Some(folder_name.as_ref().to_string())
}
pub async fn is_mvbox(&self, folder_name: impl AsRef<str>) -> bool {
let mvbox_name = self
.sql
.get_raw_config(self, "configured_mvbox_folder")
.await;
if let Some(name) = mvbox_name {
name == folder_name.as_ref()
} else {
false
}
self.get_config(Config::ConfiguredMvboxFolder).await
== Some(folder_name.as_ref().to_string())
}
pub async fn do_heuristics_moves(&self, folder: &str, msg_id: MsgId) {

View File

@@ -4,7 +4,7 @@ use async_imap::extensions::idle::IdleResponse;
use async_std::prelude::*;
use std::time::{Duration, SystemTime};
use crate::context::Context;
use crate::{context::Context, scheduler::InterruptInfo};
use super::select_folder;
use super::session::Session;
@@ -34,7 +34,11 @@ impl Imap {
self.config.can_idle
}
pub async fn idle(&mut self, context: &Context, watch_folder: Option<String>) -> Result<bool> {
pub async fn idle(
&mut self,
context: &Context,
watch_folder: Option<String>,
) -> Result<InterruptInfo> {
use futures::future::FutureExt;
if !self.can_idle() {
@@ -46,7 +50,7 @@ impl Imap {
let session = self.session.take();
let timeout = Duration::from_secs(23 * 60);
let mut probe_network = false;
let mut info = Default::default();
if let Some(session) = session {
let mut handle = session.idle();
@@ -58,7 +62,7 @@ impl Imap {
enum Event {
IdleResponse(IdleResponse),
Interrupt(bool),
Interrupt(InterruptInfo),
}
if self.skip_next_idle_wait {
@@ -90,8 +94,8 @@ impl Imap {
Ok(Event::IdleResponse(IdleResponse::ManualInterrupt)) => {
info!(context, "Idle wait was interrupted");
}
Ok(Event::Interrupt(probe)) => {
probe_network = probe;
Ok(Event::Interrupt(i)) => {
info = i;
info!(context, "Idle wait was interrupted");
}
Err(err) => {
@@ -125,14 +129,14 @@ impl Imap {
}
}
Ok(probe_network)
Ok(info)
}
pub(crate) async fn fake_idle(
&mut self,
context: &Context,
watch_folder: Option<String>,
) -> bool {
) -> InterruptInfo {
// Idle using polling. This is also needed if we're not yet configured -
// in this case, we're waiting for a configure job (and an interrupt).
@@ -144,7 +148,7 @@ impl Imap {
return self.idle_interrupt.recv().await.unwrap_or_default();
}
let mut probe_network = false;
let mut info: InterruptInfo = Default::default();
if self.skip_next_idle_wait {
// interrupt_idle has happened before we
// provided self.interrupt
@@ -157,10 +161,10 @@ impl Imap {
enum Event {
Tick,
Interrupt(bool),
Interrupt(InterruptInfo),
}
// loop until we are interrupted or if we fetched something
probe_network =
info =
loop {
use futures::future::FutureExt;
match interval
@@ -181,7 +185,7 @@ impl Imap {
}
if self.config.can_idle {
// we only fake-idled because network was gone during IDLE, probably
break false;
break InterruptInfo::new(false, None);
}
info!(context, "fake_idle is connected");
// we are connected, let's see if fetching messages results
@@ -194,7 +198,7 @@ impl Imap {
Ok(res) => {
info!(context, "fetch_new_messages returned {:?}", res);
if res {
break false;
break InterruptInfo::new(false, None);
}
}
Err(err) => {
@@ -204,9 +208,9 @@ impl Imap {
}
}
}
Event::Interrupt(probe_network) => {
Event::Interrupt(info) => {
// Interrupt
break probe_network;
break info;
}
}
};
@@ -222,6 +226,6 @@ impl Imap {
/ 1000.,
);
probe_network
info
}
}

View File

@@ -27,7 +27,7 @@ use crate::message::{self, update_server_uid};
use crate::mimeparser;
use crate::oauth2::dc_get_oauth2_access_token;
use crate::param::Params;
use crate::stock::StockMessage;
use crate::{scheduler::InterruptInfo, stock::StockMessage};
mod client;
mod idle;
@@ -109,7 +109,7 @@ const SELECT_ALL: &str = "1:*";
#[derive(Debug)]
pub struct Imap {
idle_interrupt: Receiver<bool>,
idle_interrupt: Receiver<InterruptInfo>,
config: ImapConfig,
session: Option<Session>,
connected: bool,
@@ -181,7 +181,7 @@ impl Default for ImapConfig {
}
impl Imap {
pub fn new(idle_interrupt: Receiver<bool>) -> Self {
pub fn new(idle_interrupt: Receiver<InterruptInfo>) -> Self {
Imap {
idle_interrupt,
config: Default::default(),
@@ -974,7 +974,7 @@ impl Imap {
uid: u32,
) -> Option<ImapActionResult> {
if uid == 0 {
return Some(ImapActionResult::Failed);
return Some(ImapActionResult::RetryLater);
}
if !self.is_connected() {
// currently jobs are only performed on the INBOX thread
@@ -1223,19 +1223,16 @@ impl Imap {
}
}
context
.sql
.set_raw_config(context, "configured_inbox_folder", Some("INBOX"))
.set_config(Config::ConfiguredInboxFolder, Some("INBOX"))
.await?;
if let Some(ref mvbox_folder) = mvbox_folder {
context
.sql
.set_raw_config(context, "configured_mvbox_folder", Some(mvbox_folder))
.set_config(Config::ConfiguredMvboxFolder, Some(mvbox_folder))
.await?;
}
if let Some(ref sentbox_folder) = sentbox_folder {
context
.sql
.set_raw_config(context, "configured_sentbox_folder", Some(sentbox_folder))
.set_config(Config::ConfiguredSentboxFolder, Some(sentbox_folder))
.await?;
}
context
@@ -1393,7 +1390,11 @@ async fn precheck_imf(
}
if old_server_folder != server_folder || old_server_uid != server_uid {
update_server_uid(context, &rfc724_mid, server_folder, server_uid).await;
update_server_uid(context, rfc724_mid, server_folder, server_uid).await;
context
.interrupt_inbox(InterruptInfo::new(false, Some(msg_id)))
.await;
info!(context, "Updating server_uid and interrupting")
}
Ok(true)
} else {

View File

@@ -31,7 +31,7 @@ use crate::message::{self, Message, MessageState};
use crate::mimefactory::MimeFactory;
use crate::param::*;
use crate::smtp::Smtp;
use crate::sql;
use crate::{scheduler::InterruptInfo, sql};
// results in ~3 weeks for the last backoff timespan
const JOB_RETRIES: u32 = 17;
@@ -504,10 +504,7 @@ impl Job {
warn!(context, "could not configure folders: {:?}", err);
return Status::RetryLater;
}
let dest_folder = context
.sql
.get_raw_config(context, "configured_mvbox_folder")
.await;
let dest_folder = context.get_config(Config::ConfiguredMvboxFolder).await;
if let Some(dest_folder) = dest_folder {
let server_folder = msg.server_folder.as_ref().unwrap();
@@ -518,7 +515,7 @@ impl Job {
{
ImapActionResult::RetryLater => Status::RetryLater,
ImapActionResult::Success => {
// XXX Rust-Imap provides no target uid on mv, so just set it to 0
// Rust-Imap provides no target uid on mv, so just set it to 0, update again when precheck_imf() is called for the moved message
message::update_server_uid(context, &msg.rfc724_mid, &dest_folder, 0).await;
Status::Finished(Ok(()))
}
@@ -612,11 +609,7 @@ impl Job {
async fn empty_server(&mut self, context: &Context, imap: &mut Imap) -> Status {
if self.foreign_id & DC_EMPTY_MVBOX > 0 {
if let Some(mvbox_folder) = context
.sql
.get_raw_config(context, "configured_mvbox_folder")
.await
{
if let Some(mvbox_folder) = &context.get_config(Config::ConfiguredMvboxFolder).await {
imap.empty_folder(context, &mvbox_folder).await;
}
}
@@ -1029,14 +1022,18 @@ pub async fn add(context: &Context, job: Job) {
| Action::MarkseenMsgOnImap
| Action::MoveMsg => {
info!(context, "interrupt: imap");
context.interrupt_inbox(false).await;
context
.interrupt_inbox(InterruptInfo::new(false, None))
.await;
}
Action::MaybeSendLocations
| Action::MaybeSendLocationsEnded
| Action::SendMdn
| Action::SendMsgToSmtp => {
info!(context, "interrupt: smtp");
context.interrupt_smtp(false).await;
context
.interrupt_smtp(InterruptInfo::new(false, None))
.await;
}
}
}
@@ -1051,38 +1048,49 @@ pub async fn add(context: &Context, job: Job) {
pub(crate) async fn load_next(
context: &Context,
thread: Thread,
probe_network: bool,
info: &InterruptInfo,
) -> Option<Job> {
info!(context, "loading job for {}-thread", thread);
let query = if !probe_network {
let query;
let params;
let t = time();
let m;
let thread_i = thread as i64;
if let Some(msg_id) = info.msg_id {
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND foreign_id=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#;
m = msg_id;
params = paramsv![thread_i, m];
} else if !info.probe_network {
// processing for first-try and after backoff-timeouts:
// process jobs in the order they were added.
r#"
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND desired_timestamp<=?
ORDER BY action DESC, added_timestamp
LIMIT 1;
"#
"#;
params = paramsv![thread_i, t];
} else {
// processing after call to dc_maybe_network():
// process _all_ pending jobs that failed before
// in the order of their backoff-times.
r#"
query = r#"
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
FROM jobs
WHERE thread=? AND tries>0
ORDER BY desired_timestamp, action DESC
LIMIT 1;
"#
};
let thread_i = thread as i64;
let t = time();
let params = if !probe_network {
paramsv![thread_i, t]
} else {
paramsv![thread_i]
"#;
params = paramsv![thread_i];
};
let job = loop {
@@ -1189,11 +1197,21 @@ mod tests {
// all jobs.
let t = dummy_context().await;
insert_job(&t.ctx, -1).await; // This can not be loaded into Job struct.
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
let jobs = load_next(
&t.ctx,
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
assert!(jobs.is_none());
insert_job(&t.ctx, 1).await;
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
let jobs = load_next(
&t.ctx,
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
assert!(jobs.is_some());
}
@@ -1203,7 +1221,12 @@ mod tests {
insert_job(&t.ctx, 1).await;
let jobs = load_next(&t.ctx, Thread::from(Action::MoveMsg), false).await;
let jobs = load_next(
&t.ctx,
Thread::from(Action::MoveMsg),
&InterruptInfo::new(false, None),
)
.await;
assert!(jobs.is_some());
}
}

View File

@@ -5,7 +5,7 @@ use async_std::task;
use crate::context::Context;
use crate::imap::Imap;
use crate::job::{self, Thread};
use crate::smtp::Smtp;
use crate::{config::Config, message::MsgId, smtp::Smtp};
pub(crate) struct StopToken;
@@ -32,36 +32,20 @@ impl Context {
self.scheduler.read().await.maybe_network().await;
}
pub(crate) async fn interrupt_inbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_inbox(probe_network)
.await;
pub(crate) async fn interrupt_inbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_inbox(info).await;
}
pub(crate) async fn interrupt_sentbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_sentbox(probe_network)
.await;
pub(crate) async fn interrupt_sentbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_sentbox(info).await;
}
pub(crate) async fn interrupt_mvbox(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_mvbox(probe_network)
.await;
pub(crate) async fn interrupt_mvbox(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_mvbox(info).await;
}
pub(crate) async fn interrupt_smtp(&self, probe_network: bool) {
self.scheduler
.read()
.await
.interrupt_smtp(probe_network)
.await;
pub(crate) async fn interrupt_smtp(&self, info: InterruptInfo) {
self.scheduler.read().await.interrupt_smtp(info).await;
}
}
@@ -86,14 +70,14 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
started.send(()).await;
// track number of continously executed jobs
let mut jobs_loaded = 0;
let mut probe_network = false;
let mut jobs_loaded: i32 = 0;
let mut info: InterruptInfo = Default::default();
loop {
match job::load_next(&ctx, Thread::Imap, probe_network).await {
match job::load_next(&ctx, Thread::Imap, &info).await {
Some(job) if jobs_loaded <= 20 => {
jobs_loaded += 1;
job::perform_job(&ctx, job::Connection::Inbox(&mut connection), job).await;
probe_network = false;
info = Default::default();
}
Some(job) => {
// Let the fetch run, but return back to the job afterwards.
@@ -103,8 +87,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}
None => {
jobs_loaded = 0;
probe_network =
fetch_idle(&ctx, &mut connection, "configured_inbox_folder").await;
info = fetch_idle(&ctx, &mut connection, Config::ConfiguredInboxFolder).await;
}
}
}
@@ -121,7 +104,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
}
async fn fetch(ctx: &Context, connection: &mut Imap) {
match get_watch_folder(&ctx, "configured_inbox_folder").await {
match ctx.get_config(Config::ConfiguredInboxFolder).await {
Some(watch_folder) => {
// fetch
if let Err(err) = connection.fetch(&ctx, &watch_folder).await {
@@ -136,8 +119,8 @@ async fn fetch(ctx: &Context, connection: &mut Imap) {
}
}
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool {
match get_watch_folder(&ctx, folder).await {
async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: Config) -> InterruptInfo {
match ctx.get_config(folder).await {
Some(watch_folder) => {
// fetch
if let Err(err) = connection.fetch(&ctx, &watch_folder).await {
@@ -153,7 +136,7 @@ async fn fetch_idle(ctx: &Context, connection: &mut Imap, folder: &str) -> bool
.unwrap_or_else(|err| {
connection.trigger_reconnect();
error!(ctx, "{}", err);
false
InterruptInfo::new(false, None)
})
} else {
connection.fake_idle(&ctx, Some(watch_folder)).await
@@ -170,7 +153,7 @@ async fn simple_imap_loop(
ctx: Context,
started: Sender<()>,
inbox_handlers: ImapConnectionHandlers,
folder: impl AsRef<str>,
folder: Config,
) {
use futures::future::FutureExt;
@@ -193,7 +176,7 @@ async fn simple_imap_loop(
started.send(()).await;
loop {
fetch_idle(&ctx, &mut connection, folder.as_ref()).await;
fetch_idle(&ctx, &mut connection, folder).await;
}
};
@@ -223,18 +206,18 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
started.send(()).await;
let ctx = ctx1;
let mut probe_network = false;
let mut interrupt_info = Default::default();
loop {
match job::load_next(&ctx, Thread::Smtp, probe_network).await {
match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
Some(job) => {
info!(ctx, "executing smtp job");
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
probe_network = false;
interrupt_info = Default::default();
}
None => {
// Fake Idle
info!(ctx, "smtp fake idle - started");
probe_network = idle_interrupt_receiver.recv().await.unwrap_or_default();
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
info!(ctx, "smtp fake idle - interrupted")
}
}
@@ -286,7 +269,7 @@ impl Scheduler {
ctx1,
mvbox_start_send,
mvbox_handlers,
"configured_mvbox_folder",
Config::ConfiguredMvboxFolder,
)
.await
}));
@@ -300,7 +283,7 @@ impl Scheduler {
ctx1,
sentbox_start_send,
sentbox_handlers,
"configured_sentbox_folder",
Config::ConfiguredSentboxFolder,
)
.await
}));
@@ -333,34 +316,34 @@ impl Scheduler {
return;
}
self.interrupt_inbox(true)
.join(self.interrupt_mvbox(true))
.join(self.interrupt_sentbox(true))
.join(self.interrupt_smtp(true))
self.interrupt_inbox(InterruptInfo::new(true, None))
.join(self.interrupt_mvbox(InterruptInfo::new(true, None)))
.join(self.interrupt_sentbox(InterruptInfo::new(true, None)))
.join(self.interrupt_smtp(InterruptInfo::new(true, None)))
.await;
}
async fn interrupt_inbox(&self, probe_network: bool) {
async fn interrupt_inbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref inbox, .. } = self {
inbox.interrupt(probe_network).await;
inbox.interrupt(info).await;
}
}
async fn interrupt_mvbox(&self, probe_network: bool) {
async fn interrupt_mvbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref mvbox, .. } = self {
mvbox.interrupt(probe_network).await;
mvbox.interrupt(info).await;
}
}
async fn interrupt_sentbox(&self, probe_network: bool) {
async fn interrupt_sentbox(&self, info: InterruptInfo) {
if let Scheduler::Running { ref sentbox, .. } = self {
sentbox.interrupt(probe_network).await;
sentbox.interrupt(info).await;
}
}
async fn interrupt_smtp(&self, probe_network: bool) {
async fn interrupt_smtp(&self, info: InterruptInfo) {
if let Scheduler::Running { ref smtp, .. } = self {
smtp.interrupt(probe_network).await;
smtp.interrupt(info).await;
}
}
@@ -429,7 +412,7 @@ struct ConnectionState {
/// Channel to interrupt the whole connection.
stop_sender: Sender<()>,
/// Channel to interrupt idle.
idle_interrupt_sender: Sender<bool>,
idle_interrupt_sender: Sender<InterruptInfo>,
}
impl ConnectionState {
@@ -441,9 +424,9 @@ impl ConnectionState {
self.shutdown_receiver.recv().await.ok();
}
async fn interrupt(&self, probe_network: bool) {
async fn interrupt(&self, info: InterruptInfo) {
// Use try_send to avoid blocking on interrupts.
self.idle_interrupt_sender.try_send(probe_network).ok();
self.idle_interrupt_sender.try_send(info).ok();
}
}
@@ -477,8 +460,8 @@ impl SmtpConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self, probe_network: bool) {
self.state.interrupt(probe_network).await;
async fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info).await;
}
/// Shutdown this connection completely.
@@ -492,7 +475,7 @@ struct SmtpConnectionHandlers {
connection: Smtp,
stop_receiver: Receiver<()>,
shutdown_sender: Sender<()>,
idle_interrupt_receiver: Receiver<bool>,
idle_interrupt_receiver: Receiver<InterruptInfo>,
}
#[derive(Debug)]
@@ -525,8 +508,8 @@ impl ImapConnectionState {
}
/// Interrupt any form of idle.
async fn interrupt(&self, probe_network: bool) {
self.state.interrupt(probe_network).await;
async fn interrupt(&self, info: InterruptInfo) {
self.state.interrupt(info).await;
}
/// Shutdown this connection completely.
@@ -542,20 +525,17 @@ struct ImapConnectionHandlers {
shutdown_sender: Sender<()>,
}
async fn get_watch_folder(context: &Context, config_name: impl AsRef<str>) -> Option<String> {
match context
.sql
.get_raw_config(context, config_name.as_ref())
.await
{
Some(name) => Some(name),
None => {
if config_name.as_ref() == "configured_inbox_folder" {
// initialized with old version, so has not set configured_inbox_folder
Some("INBOX".to_string())
} else {
None
}
#[derive(Default, Debug)]
pub struct InterruptInfo {
pub probe_network: bool,
pub msg_id: Option<MsgId>,
}
impl InterruptInfo {
pub fn new(probe_network: bool, msg_id: Option<MsgId>) -> Self {
Self {
probe_network,
msg_id,
}
}
}