diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 487c9b1c3..34f2e63ba 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -52,11 +52,10 @@ class Account(object): if self._dc_context == ffi.NULL: raise ValueError("Could not dc_context_new: {} {}".format(os_name, db_path)) - hook = hookspec.Global._get_plugin_manager().hook - self._shutdown_event = Event() self._event_thread = EventThread(self) self._configkeys = self.get_config("sys.config_keys").split() + hook = hookspec.Global._get_plugin_manager().hook hook.dc_account_init(account=self) def disable_logging(self): diff --git a/python/src/deltachat/testplugin.py b/python/src/deltachat/testplugin.py index a785d9cf0..8a1939b08 100644 --- a/python/src/deltachat/testplugin.py +++ b/python/src/deltachat/testplugin.py @@ -15,7 +15,6 @@ import requests from . import Account, const from .capi import lib from .events import FFIEventLogger, FFIEventTracker -from _pytest.monkeypatch import MonkeyPatch from _pytest._code import Source import deltachat @@ -101,17 +100,16 @@ def pytest_report_header(config, startdir): summary = [] t = tempfile.mktemp() - m = MonkeyPatch() try: ac = Account(t) info = ac.get_info() ac.shutdown() finally: - m.undo() os.remove(t) - summary.extend(['Deltachat core={} sqlite={}'.format( + summary.extend(['Deltachat core={} sqlite={} journal_mode={}'.format( info['deltachat_core_version'], info['sqlite_version'], + info['journal_mode'], )]) cfg = config.option.liveconfig @@ -232,6 +230,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): def make_account(self, path, logid, quiet=False): ac = Account(path, logging=self._logging) ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac)) + ac.addr = ac.get_self_contact().addr if not quiet: ac.add_account_plugin(FFIEventLogger(ac, logid=logid)) self._accounts.append(ac) @@ -321,6 +320,14 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): ac2.start_io() return ac1, ac2 + def get_many_online_accounts(self, num, move=True, quiet=True): + accounts = [self.get_online_configuring_account(move=move, quiet=quiet) + for i in range(num)] + for acc in accounts: + acc._configtracker.wait_finish() + acc.start_io() + return accounts + def clone_online_account(self, account, pre_generated_key=True): self.live_count += 1 tmpdb = tmpdir.join("livedb%d" % self.live_count) diff --git a/python/tests/stress_test_db.py b/python/tests/stress_test_db.py new file mode 100644 index 000000000..ab5813218 --- /dev/null +++ b/python/tests/stress_test_db.py @@ -0,0 +1,135 @@ + +import time +import threading +import pytest +import os +from queue import Queue, Empty + +import deltachat + + +def test_db_busy_error(acfactory, tmpdir): + starttime = time.time() + log_lock = threading.RLock() + + def log(string): + with log_lock: + print("%3.2f %s" % (time.time() - starttime, string)) + + # make a number of accounts + accounts = acfactory.get_many_online_accounts(3, quiet=True) + log("created %s accounts" % len(accounts)) + + # put a bigfile into each account + for acc in accounts: + acc.bigfile = os.path.join(acc.get_blobdir(), "bigfile") + with open(acc.bigfile, "wb") as f: + f.write(b"01234567890"*1000_000) + log("created %s bigfiles" % len(accounts)) + + contact_addrs = [acc.get_self_contact().addr for acc in accounts] + chat = accounts[0].create_group_chat("stress-group") + for addr in contact_addrs[1:]: + chat.add_contact(chat.account.create_contact(addr)) + + # setup auto-responder bots which report back failures/actions + report_queue = Queue() + + def report_func(replier, report_type, *report_args): + report_queue.put((replier, report_type, report_args)) + + # each replier receives all events and sends report events to receive_queue + repliers = [] + for acc in accounts: + replier = AutoReplier(acc, log=log, num_send=500, num_bigfiles=5, report_func=report_func) + acc.add_account_plugin(replier) + repliers.append(replier) + + # kick off message sending + # after which repliers will reply to each other + chat.send_text("hello") + + alive_count = len(accounts) + while alive_count > 0: + try: + replier, report_type, report_args = report_queue.get(timeout=10) + except Empty: + log("timeout waiting for next event") + pytest.fail("timeout exceeded") + if report_type == ReportType.exit: + replier.log("EXIT".format(alive_count)) + elif report_type == ReportType.ffi_error: + replier.log("ERROR: {}".format(addr, report_args[0])) + elif report_type == ReportType.message_echo: + continue + else: + raise ValueError("{} unknown report type {}, args={}".format( + addr, report_type, report_args + )) + alive_count -= 1 + replier.log("shutting down") + replier.account.shutdown() + replier.log("shut down complete, remaining={}".format(alive_count)) + + +class ReportType: + exit = "exit" + ffi_error = "ffi-error" + message_echo = "message-echo" + + +class AutoReplier: + def __init__(self, account, log, num_send, num_bigfiles, report_func): + self.account = account + self._log = log + self.report_func = report_func + self.num_send = num_send + self.num_bigfiles = num_bigfiles + self.current_sent = 0 + self.addr = self.account.get_self_contact().addr + + self._thread = threading.Thread( + name="Stats{}".format(self.account), + target=self.thread_stats + ) + self._thread.setDaemon(True) + self._thread.start() + + def log(self, message): + self._log("{} {}".format(self.addr, message)) + + def thread_stats(self): + # XXX later use, for now we just quit + return + while 1: + time.sleep(1.0) + break + + @deltachat.account_hookimpl + def ac_incoming_message(self, message): + if self.current_sent >= self.num_send: + self.report_func(self, ReportType.exit) + return + message.accept_sender_contact() + message.mark_seen() + self.log("incoming message: {}".format(message)) + + self.current_sent += 1 + # we are still alive, let's send a reply + if self.num_bigfiles and self.current_sent % (self.num_send / self.num_bigfiles) == 0: + message.chat.send_text("send big file as reply to: {}".format(message.text)) + msg = message.chat.send_file(self.account.bigfile) + else: + msg = message.chat.send_text("got message id {}, small text reply".format(message.id)) + assert msg.text + self.log("message-sent: {}".format(msg)) + self.report_func(self, ReportType.message_echo) + if self.current_sent >= self.num_send: + self.report_func(self, ReportType.exit) + return + + @deltachat.account_hookimpl + def ac_process_ffi_event(self, ffi_event): + self.log(ffi_event) + if ffi_event.name == "DC_EVENT_ERROR": + self.report_func(self, ReportType.ffi_error, ffi_event)