From 508b985249a772084cd6f5beeda98902d7fe47be Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sat, 16 May 2020 20:31:09 +0200 Subject: [PATCH 1/6] first working stress test --- python/src/deltachat/testplugin.py | 8 +++ python/tests/stress_test_db.py | 85 ++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 python/tests/stress_test_db.py diff --git a/python/src/deltachat/testplugin.py b/python/src/deltachat/testplugin.py index 7208e8280..26a57c409 100644 --- a/python/src/deltachat/testplugin.py +++ b/python/src/deltachat/testplugin.py @@ -232,6 +232,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): ac = Account(path, logging=self._logging) ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac)) ac._configtracker = ac.add_account_plugin(ConfigureTracker()) + ac.addr = ac.get_self_contact().addr if not quiet: ac.add_account_plugin(FFIEventLogger(ac, logid=logid)) self._accounts.append(ac) @@ -320,6 +321,13 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): ac2._configtracker.wait_finish() return ac1, ac2 + def get_many_online_accounts(self, num, move=True, quiet=True): + accounts = [self.get_online_configuring_account(move=move, quiet=quiet) + for i in range(num)] + for acc in accounts: + acc._configtracker.wait_finish() + return accounts + def clone_online_account(self, account, pre_generated_key=True): self.live_count += 1 tmpdb = tmpdir.join("livedb%d" % self.live_count) diff --git a/python/tests/stress_test_db.py b/python/tests/stress_test_db.py new file mode 100644 index 000000000..f38c5af16 --- /dev/null +++ b/python/tests/stress_test_db.py @@ -0,0 +1,85 @@ +import random +from queue import Queue + +import deltachat + + +def test_db_busy_error(acfactory): + # make a number of accounts and put them in one chat + accounts = acfactory.get_many_online_accounts(4) + contact_addrs = [acc.get_self_contact().addr for acc in accounts] + chat = accounts[0].create_group_chat("stress-group") + for addr in contact_addrs[1:]: + chat.add_contact(chat.account.create_contact(addr)) + + # setup auto-responder bots which report back failures/actions + report_queue = Queue() + + def report_func(replier, report_type, *report_args): + report_queue.put((replier, report_type, report_args)) + + # each replier receives all events and sends report events to receive_queue + repliers = [] + for acc in accounts: + replier = AutoReplier(acc, exit_probability=0.05, report_func=report_func) + acc.add_account_plugin(replier) + repliers.append(replier) + + # kick off message sending + # after which repliers will reply to each other + chat.send_text("hello") + + alive_count = len(accounts) + while alive_count > 0: + replier, report_type, report_args = report_queue.get(10) + addr = replier.account.get_self_contact().addr + assert addr + if report_type == ReportType.exit: + alive_count -= 1 + print("{} EXIT -- remaining: {}".format(addr, alive_count)) + replier.account.shutdown(wait=True) + elif report_type == ReportType.message_sent: + print("{} sent message id={}".format(addr, report_args[0].id)) + elif report_type == ReportType.message_incoming: + print("{} incoming message id={}".format(addr, report_args[0].id)) + elif report_type == ReportType.ffi_error: + print("{} ERROR: {}".format(addr, report_args[0].id)) + replier.account.shutdown(wait=True) + alive_count -= 1 + + +class ReportType: + exit = "exit" + message_sent = "message-sent" + ffi_error = "ffi-error" + message_incoming = "message-incoming" + + +class AutoReplier: + def __init__(self, account, exit_probability, report_func): + assert 0 < exit_probability < 1 + self.account = account + self.report_func = report_func + self.exit_probability = exit_probability + self.exiting = False + + @deltachat.account_hookimpl + def ac_incoming_message(self, message): + if self.exiting: + return + message.accept_sender_contact() + message.mark_seen() + self.report_func(self, ReportType.message_incoming, message) + if random.random() <= self.exit_probability: + self.exiting = True + self.report_func(self, ReportType.exit) + return + + # we are still alive, let's send a reply + msg = message.chat.send_text("hello, got message id {}".format(message.id)) + self.report_func(self, ReportType.message_sent, msg) + + @deltachat.account_hookimpl + def ac_process_ffi_event(self, ffi_event): + if ffi_event.name == "DC_EVENT_ERROR": + self.report_func(self, ReportType.ffi_error, ffi_event) From 9c02f6db6ec87729b1f36f360c4dbb72a074c11c Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sat, 16 May 2020 21:53:37 +0200 Subject: [PATCH 2/6] some more mesh --- python/tests/stress_test_db.py | 42 ++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/python/tests/stress_test_db.py b/python/tests/stress_test_db.py index f38c5af16..6a899f6be 100644 --- a/python/tests/stress_test_db.py +++ b/python/tests/stress_test_db.py @@ -1,12 +1,28 @@ +import time +import os import random from queue import Queue import deltachat -def test_db_busy_error(acfactory): - # make a number of accounts and put them in one chat - accounts = acfactory.get_many_online_accounts(4) +def test_db_busy_error(acfactory, tmpdir): + starttime = time.time() + + def log(string): + print("%3.2f %s" % (time.time() - starttime, string)) + + # make a number of accounts + accounts = acfactory.get_many_online_accounts(5, quiet=True) + log("created %s accounts" % len(accounts)) + + # put a bigfile into each account + for acc in accounts: + acc.bigfile = os.path.join(acc.get_blobdir(), "bigfile") + with open(acc.bigfile, "wb") as f: + f.write(b"01234567890"*1000_000) + log("created %s bigfiles" % len(accounts)) + contact_addrs = [acc.get_self_contact().addr for acc in accounts] chat = accounts[0].create_group_chat("stress-group") for addr in contact_addrs[1:]: @@ -21,7 +37,7 @@ def test_db_busy_error(acfactory): # each replier receives all events and sends report events to receive_queue repliers = [] for acc in accounts: - replier = AutoReplier(acc, exit_probability=0.05, report_func=report_func) + replier = AutoReplier(acc, big_probability=0.1, exit_probability=0.01, report_func=report_func) acc.add_account_plugin(replier) repliers.append(replier) @@ -36,14 +52,14 @@ def test_db_busy_error(acfactory): assert addr if report_type == ReportType.exit: alive_count -= 1 - print("{} EXIT -- remaining: {}".format(addr, alive_count)) + log("{} EXIT -- remaining: {}".format(addr, alive_count)) replier.account.shutdown(wait=True) elif report_type == ReportType.message_sent: - print("{} sent message id={}".format(addr, report_args[0].id)) + log("{} sent message: {}".format(addr, report_args[0].text)) elif report_type == ReportType.message_incoming: - print("{} incoming message id={}".format(addr, report_args[0].id)) + log("{} incoming message: {}".format(addr, report_args[0].text)) elif report_type == ReportType.ffi_error: - print("{} ERROR: {}".format(addr, report_args[0].id)) + log("{} ERROR: {}".format(addr, report_args[0])) replier.account.shutdown(wait=True) alive_count -= 1 @@ -56,11 +72,12 @@ class ReportType: class AutoReplier: - def __init__(self, account, exit_probability, report_func): + def __init__(self, account, big_probability, exit_probability, report_func): assert 0 < exit_probability < 1 self.account = account self.report_func = report_func self.exit_probability = exit_probability + self.big_probability = big_probability self.exiting = False @deltachat.account_hookimpl @@ -76,7 +93,12 @@ class AutoReplier: return # we are still alive, let's send a reply - msg = message.chat.send_text("hello, got message id {}".format(message.id)) + if random.random() <= self.big_probability: + message.chat.send_text("send big file as reply to: {}".format(message.text)) + msg = message.chat.send_file(self.account.bigfile) + else: + msg = message.chat.send_text("got message id {}, small text reply".format(message.id)) + assert msg.text self.report_func(self, ReportType.message_sent, msg) @deltachat.account_hookimpl From b0dbb28422297fb2103ca2cfa7375511e03beca2 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 18 May 2020 08:43:48 +0200 Subject: [PATCH 3/6] less randomness --- python/tests/stress_test_db.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/python/tests/stress_test_db.py b/python/tests/stress_test_db.py index 6a899f6be..258cf07ea 100644 --- a/python/tests/stress_test_db.py +++ b/python/tests/stress_test_db.py @@ -37,7 +37,7 @@ def test_db_busy_error(acfactory, tmpdir): # each replier receives all events and sends report events to receive_queue repliers = [] for acc in accounts: - replier = AutoReplier(acc, big_probability=0.1, exit_probability=0.01, report_func=report_func) + replier = AutoReplier(acc, num_send=1000, big_probability=1.0/300, report_func=report_func) acc.add_account_plugin(replier) repliers.append(replier) @@ -72,25 +72,20 @@ class ReportType: class AutoReplier: - def __init__(self, account, big_probability, exit_probability, report_func): - assert 0 < exit_probability < 1 + def __init__(self, account, report_func, num_send, big_probability): + assert 0 < big_probability < 1 self.account = account self.report_func = report_func - self.exit_probability = exit_probability + self.num_remaining = num_send self.big_probability = big_probability - self.exiting = False @deltachat.account_hookimpl def ac_incoming_message(self, message): - if self.exiting: + if self.num_remaining == 0: return message.accept_sender_contact() message.mark_seen() self.report_func(self, ReportType.message_incoming, message) - if random.random() <= self.exit_probability: - self.exiting = True - self.report_func(self, ReportType.exit) - return # we are still alive, let's send a reply if random.random() <= self.big_probability: @@ -100,6 +95,10 @@ class AutoReplier: msg = message.chat.send_text("got message id {}, small text reply".format(message.id)) assert msg.text self.report_func(self, ReportType.message_sent, msg) + self.num_remaining -= 1 + if self.num_remaining == 0: + self.report_func(self, ReportType.exit) + return @deltachat.account_hookimpl def ac_process_ffi_event(self, ffi_event): From c1593c5c535ac24019e364f624adf1b3972a7738 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 18 May 2020 09:02:33 +0200 Subject: [PATCH 4/6] sipmlify, no randomness anymore --- python/tests/stress_test_db.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/python/tests/stress_test_db.py b/python/tests/stress_test_db.py index 258cf07ea..2f84f7624 100644 --- a/python/tests/stress_test_db.py +++ b/python/tests/stress_test_db.py @@ -1,6 +1,5 @@ import time import os -import random from queue import Queue import deltachat @@ -13,7 +12,7 @@ def test_db_busy_error(acfactory, tmpdir): print("%3.2f %s" % (time.time() - starttime, string)) # make a number of accounts - accounts = acfactory.get_many_online_accounts(5, quiet=True) + accounts = acfactory.get_many_online_accounts(5, quiet=False) log("created %s accounts" % len(accounts)) # put a bigfile into each account @@ -37,7 +36,7 @@ def test_db_busy_error(acfactory, tmpdir): # each replier receives all events and sends report events to receive_queue repliers = [] for acc in accounts: - replier = AutoReplier(acc, num_send=1000, big_probability=1.0/300, report_func=report_func) + replier = AutoReplier(acc, num_send=1000, num_bigfiles=0, report_func=report_func) acc.add_account_plugin(replier) repliers.append(replier) @@ -72,31 +71,31 @@ class ReportType: class AutoReplier: - def __init__(self, account, report_func, num_send, big_probability): - assert 0 < big_probability < 1 + def __init__(self, account, report_func, num_send, num_bigfiles): self.account = account self.report_func = report_func - self.num_remaining = num_send - self.big_probability = big_probability + self.num_send = num_send + self.num_bigfiles = num_bigfiles + self.current_sent = 0 @deltachat.account_hookimpl def ac_incoming_message(self, message): - if self.num_remaining == 0: + if self.current_sent >= self.num_send: return message.accept_sender_contact() message.mark_seen() self.report_func(self, ReportType.message_incoming, message) + self.current_sent += 1 # we are still alive, let's send a reply - if random.random() <= self.big_probability: + if self.num_bigfiles and self.current_sent % self.num_bigfiles == 0: message.chat.send_text("send big file as reply to: {}".format(message.text)) msg = message.chat.send_file(self.account.bigfile) else: msg = message.chat.send_text("got message id {}, small text reply".format(message.id)) assert msg.text self.report_func(self, ReportType.message_sent, msg) - self.num_remaining -= 1 - if self.num_remaining == 0: + if self.current_sent >= self.num_send: self.report_func(self, ReportType.exit) return From 452c9225dc1b28467c219a54754206b88cf68e1e Mon Sep 17 00:00:00 2001 From: holger krekel Date: Mon, 18 May 2020 16:22:13 +0200 Subject: [PATCH 5/6] snap --- python/src/deltachat/account.py | 3 +- python/src/deltachat/testplugin.py | 8 +--- python/tests/stress_test_db.py | 73 +++++++++++++++++++++--------- 3 files changed, 54 insertions(+), 30 deletions(-) diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index d1b4ac519..e8243cdc4 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -46,7 +46,6 @@ class Account(object): lib.dc_context_new(lib.py_dc_callback, ffi.NULL, as_dc_charpointer(os_name)), _destroy_dc_context, ) - hook = hookspec.Global._get_plugin_manager().hook self._threads = iothreads.IOThreads(self) @@ -55,9 +54,9 @@ class Account(object): self._shutdown_event = Event() # open database - self.db_path = db_path if hasattr(db_path, "encode"): db_path = db_path.encode("utf8") + self.db_path = db_path if not lib.dc_open(self._dc_context, db_path, ffi.NULL): raise ValueError("Could not dc_open: {}".format(db_path)) self._configkeys = self.get_config("sys.config_keys").split() diff --git a/python/src/deltachat/testplugin.py b/python/src/deltachat/testplugin.py index 26a57c409..2e575b79a 100644 --- a/python/src/deltachat/testplugin.py +++ b/python/src/deltachat/testplugin.py @@ -16,7 +16,6 @@ from . import Account, const from .tracker import ConfigureTracker from .capi import lib from .eventlogger import FFIEventLogger, FFIEventTracker -from _pytest.monkeypatch import MonkeyPatch from _pytest._code import Source import deltachat @@ -99,14 +98,11 @@ def pytest_report_header(config, startdir): summary = [] t = tempfile.mktemp() - m = MonkeyPatch() try: - m.setattr(sys.stdout, "write", lambda x: len(x)) - ac = Account(t) + ac = Account(t, logging=True) info = ac.get_info() - ac.shutdown() + ac.shutdown(False) finally: - m.undo() os.remove(t) summary.extend(['Deltachat core={} sqlite={}'.format( info['deltachat_core_version'], diff --git a/python/tests/stress_test_db.py b/python/tests/stress_test_db.py index 2f84f7624..ab5cc0fd6 100644 --- a/python/tests/stress_test_db.py +++ b/python/tests/stress_test_db.py @@ -1,18 +1,23 @@ + import time +import threading +import pytest import os -from queue import Queue +from queue import Queue, Empty import deltachat def test_db_busy_error(acfactory, tmpdir): starttime = time.time() + log_lock = threading.RLock() def log(string): - print("%3.2f %s" % (time.time() - starttime, string)) + with log_lock: + print("%3.2f %s" % (time.time() - starttime, string)) # make a number of accounts - accounts = acfactory.get_many_online_accounts(5, quiet=False) + accounts = acfactory.get_many_online_accounts(5, quiet=True) log("created %s accounts" % len(accounts)) # put a bigfile into each account @@ -36,7 +41,7 @@ def test_db_busy_error(acfactory, tmpdir): # each replier receives all events and sends report events to receive_queue repliers = [] for acc in accounts: - replier = AutoReplier(acc, num_send=1000, num_bigfiles=0, report_func=report_func) + replier = AutoReplier(acc, log=log, num_send=1000, num_bigfiles=0, report_func=report_func) acc.add_account_plugin(replier) repliers.append(replier) @@ -46,37 +51,59 @@ def test_db_busy_error(acfactory, tmpdir): alive_count = len(accounts) while alive_count > 0: - replier, report_type, report_args = report_queue.get(10) - addr = replier.account.get_self_contact().addr - assert addr + try: + replier, report_type, report_args = report_queue.get(timeout=10) + except Empty: + log("timeout waiting for next event") + pytest.fail("timeout exceeded") if report_type == ReportType.exit: - alive_count -= 1 - log("{} EXIT -- remaining: {}".format(addr, alive_count)) - replier.account.shutdown(wait=True) - elif report_type == ReportType.message_sent: - log("{} sent message: {}".format(addr, report_args[0].text)) - elif report_type == ReportType.message_incoming: - log("{} incoming message: {}".format(addr, report_args[0].text)) + replier.log("EXIT".format(alive_count)) elif report_type == ReportType.ffi_error: - log("{} ERROR: {}".format(addr, report_args[0])) - replier.account.shutdown(wait=True) - alive_count -= 1 + replier.log("ERROR: {}".format(addr, report_args[0])) + elif report_type == ReportType.message_echo: + continue + else: + raise ValueError("{} unknown report type {}, args={}".format( + addr, report_type, report_args + )) + alive_count -= 1 + replier.log("shutting down") + replier.account.shutdown(wait=False) + replier.log("shut down complete, remaining={}".format(alive_count)) class ReportType: exit = "exit" - message_sent = "message-sent" ffi_error = "ffi-error" - message_incoming = "message-incoming" + message_echo = "message-echo" class AutoReplier: - def __init__(self, account, report_func, num_send, num_bigfiles): + def __init__(self, account, log, num_send, num_bigfiles, report_func): self.account = account + self._log = log self.report_func = report_func self.num_send = num_send self.num_bigfiles = num_bigfiles self.current_sent = 0 + self.addr = self.account.get_self_contact().addr + + self._thread = threading.Thread( + name="Stats{}".format(self.account), + target=self.thread_stats + ) + self._thread.setDaemon(True) + self._thread.start() + + def log(self, message): + self._log("{} {}".format(self.addr, message)) + + def thread_stats(self): + # XXX later use, for now we just quit + return + while 1: + time.sleep(1.0) + break @deltachat.account_hookimpl def ac_incoming_message(self, message): @@ -84,7 +111,7 @@ class AutoReplier: return message.accept_sender_contact() message.mark_seen() - self.report_func(self, ReportType.message_incoming, message) + self.log("incoming message: {}".format(message)) self.current_sent += 1 # we are still alive, let's send a reply @@ -94,12 +121,14 @@ class AutoReplier: else: msg = message.chat.send_text("got message id {}, small text reply".format(message.id)) assert msg.text - self.report_func(self, ReportType.message_sent, msg) + self.log("message-sent: {}".format(msg)) + self.report_func(self, ReportType.message_echo) if self.current_sent >= self.num_send: self.report_func(self, ReportType.exit) return @deltachat.account_hookimpl def ac_process_ffi_event(self, ffi_event): + self.log(ffi_event) if ffi_event.name == "DC_EVENT_ERROR": self.report_func(self, ReportType.ffi_error, ffi_event) From 71442db39f5c062b457c58f3e435c7ac80d61fbc Mon Sep 17 00:00:00 2001 From: holger krekel Date: Tue, 19 May 2020 14:49:16 +0200 Subject: [PATCH 6/6] snap --- Cargo.toml | 2 +- python/src/deltachat/testplugin.py | 3 ++- src/sql.rs | 4 +++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 683fd39ff..6af077194 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" license = "MPL-2.0" [profile.release] -lto = true +# lto = true [dependencies] deltachat_derive = { path = "./deltachat_derive" } diff --git a/python/src/deltachat/testplugin.py b/python/src/deltachat/testplugin.py index 2e575b79a..02ed54309 100644 --- a/python/src/deltachat/testplugin.py +++ b/python/src/deltachat/testplugin.py @@ -104,9 +104,10 @@ def pytest_report_header(config, startdir): ac.shutdown(False) finally: os.remove(t) - summary.extend(['Deltachat core={} sqlite={}'.format( + summary.extend(['Deltachat core={} sqlite={} journal_mode={}'.format( info['deltachat_core_version'], info['sqlite_version'], + info['journal_mode'], )]) cfg = config.option.liveconfig diff --git a/src/sql.rs b/src/sql.rs index c5927aa04..e65d5a470 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -422,7 +422,9 @@ fn open( // but even if execute() would handle errors more gracefully, we should continue on errors - // systems might not be able to handle WAL, in which case the standard-journal is used. // that may be not optimal, but better than not working at all :) - sql.execute("PRAGMA journal_mode=WAL;", NO_PARAMS).ok(); + if std::env::var("DCC_WAL").is_ok() { + sql.execute("PRAGMA journal_mode=WAL;", NO_PARAMS).ok(); + } let mut exists_before_update = false; let mut dbversion_before_update = 0;