Compare commits

...

11 Commits

Author          SHA1        Message                                                                                              Date
holger krekel   502d675b70  trying trying                                                                                        2020-05-24 17:36:25 +02:00
holger krekel   89fec474d4  Merge remote-tracking branch 'origin/feat/async-jobs-parallel-fetch' into try_stress_with_parallel  2020-05-24 17:02:08 +02:00
holger krekel   f682ae6695  emit debug message                                                                                   2020-05-24 17:01:26 +02:00
holger krekel   a4971b5fcb  Merge branch 'stress_test' into try_stress                                                           2020-05-24 17:01:14 +02:00
dignifiedquire  72d4da0095  feat(imap): process incoming messages in bulk                                                        2020-05-24 00:06:39 +02:00
holger krekel   71442db39f  snap                                                                                                 2020-05-19 14:49:16 +02:00
holger krekel   452c9225dc  snap                                                                                                 2020-05-18 18:26:50 +02:00
holger krekel   c1593c5c53  sipmlify, no randomness anymore                                                                      2020-05-18 18:26:50 +02:00
holger krekel   b0dbb28422  less randomness                                                                                      2020-05-18 18:26:50 +02:00
holger krekel   9c02f6db6e  some more mesh                                                                                       2020-05-18 18:26:50 +02:00
holger krekel   508b985249  first working stress test                                                                            2020-05-18 18:26:50 +02:00
4 changed files with 309 additions and 90 deletions

View File

@@ -50,13 +50,12 @@ class Account(object):
             lib.dc_context_unref,
         )
         if self._dc_context == ffi.NULL:
-            raise ValueError("Could not dc_context_new: {} {}".format(os_name, db_path))
-        hook = hookspec.Global._get_plugin_manager().hook
+            raise ValueError("FAILED dc_context_new: {} {}".format(os_name, db_path))

         self._shutdown_event = Event()
         self._event_thread = EventThread(self)
         self._configkeys = self.get_config("sys.config_keys").split()
+        hook = hookspec.Global._get_plugin_manager().hook
         hook.dc_account_init(account=self)

     def disable_logging(self):
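
Note: a minimal sketch (not part of this changeset) of the error path reworded above, assuming dc_context_new returns NULL for an unusable database path; the test name and path are hypothetical:

    import pytest
    from deltachat import Account

    def test_account_with_bad_db_path():
        # hypothetical: expect the ValueError raised when dc_context_new fails
        with pytest.raises(ValueError):
            Account("/nonexistent-dir/db.sqlite")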

View File

@@ -15,7 +15,6 @@ import requests
 from . import Account, const
 from .capi import lib
 from .events import FFIEventLogger, FFIEventTracker
-from _pytest.monkeypatch import MonkeyPatch
 from _pytest._code import Source
 import deltachat
@@ -101,17 +100,16 @@ def pytest_report_header(config, startdir):
     summary = []
     t = tempfile.mktemp()
-    m = MonkeyPatch()
     try:
         ac = Account(t)
         info = ac.get_info()
         ac.shutdown()
     finally:
-        m.undo()
         os.remove(t)
-    summary.extend(['Deltachat core={} sqlite={}'.format(
+    summary.extend(['Deltachat core={} sqlite={} journal_mode={}'.format(
         info['deltachat_core_version'],
         info['sqlite_version'],
+        info['journal_mode'],
     )])
     cfg = config.option.liveconfig
@@ -232,6 +230,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
         def make_account(self, path, logid, quiet=False):
             ac = Account(path, logging=self._logging)
             ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
+            ac.addr = ac.get_self_contact().addr
             if not quiet:
                 ac.add_account_plugin(FFIEventLogger(ac, logid=logid))
             self._accounts.append(ac)
@@ -321,6 +320,14 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
             ac2.start_io()
             return ac1, ac2

+        def get_many_online_accounts(self, num, move=True, quiet=True):
+            accounts = [self.get_online_configuring_account(move=move, quiet=quiet)
+                        for i in range(num)]
+            for acc in accounts:
+                acc._configtracker.wait_finish()
+                acc.start_io()
+            return accounts
+
         def clone_online_account(self, account, pre_generated_key=True):
             self.live_count += 1
             tmpdb = tmpdir.join("livedb%d" % self.live_count)
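
Note: a hedged usage sketch for the new get_many_online_accounts helper added above; the test body is illustrative, not from this changeset:

    def test_three_accounts_come_online(acfactory):
        # the helper returns accounts that are fully configured and have started IO
        accounts = acfactory.get_many_online_accounts(3)
        assert len(accounts) == 3
        for acc in accounts:
            assert acc.get_self_contact().addr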

View File

@@ -0,0 +1,134 @@
+import time
+import threading
+import pytest
+import os
+from queue import Queue, Empty
+
+import deltachat
+
+
+def test_db_busy_error(acfactory, tmpdir):
+    starttime = time.time()
+    log_lock = threading.RLock()
+
+    def log(string):
+        with log_lock:
+            print("%3.2f %s" % (time.time() - starttime, string))
+
+    # make a number of accounts
+    accounts = acfactory.get_many_online_accounts(5, quiet=True)
+    log("created %s accounts" % len(accounts))
+
+    # put a bigfile into each account
+    for acc in accounts:
+        acc.bigfile = os.path.join(acc.get_blobdir(), "bigfile")
+        with open(acc.bigfile, "wb") as f:
+            f.write(b"01234567890" * 1_000_000)
+    log("created %s bigfiles" % len(accounts))
+
+    contact_addrs = [acc.get_self_contact().addr for acc in accounts]
+    chat = accounts[0].create_group_chat("stress-group")
+    for addr in contact_addrs[1:]:
+        chat.add_contact(chat.account.create_contact(addr))
+
+    # setup auto-responder bots which report back failures/actions
+    report_queue = Queue()
+
+    def report_func(replier, report_type, *report_args):
+        report_queue.put((replier, report_type, report_args))
+
+    # each replier receives all events and sends report events to report_queue
+    repliers = []
+    for acc in accounts:
+        replier = AutoReplier(acc, log=log, num_send=1000, num_bigfiles=0, report_func=report_func)
+        acc.add_account_plugin(replier)
+        repliers.append(replier)
+
+    # kick off message sending
+    # after which repliers will reply to each other
+    chat.send_text("hello")
+
+    alive_count = len(accounts)
+    while alive_count > 0:
+        try:
+            replier, report_type, report_args = report_queue.get(timeout=10)
+        except Empty:
+            log("timeout waiting for next event")
+            pytest.fail("timeout exceeded")
+        if report_type == ReportType.exit:
+            replier.log("EXIT, remaining={}".format(alive_count))
+        elif report_type == ReportType.ffi_error:
+            replier.log("ERROR: {}".format(report_args[0]))
+        elif report_type == ReportType.message_echo:
+            continue
+        else:
+            raise ValueError("{} unknown report type {}, args={}".format(
+                replier.addr, report_type, report_args
+            ))
+        alive_count -= 1
+        replier.log("shutting down")
+        replier.account.shutdown()
+        replier.log("shut down complete, remaining={}".format(alive_count))
+
+
+class ReportType:
+    exit = "exit"
+    ffi_error = "ffi-error"
+    message_echo = "message-echo"
+
+
+class AutoReplier:
+    def __init__(self, account, log, num_send, num_bigfiles, report_func):
+        self.account = account
+        self._log = log
+        self.report_func = report_func
+        self.num_send = num_send
+        self.num_bigfiles = num_bigfiles
+        self.current_sent = 0
+        self.addr = self.account.get_self_contact().addr
+        self._thread = threading.Thread(
+            name="Stats{}".format(self.account),
+            target=self.thread_stats
+        )
+        self._thread.setDaemon(True)
+        self._thread.start()
+
+    def log(self, message):
+        self._log("{} {}".format(self.addr, message))
+
+    def thread_stats(self):
+        # XXX later use, for now we just quit
+        return
+        while 1:
+            time.sleep(1.0)
+            break
+
+    @deltachat.account_hookimpl
+    def ac_incoming_message(self, message):
+        if self.current_sent >= self.num_send:
+            return
+        message.accept_sender_contact()
+        message.mark_seen()
+        self.log("incoming message: {}".format(message))
+
+        self.current_sent += 1
+        # we are still alive, let's send a reply
+        if self.num_bigfiles and self.current_sent % self.num_bigfiles == 0:
+            message.chat.send_text("send big file as reply to: {}".format(message.text))
+            msg = message.chat.send_file(self.account.bigfile)
+        else:
+            msg = message.chat.send_text("got message id {}, small text reply".format(message.id))
+        assert msg.text
+        self.log("message-sent: {}".format(msg))
+        self.report_func(self, ReportType.message_echo)
+
+        if self.current_sent >= self.num_send:
+            self.report_func(self, ReportType.exit)
+            return
+
+    @deltachat.account_hookimpl
+    def ac_process_ffi_event(self, ffi_event):
+        self.log(ffi_event)
+        if ffi_event.name == "DC_EVENT_ERROR":
+            self.report_func(self, ReportType.ffi_error, ffi_event)
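
Note: the num_bigfiles parameter above makes every num_bigfiles-th reply attach the blob created in the test setup (the code checks current_sent % num_bigfiles == 0). An illustrative variation, not part of the diff:

    # hypothetical: 100 replies per bot, every 10th reply sends the ~11 MB bigfile
    replier = AutoReplier(acc, log=log, num_send=100, num_bigfiles=10,
                          report_func=report_func)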

View File

@@ -578,10 +578,6 @@ impl Imap {
             .await?;

         let mut read_cnt: usize = 0;
-        let mut read_errors = 0;
-
-        // prefetch info from all unfetched mails
-        let mut new_last_seen_uid = last_seen_uid;

         if self.session.is_none() {
             return Err(Error::NoConnection);
@@ -591,12 +587,16 @@ impl Imap {
         // fetch messages with larger UID than the last one seen
         // `(UID FETCH lastseenuid+1:*)`, see RFC 4549
         let set = format!("{}:*", last_seen_uid + 1);
-        let mut list = match session.uid_fetch(set, PREFETCH_FLAGS).await {
+        info!(context, "fetch_new_messages {:?}", set);
+        let mut list = match session.uid_fetch(&set, PREFETCH_FLAGS).await {
             Ok(list) => list,
             Err(err) => {
+                warn!(context, "ERROR: fetch_new_messages {:?} -> {:?}", &set, err);
                 return Err(Error::FetchFailed(err));
             }
         };
+        info!(context, "fetch_new_messages {:?} RETURNED", &set);

         let mut msgs = Vec::new();
         while let Some(fetch) = list.next().await {
@@ -604,86 +604,117 @@ impl Imap {
             msgs.push(fetch);
         }
         drop(list);
+        info!(context, "fetch_new_messages got {:?} messages", msgs.len());

         msgs.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default());
+        let msgs: Vec<_> = msgs
+            .into_iter()
+            .filter(|msg| {
+                let cur_uid = msg.uid.unwrap_or_default();
+                if cur_uid <= last_seen_uid {
+                    // If the mailbox is not empty, results always include
+                    // at least one UID, even if last_seen_uid+1 is past
+                    // the last UID in the mailbox. It happens because
+                    // uid+1:* is interpreted the same way as *:uid+1.
+                    // See https://tools.ietf.org/html/rfc3501#page-61 for
+                    // standard reference. Therefore, sometimes we receive
+                    // already seen messages and have to filter them out.
+                    info!(
+                        context,
+                        "fetch_new_messages: ignoring uid {}, last seen was {}",
+                        cur_uid,
+                        last_seen_uid
+                    );
+                    false
+                } else {
+                    true
+                }
+            })
+            .collect();
+
+        read_cnt += msgs.len();
+
+        let mut read_errors = 0;
+        let mut uids = Vec::with_capacity(msgs.len());
+        let mut new_last_seen_uid = None;

         for fetch in msgs.into_iter() {
+            let folder: &str = folder.as_ref();
             let cur_uid = fetch.uid.unwrap_or_default();
-            if cur_uid <= last_seen_uid {
-                // If the mailbox is not empty, results always include
-                // at least one UID, even if last_seen_uid+1 is past
-                // the last UID in the mailbox. It happens because
-                // uid+1:* is interpreted the same way as *:uid+1.
-                // See https://tools.ietf.org/html/rfc3501#page-61 for
-                // standard reference. Therefore, sometimes we receive
-                // already seen messages and have to filter them out.
-                info!(
-                    context,
-                    "fetch_new_messages: ignoring uid {}, last seen was {}", cur_uid, last_seen_uid
-                );
-                continue;
-            }
-            read_cnt += 1;
-
-            let headers = get_fetch_headers(&fetch)?;
+            let headers = match get_fetch_headers(&fetch) {
+                Ok(h) => h,
+                Err(err) => {
+                    warn!(context, "get_fetch_headers error: {}", err);
+                    read_errors += 1;
+                    continue;
+                }
+            };
             let message_id = prefetch_get_message_id(&headers).unwrap_or_default();

-            if let Ok(true) = precheck_imf(context, &message_id, folder.as_ref(), cur_uid)
-                .await
-                .map_err(|err| {
+            let skip = match precheck_imf(context, &message_id, folder, cur_uid).await {
+                Ok(skip) => skip,
+                Err(err) => {
                     warn!(context, "precheck_imf error: {}", err);
-                    err
-                })
-            {
+                    true
+                }
+            };
+            if skip {
                 // we know the message-id already or don't want the message otherwise.
                 info!(
                     context,
-                    "Skipping message {} from \"{}\" by precheck.",
-                    message_id,
-                    folder.as_ref(),
+                    "Skipping message {} from \"{}\" by precheck.", message_id, folder,
                 );
+                if read_errors == 0 {
+                    new_last_seen_uid = Some(cur_uid);
+                }
             } else {
                 // we do not know the message-id
                 // or the message-id is missing (in this case, we create one in the further process)
                 // or some other error happened
-                let show = prefetch_should_download(context, &headers, show_emails)
-                    .await
-                    .map_err(|err| {
+                let show = match prefetch_should_download(context, &headers, show_emails).await {
+                    Ok(show) => show,
+                    Err(err) => {
                         warn!(context, "prefetch_should_download error: {}", err);
-                        err
-                    })
-                    .unwrap_or(true);
+                        true
+                    }
+                };

-                if !show {
+                if show {
+                    uids.push(cur_uid);
+                } else {
                     info!(
                         context,
-                        "Ignoring new message {} from \"{}\".",
-                        message_id,
-                        folder.as_ref(),
+                        "Ignoring new message {} from \"{}\".", message_id, folder,
                     );
-                } else {
-                    // check passed, go fetch the rest
-                    if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await {
-                        info!(
-                            context,
-                            "Read error for message {} from \"{}\", trying over later: {}.",
-                            message_id,
-                            folder.as_ref(),
-                            err
-                        );
-                        read_errors += 1;
-                    }
+                    if read_errors == 0 {
+                        new_last_seen_uid = Some(cur_uid);
+                    }
                 }
             }
-            if read_errors == 0 {
-                new_last_seen_uid = cur_uid;
-            }
         }

-        if new_last_seen_uid > last_seen_uid {
-            self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid)
+        info!(
+            context,
+            "fetch_many_msgs fetching {} messages in batch",
+            uids.len()
+        );
+        // check passed, go fetch the emails
+        let (new_last_seen_uid_processed, error_cnt) =
+            self.fetch_many_msgs(context, &folder, &uids).await;
+        let new_last_seen_uid_processed = new_last_seen_uid_processed.unwrap_or_default();
+        let new_last_seen_uid = new_last_seen_uid.unwrap_or_default();
+        let last_one = new_last_seen_uid.max(new_last_seen_uid_processed);
+        if last_one > last_seen_uid {
+            self.set_config_last_seen_uid(context, &folder, uid_validity, last_one)
                 .await;
         }
+        read_errors += error_cnt;

         if read_errors > 0 {
             warn!(
                 context,
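
Note: a minimal Python illustration (not from the patch) of the RFC 3501 corner case the filter above guards against: "uid+1:*" is interpreted like "*:uid+1", so a non-empty mailbox may return an already-seen message that has to be dropped.

    def filter_new_uids(fetched_uids, last_seen_uid):
        # e.g. the mailbox's highest UID is 7 and last_seen_uid is 9:
        # the range "10:*" still returns UID 7, which must be filtered out
        return [uid for uid in fetched_uids if uid > last_seen_uid]

    assert filter_new_uids([7], last_seen_uid=9) == []
    assert filter_new_uids([10, 11], last_seen_uid=9) == [10, 11]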
@@ -721,25 +752,36 @@ impl Imap {
             .ok();
     }

-    /// Fetches a single message by server UID.
+    /// Fetches a list of messages by server UID.
+    /// The passed in list of uids must be sorted.
     ///
     /// If it succeeds, the message should be treated as received even
     /// if no database entries are created. If the function returns an
     /// error, the caller should try again later.
-    async fn fetch_single_msg<S: AsRef<str>>(
+    /// Returns the last uid fetched successfully and an error count.
+    async fn fetch_many_msgs<S: AsRef<str>>(
         &mut self,
         context: &Context,
         folder: S,
-        server_uid: u32,
-    ) -> Result<()> {
-        if !self.is_connected() {
-            return Err(Error::Other("Not connected".to_string()));
+        server_uids: &[u32],
+    ) -> (Option<u32>, usize) {
+        if server_uids.is_empty() {
+            return (None, 0);
         }
-        let set = format!("{}", server_uid);
+        let set = if server_uids.len() == 1 {
+            server_uids[0].to_string()
+        } else {
+            let first_uid = server_uids[0];
+            let last_uid = server_uids[server_uids.len() - 1];
+            assert!(first_uid < last_uid, "uids must be sorted");
+            format!("{}:{}", first_uid, last_uid)
+        };
+
+        if !self.is_connected() {
+            warn!(context, "Not connected");
+            return (None, server_uids.len());
+        }

         let mut msgs = if let Some(ref mut session) = &mut self.session {
-            match session.uid_fetch(set, BODY_FLAGS).await {
+            match session.uid_fetch(&set, BODY_FLAGS).await {
                 Ok(msgs) => msgs,
                 Err(err) => {
                     // TODO maybe differentiate between IO and input/parsing problems
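
Note: a hypothetical Python rendering of how fetch_many_msgs builds its UID set string above: a single uid stays "42", a sorted batch collapses to "first:last". Because that range can include uids that were never requested, the fetch loop below re-checks server_uids.contains() before processing.

    def build_uid_set(server_uids):
        if len(server_uids) == 1:
            return str(server_uids[0])
        first_uid, last_uid = server_uids[0], server_uids[-1]
        assert first_uid < last_uid, "uids must be sorted"
        return "{}:{}".format(first_uid, last_uid)

    assert build_uid_set([42]) == "42"
    assert build_uid_set([3, 4, 7]) == "3:7"  # 5 and 6 arrive too and are skipped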
@@ -747,43 +789,80 @@ impl Imap {
                     self.should_reconnect = true;
                     warn!(
                         context,
-                        "Error on fetching message #{} from folder \"{}\"; error={}.",
-                        server_uid,
+                        "Error on fetching messages #{} from folder \"{}\"; error={}.",
+                        &set,
                         folder.as_ref(),
                         err
                     );
-                    return Err(Error::FetchFailed(err));
+                    return (None, server_uids.len());
                 }
             }
         } else {
             // we could not get a valid imap session, this should be retried
             self.trigger_reconnect();
-            return Err(Error::Other("Could not get IMAP session".to_string()));
+            warn!(context, "Could not get IMAP session");
+            return (None, server_uids.len());
         };

-        if let Some(Ok(msg)) = msgs.next().await {
+        let mut read_errors = 0;
+        let mut last_uid = None;
+        let mut count = 0;
+        let mut jobs = Vec::with_capacity(server_uids.len());
+        while let Some(Ok(msg)) = msgs.next().await {
+            let server_uid = msg.uid.unwrap_or_default();
+            if !server_uids.contains(&server_uid) {
+                // skip if there are some in between we are not interested in
+                continue;
+            }
+            count += 1;
             // XXX put flags into a set and pass them to dc_receive_imf
             let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
             let is_seen = msg.flags().any(|flag| flag == Flag::Seen);

             if !is_deleted && msg.body().is_some() {
-                let body = msg.body().unwrap_or_default();
-                if let Err(err) =
-                    dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen).await
-                {
-                    return Err(Error::Other(format!("dc_receive_imf error: {}", err)));
+                let folder = folder.as_ref().to_string();
+                let context = context.clone();
+                let task = async_std::task::spawn(async move {
+                    let body = msg.body().unwrap_or_default();
+                    if let Err(err) =
+                        dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await
+                    {
+                        warn!(context, "dc_receive_imf error: {}", err);
+                        // errors are counted once, when the tasks are joined below
+                        None
+                    } else {
+                        Some(server_uid)
+                    }
+                });
+                jobs.push(task);
             }
-        } else {
+        }
+
+        for task in futures::future::join_all(jobs).await {
+            match task {
+                Some(uid) => {
+                    last_uid = Some(uid);
+                }
+                None => {
+                    read_errors += 1;
+                }
+            }
+        }
+
+        if count != server_uids.len() {
             warn!(
                 context,
-                "Message #{} does not exist in folder \"{}\".",
-                server_uid,
-                folder.as_ref()
+                "failed to fetch all uids: got {}, requested {}",
+                count,
+                server_uids.len()
             );
         }
-
-        Ok(())
+        (last_uid, read_errors)
     }

     pub async fn can_move(&self) -> bool {
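
Note: the core of the bulk change is that dc_receive_imf now runs in one spawned task per message, with errors counted only when the tasks are joined. A rough Python analogue of that fan-out/join shape (an assumption for illustration, not the shipped code):

    from concurrent.futures import ThreadPoolExecutor

    def process_bodies(uid_body_pairs, receive_imf):
        # spawn one worker per message, like async_std::task::spawn above
        def run_one(pair):
            uid, body = pair
            try:
                receive_imf(body)
                return uid
            except Exception:
                return None  # counted as a read error after the join

        with ThreadPoolExecutor() as pool:
            results = list(pool.map(run_one, uid_body_pairs))

        # join phase: remember the last successful uid in order, count errors
        last_uid, read_errors = None, 0
        for result in results:
            if result is None:
                read_errors += 1
            else:
                last_uid = result
        return last_uid, read_errors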