From f67c86cb393fcac0cd7efdb299fa19d12cf5937f Mon Sep 17 00:00:00 2001 From: holger krekel Date: Wed, 20 May 2020 16:32:12 +0200 Subject: [PATCH] refactor callback thread handling --- python/src/deltachat/__init__.py | 4 +- python/src/deltachat/account.py | 107 +++----------------------- python/src/deltachat/eventlogger.py | 114 +++++++++++++++++++++++++--- python/src/deltachat/iothreads.py | 64 ---------------- python/tests/conftest.py | 1 + python/tests/test_account.py | 1 - python/tests/test_lowlevel.py | 53 +------------ 7 files changed, 123 insertions(+), 221 deletions(-) delete mode 100644 python/src/deltachat/iothreads.py diff --git a/python/src/deltachat/__init__.py b/python/src/deltachat/__init__.py index fbd20283d..1f88100f2 100644 --- a/python/src/deltachat/__init__.py +++ b/python/src/deltachat/__init__.py @@ -1,7 +1,7 @@ import sys -from . import capi, const, hookspec -from .capi import ffi +from . import capi, const, hookspec # noqa +from .capi import ffi # noqa from .account import Account # noqa from .message import Message # noqa from .contact import Contact # noqa diff --git a/python/src/deltachat/account.py b/python/src/deltachat/account.py index 44b9cac22..086955157 100644 --- a/python/src/deltachat/account.py +++ b/python/src/deltachat/account.py @@ -4,20 +4,19 @@ from __future__ import print_function import atexit from contextlib import contextmanager from email.utils import parseaddr -import queue from threading import Event import os from array import array -import deltachat from . import const from .capi import ffi, lib from .cutil import as_dc_charpointer, from_dc_charpointer, iter_array, DCLot from .chat import Chat -from .message import Message, map_system_message +from .message import Message from .contact import Contact from .tracker import ImexTracker, ConfigureTracker -from . import hookspec, iothreads -from .eventlogger import FFIEvent, FFIEventLogger +from . import hookspec +from .eventlogger import FFIEventLogger, CallbackThread + class MissingCredentials(ValueError): """ Account is missing `addr` and `mail_pw` config values. """ @@ -52,8 +51,6 @@ class Account(object): hook = hookspec.Global._get_plugin_manager().hook - self._threads = iothreads.IOThreads(self) - self._in_use_iter_events = False self._shutdown_event = Event() # open database @@ -62,7 +59,7 @@ class Account(object): db_path = db_path.encode("utf8") if not lib.dc_open(self._dc_context, db_path, ffi.NULL): raise ValueError("Could not dc_open: {}".format(db_path)) - self._threads.start() + self._cb_thread = CallbackThread(self) self._configkeys = self.get_config("sys.config_keys").split() atexit.register(self.shutdown) hook.dc_account_init(account=self) @@ -465,8 +462,8 @@ class Account(object): If sending out was unsuccessful, a RuntimeError is raised. """ self.check_is_configured() - if not self._threads.is_started() or not self.is_started(): - raise RuntimeError("threads not running, can not send out") + if not self._cb_thread.is_alive() or not self.is_started(): + raise RuntimeError("IO not running, can not send out") res = lib.dc_initiate_key_transfer(self._dc_context) if res == ffi.NULL: raise RuntimeError("could not send out autocrypt setup message") @@ -591,6 +588,7 @@ class Account(object): def stop_scheduler(self): """ stop core scheduler if it is running. """ self.ac_log_line("context_shutdown (stop core scheduler)") + self.stop_ongoing() lib.dc_context_shutdown(self._dc_context) def shutdown(self, wait=True): @@ -600,90 +598,23 @@ class Account(object): if dc_context is None: return - self.stop_ongoing() - if self._threads.is_started(): + if self._cb_thread.is_alive(): self.ac_log_line("stop threads") - self._threads.stop(wait=False) + self._cb_thread.stop(wait=False) self.stop_scheduler() self.ac_log_line("dc_close") lib.dc_close(dc_context) self.ac_log_line("wait threads for real") - self._threads.stop(wait=wait) # to wait for threads + if wait: + self._cb_thread.stop(wait=wait) self._dc_context = None atexit.unregister(self.shutdown) self._shutdown_event.set() hook = hookspec.Global._get_plugin_manager().hook hook.dc_account_after_shutdown(account=self, dc_context=dc_context) - def iter_events(self, timeout=None): - """ yield hook events until shutdown. - - It is not allowed to call iter_events() from multiple threads. - """ - if self._in_use_iter_events: - raise RuntimeError("can only call iter_events() from one thread") - self._in_use_iter_events = True - while lib.dc_is_open(self._dc_context): - self.ac_log_line("waiting for event") - event = lib.dc_get_next_event(self._dc_context) - if event == ffi.NULL: - break - self.ac_log_line("got event {}".format(event)) - - evt = lib.dc_event_get_id(event) - data1 = lib.dc_event_get_data1(event) - data2 = lib.dc_event_get_data2(event) - # the following code relates to the deltachat/_build.py's helper - # function which provides us signature info of an event call - evt_name = deltachat.get_dc_event_name(evt) - event_sig_types = lib.dc_get_event_signature_types(evt) - if data1 and event_sig_types & 1: - data1 = ffi.string(ffi.gc(ffi.cast('char*', data1), lib.dc_str_unref)).decode("utf8") - if data2 and event_sig_types & 2: - data2 = ffi.string(ffi.gc(ffi.cast('char*', data2), lib.dc_str_unref)).decode("utf8") - try: - if isinstance(data2, bytes): - data2 = data2.decode("utf8") - except UnicodeDecodeError: - # XXX ignoring the decode error is not quite correct but for now - # i don't want to hunt down encoding problems in the c lib - pass - - lib.dc_event_unref(event) - ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2) - self._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event) - for name, kwargs in self._map_ffi_event(ffi_event): - yield HookEvent(self, name=name, kwargs=kwargs) - - def _map_ffi_event(self, ffi_event): - name = ffi_event.name - if name == "DC_EVENT_CONFIGURE_PROGRESS": - data1 = ffi_event.data1 - if data1 == 0 or data1 == 1000: - success = data1 == 1000 - yield "ac_configure_completed", dict(success=success) - elif name == "DC_EVENT_INCOMING_MSG": - msg = self.get_message_by_id(ffi_event.data2) - yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg)) - elif name == "DC_EVENT_MSGS_CHANGED": - if ffi_event.data2 != 0: - msg = self.get_message_by_id(ffi_event.data2) - if msg.is_outgoing(): - res = map_system_message(msg) - if res and res[0].startswith("ac_member"): - yield res - yield "ac_outgoing_message", dict(message=msg) - elif msg.is_in_fresh(): - yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg)) - elif name == "DC_EVENT_MSG_DELIVERED": - msg = self.get_message_by_id(ffi_event.data2) - yield "ac_message_delivered", dict(message=msg) - elif name == "DC_EVENT_CHAT_MODIFIED": - chat = self.get_chat_by_id(ffi_event.data1) - yield "ac_chat_modified", dict(chat=chat) - def _destroy_dc_context(dc_context, dc_context_unref=lib.dc_context_unref): # destructor for dc_context @@ -703,17 +634,3 @@ class ScannedQRCode: @property def contact_id(self): return self._dc_lot.id() - - -class HookEvent: - def __init__(self, account, name, kwargs): - assert hasattr(account._pm.hook, name), name - self.account = account - self.name = name - self.kwargs = kwargs - - def call_hook(self): - hook = getattr(self.account._pm.hook, self.name, None) - if hook is None: - raise ValueError("event_name {} unknown".format(self.name)) - return hook(**self.kwargs) diff --git a/python/src/deltachat/eventlogger.py b/python/src/deltachat/eventlogger.py index e225843e6..4a7f7e542 100644 --- a/python/src/deltachat/eventlogger.py +++ b/python/src/deltachat/eventlogger.py @@ -1,18 +1,13 @@ -import deltachat import threading import time import re from queue import Queue, Empty -from .hookspec import account_hookimpl, global_hookimpl - - -# @global_hookimpl -# def dc_account_init(account): - # account._threads.start() - -# @global_hookimpl -# def dc_account_after_shutdown(dc_context): +import deltachat +from .hookspec import account_hookimpl +from contextlib import contextmanager +from .capi import ffi, lib +from .message import map_system_message class FFIEvent: @@ -127,3 +122,102 @@ class FFIEventTracker: ev = self.get_matching("DC_EVENT_MSGS_CHANGED") if ev.data2 > 0: return self.account.get_message_by_id(ev.data2) + + +class CallbackThread(threading.Thread): + """ Callback Thread for an account. + + With each Account init this callback thread is started. + """ + def __init__(self, account): + self.account = account + self._dc_context = account._dc_context + self._thread_quitflag = False + super(CallbackThread, self).__init__(name="callback") + self.start() + + @contextmanager + def log_execution(self, message): + self.account.ac_log_line(message + " START") + yield + self.account.ac_log_line(message + " FINISHED") + + def stop(self, wait=False): + self._thread_quitflag = True + + if wait: + self.join() + + def run(self): + """ get and run events until shutdown. """ + with self.log_execution("CALLBACK THREAD START"): + self._inner_run() + + def _inner_run(self): + while lib.dc_is_open(self._dc_context) and not self._thread_quitflag: + self.account.ac_log_line("waiting for event") + event = lib.dc_get_next_event(self._dc_context) + if event == ffi.NULL: + break + self.account.ac_log_line("got event {}".format(event)) + + evt = lib.dc_event_get_id(event) + data1 = lib.dc_event_get_data1(event) + data2 = lib.dc_event_get_data2(event) + # the following code relates to the deltachat/_build.py's helper + # function which provides us signature info of an event call + evt_name = deltachat.get_dc_event_name(evt) + event_sig_types = lib.dc_get_event_signature_types(evt) + if data1 and event_sig_types & 1: + data1 = ffi.string(ffi.gc(ffi.cast('char*', data1), lib.dc_str_unref)).decode("utf8") + if data2 and event_sig_types & 2: + data2 = ffi.string(ffi.gc(ffi.cast('char*', data2), lib.dc_str_unref)).decode("utf8") + try: + if isinstance(data2, bytes): + data2 = data2.decode("utf8") + except UnicodeDecodeError: + # XXX ignoring the decode error is not quite correct but for now + # i don't want to hunt down encoding problems in the c lib + pass + + lib.dc_event_unref(event) + ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2) + self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event) + for name, kwargs in self._map_ffi_event(ffi_event): + self.account.ac_log_line("calling hook name={} kwargs={}".format(name, kwargs)) + hook = getattr(self.account._pm.hook, name) + try: + hook(**kwargs) + except Exception: + # don't bother logging this error + # if dc_close() was concurrently called + # (note: core API starts failing after that) + if not self._thread_quitflag: + raise + + def _map_ffi_event(self, ffi_event): + name = ffi_event.name + if name == "DC_EVENT_CONFIGURE_PROGRESS": + data1 = ffi_event.data1 + if data1 == 0 or data1 == 1000: + success = data1 == 1000 + yield "ac_configure_completed", dict(success=success) + elif name == "DC_EVENT_INCOMING_MSG": + msg = self.get_message_by_id(ffi_event.data2) + yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg)) + elif name == "DC_EVENT_MSGS_CHANGED": + if ffi_event.data2 != 0: + msg = self.account.get_message_by_id(ffi_event.data2) + if msg.is_outgoing(): + res = map_system_message(msg) + if res and res[0].startswith("ac_member"): + yield res + yield "ac_outgoing_message", dict(message=msg) + elif msg.is_in_fresh(): + yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg)) + elif name == "DC_EVENT_MSG_DELIVERED": + msg = self.account.get_message_by_id(ffi_event.data2) + yield "ac_message_delivered", dict(message=msg) + elif name == "DC_EVENT_CHAT_MODIFIED": + chat = self.account.get_chat_by_id(ffi_event.data1) + yield "ac_chat_modified", dict(chat=chat) diff --git a/python/src/deltachat/iothreads.py b/python/src/deltachat/iothreads.py deleted file mode 100644 index 62792b133..000000000 --- a/python/src/deltachat/iothreads.py +++ /dev/null @@ -1,64 +0,0 @@ - -import threading -import time - -from contextlib import contextmanager - -from .capi import ffi, lib -import deltachat -from .eventlogger import FFIEvent - -class IOThreads: - def __init__(self, account): - self.account = account - self._dc_context = account._dc_context - self._thread_quitflag = False - self._name2thread = {} - - def is_started(self): - return len(self._name2thread) > 0 - - def start(self): - assert not self.is_started() - - self._start_one_thread("cb", self.cb_thread_run) - - def _start_one_thread(self, name, func): - self._name2thread[name] = t = threading.Thread(target=func, name=name) - t.setDaemon(1) - t.start() - - @contextmanager - def log_execution(self, message): - self.account.ac_log_line(message + " START") - yield - self.account.ac_log_line(message + " FINISHED") - - def stop(self, wait=False): - self._thread_quitflag = True - - if wait: - for name, thread in self._name2thread.items(): - if thread != threading.currentThread(): - thread.join() - - def cb_thread_run(self): - with self.log_execution("CALLBACK THREAD START"): - it = self.account.iter_events() - while not self._thread_quitflag: - try: - ev = next(it) - except StopIteration: - break - self.account.ac_log_line("calling hook name={} kwargs={}".format(ev.name, ev.kwargs)) - try: - ev.call_hook() - except Exception: - # don't bother logging this error - # because dc_close() was concurrently called - # and core API starts failing after that. - if not self._thread_quitflag: - raise - - - diff --git a/python/tests/conftest.py b/python/tests/conftest.py index 109085739..6cc65dc14 100644 --- a/python/tests/conftest.py +++ b/python/tests/conftest.py @@ -1,5 +1,6 @@ from __future__ import print_function + def wait_configuration_progress(account, min_target, max_target=1001, check_error=True): min_target = min(min_target, max_target) while 1: diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 3387163bb..9855085d2 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -1686,4 +1686,3 @@ class TestOnlineConfigureFails: ev = ac1._evtracker.get_matching("DC_EVENT_ERROR_NETWORK") assert "could not connect" in ev.data2.lower() wait_configuration_progress(ac1, 0, 0) - diff --git a/python/tests/test_lowlevel.py b/python/tests/test_lowlevel.py index 9161f4dc1..247f265f9 100644 --- a/python/tests/test_lowlevel.py +++ b/python/tests/test_lowlevel.py @@ -1,7 +1,6 @@ from __future__ import print_function -import threading -import time +from queue import Queue from deltachat import capi, cutil, const from deltachat import register_global_plugin from deltachat.hookspec import global_hookimpl @@ -10,34 +9,6 @@ from deltachat.capi import lib # from deltachat.account import EventLogger -class EventThread(threading.Thread): - def __init__(self, dc_context): - self.dc_context = dc_context - super(EventThread, self).__init__() - self.setDaemon(1) - self._running = True - - def run(self): - lib.dc_context_run(self.dc_context) - while self._running: - if lib.dc_has_next_event(self.dc_context): - event = lib.dc_get_next_event(self.dc_context) - if event != ffi.NULL: - py_dc_callback( - self._dc_context, - lib.dc_event_get_id(event), - lib.dc_event_get_data1(event), - lib.dc_event_get_data2(event) - ) - lib.dc_event_unref(event) - else: - time.sleep(0.05) - - def stop(self): - lib.dc_context_shutdown(self.dc_context) - self._running = False - - def test_empty_context(): ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL) capi.lib.dc_close(ctx) @@ -47,33 +18,17 @@ def test_dc_close_events(tmpdir, acfactory): ac1 = acfactory.get_unconfigured_account() # register after_shutdown function - shutdowns = [] + shutdowns = Queue() class ShutdownPlugin: @global_hookimpl def dc_account_after_shutdown(self, account): assert account._dc_context is None - shutdowns.append(account) + shutdowns.put(account) register_global_plugin(ShutdownPlugin()) assert hasattr(ac1, "_dc_context") ac1.shutdown() - assert shutdowns == [ac1] - - def find(info_string): - evlog = ac1._evtracker - while 1: - ev = evlog.get_matching("DC_EVENT_INFO", check_error=False) - data2 = ev.data2 - if info_string in data2: - return - else: - print("skipping event", ev) - - find("disconnecting inbox-thread") - find("disconnecting sentbox-thread") - find("disconnecting mvbox-thread") - find("disconnecting SMTP") - find("Database closed") + shutdowns.get(timeout=2) def test_wrong_db(tmpdir):