refactor callback thread handling

This commit is contained in:
holger krekel
2020-05-20 16:32:12 +02:00
parent b91d7f314b
commit f67c86cb39
7 changed files with 123 additions and 221 deletions

View File

@@ -1,7 +1,7 @@
import sys
from . import capi, const, hookspec
from .capi import ffi
from . import capi, const, hookspec # noqa
from .capi import ffi # noqa
from .account import Account # noqa
from .message import Message # noqa
from .contact import Contact # noqa

View File

@@ -4,20 +4,19 @@ from __future__ import print_function
import atexit
from contextlib import contextmanager
from email.utils import parseaddr
import queue
from threading import Event
import os
from array import array
import deltachat
from . import const
from .capi import ffi, lib
from .cutil import as_dc_charpointer, from_dc_charpointer, iter_array, DCLot
from .chat import Chat
from .message import Message, map_system_message
from .message import Message
from .contact import Contact
from .tracker import ImexTracker, ConfigureTracker
from . import hookspec, iothreads
from .eventlogger import FFIEvent, FFIEventLogger
from . import hookspec
from .eventlogger import FFIEventLogger, CallbackThread
class MissingCredentials(ValueError):
""" Account is missing `addr` and `mail_pw` config values. """
@@ -52,8 +51,6 @@ class Account(object):
hook = hookspec.Global._get_plugin_manager().hook
self._threads = iothreads.IOThreads(self)
self._in_use_iter_events = False
self._shutdown_event = Event()
# open database
@@ -62,7 +59,7 @@ class Account(object):
db_path = db_path.encode("utf8")
if not lib.dc_open(self._dc_context, db_path, ffi.NULL):
raise ValueError("Could not dc_open: {}".format(db_path))
self._threads.start()
self._cb_thread = CallbackThread(self)
self._configkeys = self.get_config("sys.config_keys").split()
atexit.register(self.shutdown)
hook.dc_account_init(account=self)
@@ -465,8 +462,8 @@ class Account(object):
If sending out was unsuccessful, a RuntimeError is raised.
"""
self.check_is_configured()
if not self._threads.is_started() or not self.is_started():
raise RuntimeError("threads not running, can not send out")
if not self._cb_thread.is_alive() or not self.is_started():
raise RuntimeError("IO not running, can not send out")
res = lib.dc_initiate_key_transfer(self._dc_context)
if res == ffi.NULL:
raise RuntimeError("could not send out autocrypt setup message")
@@ -591,6 +588,7 @@ class Account(object):
def stop_scheduler(self):
""" stop core scheduler if it is running. """
self.ac_log_line("context_shutdown (stop core scheduler)")
self.stop_ongoing()
lib.dc_context_shutdown(self._dc_context)
def shutdown(self, wait=True):
@@ -600,90 +598,23 @@ class Account(object):
if dc_context is None:
return
self.stop_ongoing()
if self._threads.is_started():
if self._cb_thread.is_alive():
self.ac_log_line("stop threads")
self._threads.stop(wait=False)
self._cb_thread.stop(wait=False)
self.stop_scheduler()
self.ac_log_line("dc_close")
lib.dc_close(dc_context)
self.ac_log_line("wait threads for real")
self._threads.stop(wait=wait) # to wait for threads
if wait:
self._cb_thread.stop(wait=wait)
self._dc_context = None
atexit.unregister(self.shutdown)
self._shutdown_event.set()
hook = hookspec.Global._get_plugin_manager().hook
hook.dc_account_after_shutdown(account=self, dc_context=dc_context)
def iter_events(self, timeout=None):
""" yield hook events until shutdown.
It is not allowed to call iter_events() from multiple threads.
"""
if self._in_use_iter_events:
raise RuntimeError("can only call iter_events() from one thread")
self._in_use_iter_events = True
while lib.dc_is_open(self._dc_context):
self.ac_log_line("waiting for event")
event = lib.dc_get_next_event(self._dc_context)
if event == ffi.NULL:
break
self.ac_log_line("got event {}".format(event))
evt = lib.dc_event_get_id(event)
data1 = lib.dc_event_get_data1(event)
data2 = lib.dc_event_get_data2(event)
# the following code relates to the deltachat/_build.py's helper
# function which provides us signature info of an event call
evt_name = deltachat.get_dc_event_name(evt)
event_sig_types = lib.dc_get_event_signature_types(evt)
if data1 and event_sig_types & 1:
data1 = ffi.string(ffi.gc(ffi.cast('char*', data1), lib.dc_str_unref)).decode("utf8")
if data2 and event_sig_types & 2:
data2 = ffi.string(ffi.gc(ffi.cast('char*', data2), lib.dc_str_unref)).decode("utf8")
try:
if isinstance(data2, bytes):
data2 = data2.decode("utf8")
except UnicodeDecodeError:
# XXX ignoring the decode error is not quite correct but for now
# i don't want to hunt down encoding problems in the c lib
pass
lib.dc_event_unref(event)
ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
self._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
for name, kwargs in self._map_ffi_event(ffi_event):
yield HookEvent(self, name=name, kwargs=kwargs)
def _map_ffi_event(self, ffi_event):
name = ffi_event.name
if name == "DC_EVENT_CONFIGURE_PROGRESS":
data1 = ffi_event.data1
if data1 == 0 or data1 == 1000:
success = data1 == 1000
yield "ac_configure_completed", dict(success=success)
elif name == "DC_EVENT_INCOMING_MSG":
msg = self.get_message_by_id(ffi_event.data2)
yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg))
elif name == "DC_EVENT_MSGS_CHANGED":
if ffi_event.data2 != 0:
msg = self.get_message_by_id(ffi_event.data2)
if msg.is_outgoing():
res = map_system_message(msg)
if res and res[0].startswith("ac_member"):
yield res
yield "ac_outgoing_message", dict(message=msg)
elif msg.is_in_fresh():
yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg))
elif name == "DC_EVENT_MSG_DELIVERED":
msg = self.get_message_by_id(ffi_event.data2)
yield "ac_message_delivered", dict(message=msg)
elif name == "DC_EVENT_CHAT_MODIFIED":
chat = self.get_chat_by_id(ffi_event.data1)
yield "ac_chat_modified", dict(chat=chat)
def _destroy_dc_context(dc_context, dc_context_unref=lib.dc_context_unref):
# destructor for dc_context
@@ -703,17 +634,3 @@ class ScannedQRCode:
@property
def contact_id(self):
return self._dc_lot.id()
class HookEvent:
def __init__(self, account, name, kwargs):
assert hasattr(account._pm.hook, name), name
self.account = account
self.name = name
self.kwargs = kwargs
def call_hook(self):
hook = getattr(self.account._pm.hook, self.name, None)
if hook is None:
raise ValueError("event_name {} unknown".format(self.name))
return hook(**self.kwargs)

View File

@@ -1,18 +1,13 @@
import deltachat
import threading
import time
import re
from queue import Queue, Empty
from .hookspec import account_hookimpl, global_hookimpl
# @global_hookimpl
# def dc_account_init(account):
# account._threads.start()
# @global_hookimpl
# def dc_account_after_shutdown(dc_context):
import deltachat
from .hookspec import account_hookimpl
from contextlib import contextmanager
from .capi import ffi, lib
from .message import map_system_message
class FFIEvent:
@@ -127,3 +122,102 @@ class FFIEventTracker:
ev = self.get_matching("DC_EVENT_MSGS_CHANGED")
if ev.data2 > 0:
return self.account.get_message_by_id(ev.data2)
class CallbackThread(threading.Thread):
""" Callback Thread for an account.
With each Account init this callback thread is started.
"""
def __init__(self, account):
self.account = account
self._dc_context = account._dc_context
self._thread_quitflag = False
super(CallbackThread, self).__init__(name="callback")
self.start()
@contextmanager
def log_execution(self, message):
self.account.ac_log_line(message + " START")
yield
self.account.ac_log_line(message + " FINISHED")
def stop(self, wait=False):
self._thread_quitflag = True
if wait:
self.join()
def run(self):
""" get and run events until shutdown. """
with self.log_execution("CALLBACK THREAD START"):
self._inner_run()
def _inner_run(self):
while lib.dc_is_open(self._dc_context) and not self._thread_quitflag:
self.account.ac_log_line("waiting for event")
event = lib.dc_get_next_event(self._dc_context)
if event == ffi.NULL:
break
self.account.ac_log_line("got event {}".format(event))
evt = lib.dc_event_get_id(event)
data1 = lib.dc_event_get_data1(event)
data2 = lib.dc_event_get_data2(event)
# the following code relates to the deltachat/_build.py's helper
# function which provides us signature info of an event call
evt_name = deltachat.get_dc_event_name(evt)
event_sig_types = lib.dc_get_event_signature_types(evt)
if data1 and event_sig_types & 1:
data1 = ffi.string(ffi.gc(ffi.cast('char*', data1), lib.dc_str_unref)).decode("utf8")
if data2 and event_sig_types & 2:
data2 = ffi.string(ffi.gc(ffi.cast('char*', data2), lib.dc_str_unref)).decode("utf8")
try:
if isinstance(data2, bytes):
data2 = data2.decode("utf8")
except UnicodeDecodeError:
# XXX ignoring the decode error is not quite correct but for now
# i don't want to hunt down encoding problems in the c lib
pass
lib.dc_event_unref(event)
ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
for name, kwargs in self._map_ffi_event(ffi_event):
self.account.ac_log_line("calling hook name={} kwargs={}".format(name, kwargs))
hook = getattr(self.account._pm.hook, name)
try:
hook(**kwargs)
except Exception:
# don't bother logging this error
# if dc_close() was concurrently called
# (note: core API starts failing after that)
if not self._thread_quitflag:
raise
def _map_ffi_event(self, ffi_event):
name = ffi_event.name
if name == "DC_EVENT_CONFIGURE_PROGRESS":
data1 = ffi_event.data1
if data1 == 0 or data1 == 1000:
success = data1 == 1000
yield "ac_configure_completed", dict(success=success)
elif name == "DC_EVENT_INCOMING_MSG":
msg = self.get_message_by_id(ffi_event.data2)
yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg))
elif name == "DC_EVENT_MSGS_CHANGED":
if ffi_event.data2 != 0:
msg = self.account.get_message_by_id(ffi_event.data2)
if msg.is_outgoing():
res = map_system_message(msg)
if res and res[0].startswith("ac_member"):
yield res
yield "ac_outgoing_message", dict(message=msg)
elif msg.is_in_fresh():
yield map_system_message(msg) or ("ac_incoming_message", dict(message=msg))
elif name == "DC_EVENT_MSG_DELIVERED":
msg = self.account.get_message_by_id(ffi_event.data2)
yield "ac_message_delivered", dict(message=msg)
elif name == "DC_EVENT_CHAT_MODIFIED":
chat = self.account.get_chat_by_id(ffi_event.data1)
yield "ac_chat_modified", dict(chat=chat)

View File

@@ -1,64 +0,0 @@
import threading
import time
from contextlib import contextmanager
from .capi import ffi, lib
import deltachat
from .eventlogger import FFIEvent
class IOThreads:
def __init__(self, account):
self.account = account
self._dc_context = account._dc_context
self._thread_quitflag = False
self._name2thread = {}
def is_started(self):
return len(self._name2thread) > 0
def start(self):
assert not self.is_started()
self._start_one_thread("cb", self.cb_thread_run)
def _start_one_thread(self, name, func):
self._name2thread[name] = t = threading.Thread(target=func, name=name)
t.setDaemon(1)
t.start()
@contextmanager
def log_execution(self, message):
self.account.ac_log_line(message + " START")
yield
self.account.ac_log_line(message + " FINISHED")
def stop(self, wait=False):
self._thread_quitflag = True
if wait:
for name, thread in self._name2thread.items():
if thread != threading.currentThread():
thread.join()
def cb_thread_run(self):
with self.log_execution("CALLBACK THREAD START"):
it = self.account.iter_events()
while not self._thread_quitflag:
try:
ev = next(it)
except StopIteration:
break
self.account.ac_log_line("calling hook name={} kwargs={}".format(ev.name, ev.kwargs))
try:
ev.call_hook()
except Exception:
# don't bother logging this error
# because dc_close() was concurrently called
# and core API starts failing after that.
if not self._thread_quitflag:
raise

View File

@@ -1,5 +1,6 @@
from __future__ import print_function
def wait_configuration_progress(account, min_target, max_target=1001, check_error=True):
min_target = min(min_target, max_target)
while 1:

View File

@@ -1686,4 +1686,3 @@ class TestOnlineConfigureFails:
ev = ac1._evtracker.get_matching("DC_EVENT_ERROR_NETWORK")
assert "could not connect" in ev.data2.lower()
wait_configuration_progress(ac1, 0, 0)

View File

@@ -1,7 +1,6 @@
from __future__ import print_function
import threading
import time
from queue import Queue
from deltachat import capi, cutil, const
from deltachat import register_global_plugin
from deltachat.hookspec import global_hookimpl
@@ -10,34 +9,6 @@ from deltachat.capi import lib
# from deltachat.account import EventLogger
class EventThread(threading.Thread):
def __init__(self, dc_context):
self.dc_context = dc_context
super(EventThread, self).__init__()
self.setDaemon(1)
self._running = True
def run(self):
lib.dc_context_run(self.dc_context)
while self._running:
if lib.dc_has_next_event(self.dc_context):
event = lib.dc_get_next_event(self.dc_context)
if event != ffi.NULL:
py_dc_callback(
self._dc_context,
lib.dc_event_get_id(event),
lib.dc_event_get_data1(event),
lib.dc_event_get_data2(event)
)
lib.dc_event_unref(event)
else:
time.sleep(0.05)
def stop(self):
lib.dc_context_shutdown(self.dc_context)
self._running = False
def test_empty_context():
ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL)
capi.lib.dc_close(ctx)
@@ -47,33 +18,17 @@ def test_dc_close_events(tmpdir, acfactory):
ac1 = acfactory.get_unconfigured_account()
# register after_shutdown function
shutdowns = []
shutdowns = Queue()
class ShutdownPlugin:
@global_hookimpl
def dc_account_after_shutdown(self, account):
assert account._dc_context is None
shutdowns.append(account)
shutdowns.put(account)
register_global_plugin(ShutdownPlugin())
assert hasattr(ac1, "_dc_context")
ac1.shutdown()
assert shutdowns == [ac1]
def find(info_string):
evlog = ac1._evtracker
while 1:
ev = evlog.get_matching("DC_EVENT_INFO", check_error=False)
data2 = ev.data2
if info_string in data2:
return
else:
print("skipping event", ev)
find("disconnecting inbox-thread")
find("disconnecting sentbox-thread")
find("disconnecting mvbox-thread")
find("disconnecting SMTP")
find("Database closed")
shutdowns.get(timeout=2)
def test_wrong_db(tmpdir):