refactor docs and ffi/high level event handling to pass all tests again

This commit is contained in:
holger krekel
2020-03-03 13:11:25 +01:00
parent a1379f61da
commit 91cdc76414
9 changed files with 161 additions and 72 deletions

View File

@@ -15,6 +15,7 @@ from .message import Message
from .contact import Contact
from .tracker import ImexTracker
from . import hookspec, iothreads
from queue import Queue
class MissingCredentials(ValueError):
@@ -48,6 +49,9 @@ class Account(object):
hook.account_init(account=self, db_path=db_path)
self._threads = iothreads.IOThreads(self)
self._hook_event_queue = Queue()
self._in_use_iter_events = False
self._shutdown_event = Event()
# open database
if hasattr(db_path, "encode"):
@@ -56,30 +60,13 @@ class Account(object):
raise ValueError("Could not dc_open: {}".format(db_path))
self._configkeys = self.get_config("sys.config_keys").split()
atexit.register(self.shutdown)
self._shutdown_event = Event()
@hookspec.account_hookimpl
def process_ffi_event(self, ffi_event):
name = ffi_event.name
if name == "DC_EVENT_CONFIGURE_PROGRESS":
data1 = ffi_event.data1
if data1 == 0 or data1 == 1000:
success = data1 == 1000
self._pm.hook.configure_completed(success=success)
elif name == "DC_EVENT_INCOMING_MSG":
msg = self.get_message_by_id(ffi_event.data2)
self._pm.hook.process_incoming_message(message=msg)
elif name == "DC_EVENT_MSGS_CHANGED":
if ffi_event.data2 != 0:
msg = self.get_message_by_id(ffi_event.data2)
self._pm.hook.process_incoming_message(message=msg)
elif name == "DC_EVENT_MSG_DELIVERED":
msg = self.get_message_by_id(ffi_event.data2)
self._pm.hook.process_message_delivered(message=msg)
elif name == "DC_EVENT_MEMBER_ADDED":
chat = self.get_chat_by_id(ffi_event.data1)
contact = self.get_contact_by_id(ffi_event.data2)
self._pm.hook.member_added(chat=chat, contact=contact)
name, kwargs = self._map_ffi_event(ffi_event)
if name is not None:
ev = HookEvent(self, name=name, kwargs=kwargs)
self._hook_event_queue.put(ev)
# def __del__(self):
# self.shutdown()
@@ -547,7 +534,7 @@ class Account(object):
""" Stop ongoing securejoin, configuration or other core jobs. """
lib.dc_stop_ongoing_process(self._dc_context)
def start(self):
def start(self, callback_thread=True):
""" start this account (activate imap/smtp threads etc.)
and return immediately.
@@ -565,7 +552,7 @@ class Account(object):
if not self.get_config("addr") or not self.get_config("mail_pw"):
raise MissingCredentials("addr or mail_pwd not set in config")
lib.dc_configure(self._dc_context)
self._threads.start()
self._threads.start(callback_thread=callback_thread)
def wait_shutdown(self):
""" wait until shutdown of this account has completed. """
@@ -582,12 +569,51 @@ class Account(object):
self.stop_ongoing()
self._threads.stop(wait=False)
lib.dc_close(dc_context)
self._hook_event_queue.put(None)
self._threads.stop(wait=wait) # to wait for threads
self._dc_context = None
atexit.unregister(self.shutdown)
self._shutdown_event.set()
hook = hookspec.Global._get_plugin_manager().hook
hook.account_after_shutdown(account=self, dc_context=dc_context)
self._shutdown_event.set()
def iter_events(self, timeout=None):
""" yield hook events until shutdown.
It is not allowed to call iter_events() from multiple threads.
"""
if self._in_use_iter_events:
raise RuntimeError("can only call iter_events() from one thread")
self._in_use_iter_events = True
while 1:
event = self._hook_event_queue.get(timeout=timeout)
if event is None:
break
yield event
def _map_ffi_event(self, ffi_event):
name = ffi_event.name
if name == "DC_EVENT_CONFIGURE_PROGRESS":
data1 = ffi_event.data1
if data1 == 0 or data1 == 1000:
success = data1 == 1000
return "configure_completed", dict(success=success)
elif name == "DC_EVENT_INCOMING_MSG":
msg = self.get_message_by_id(ffi_event.data2)
return "process_incoming_message", dict(message=msg)
elif name == "DC_EVENT_MSGS_CHANGED":
if ffi_event.data2 != 0:
msg = self.get_message_by_id(ffi_event.data2)
if msg.is_in_fresh():
return "process_incoming_message", dict(message=msg)
elif name == "DC_EVENT_MSG_DELIVERED":
msg = self.get_message_by_id(ffi_event.data2)
return "process_message_delivered", dict(message=msg)
elif name == "DC_EVENT_MEMBER_ADDED":
chat = self.get_chat_by_id(ffi_event.data1)
contact = self.get_contact_by_id(ffi_event.data2)
return "member_added", dict(chat=chat, contact=contact)
return None, {}
def _destroy_dc_context(dc_context, dc_context_unref=lib.dc_context_unref):
@@ -614,3 +640,17 @@ class ScannedQRCode:
@property
def contact_id(self):
return self._dc_lot.id()
class HookEvent:
def __init__(self, account, name, kwargs):
assert hasattr(account._pm.hook, name), name
self.account = account
self.name = name
self.kwargs = kwargs
def call_hook(self):
hook = getattr(self.account._pm.hook, self.name, None)
if hook is None:
raise ValueError("event_name {} unknown".format(self.name))
return hook(**self.kwargs)

View File

@@ -15,7 +15,8 @@ global_hookimpl = pluggy.HookimplMarker(_global_name)
class PerAccount:
""" per-Account-instance hook specifications.
If you write a plugin you need to implement one of the following hooks.
Except for process_ffi_event all hooks are executed
in the thread which calls Account.wait_shutdown().
"""
@classmethod
def _make_plugin_manager(cls):
@@ -29,6 +30,10 @@ class PerAccount:
ffi_event has "name", "data1", "data2" values as specified
with `DC_EVENT_* <https://c.delta.chat/group__DC__EVENT.html>`_.
DANGER: this hook is executed from the callback invoked by core.
Hook implementations need to be short running and can typically
not call back into core because this would easily cause recursion issues.
"""
@account_hookspec

View File

@@ -17,13 +17,17 @@ class IOThreads:
def is_started(self):
return len(self._name2thread) > 0
def start(self):
def start(self, callback_thread):
assert not self.is_started()
self._start_one_thread("inbox", self.imap_thread_run)
self._start_one_thread("smtp", self.smtp_thread_run)
if callback_thread:
self._start_one_thread("cb", self.cb_thread_run)
if int(self.account.get_config("mvbox_watch")):
self._start_one_thread("mvbox", self.mvbox_thread_run)
if int(self.account.get_config("sentbox_watch")):
self._start_one_thread("sentbox", self.sentbox_thread_run)
@@ -53,7 +57,18 @@ class IOThreads:
lib.dc_interrupt_sentbox_idle(self._dc_context)
if wait:
for name, thread in self._name2thread.items():
thread.join()
if thread != threading.currentThread():
thread.join()
def cb_thread_run(self):
with self.log_execution("CALLBACK THREAD START"):
it = self.account.iter_events()
while not self._thread_quitflag:
try:
ev = next(it)
except StopIteration:
break
ev.call_hook()
def imap_thread_run(self):
with self.log_execution("INBOX THREAD START"):

View File

@@ -3,12 +3,10 @@ import os
import sys
import pytest
import requests
from contextlib import contextmanager
import time
from . import Account, const
from .tracker import ConfigureTracker
from .capi import lib
from .hookspec import account_hookimpl
from .eventlogger import FFIEventLogger, FFIEventTracker
from _pytest.monkeypatch import MonkeyPatch
import tempfile
@@ -63,9 +61,9 @@ def pytest_report_header(config, startdir):
cfg = config.option.liveconfig
if cfg:
if "#" in cfg:
url, token = cfg.split("#", 1)
summary.append('Liveconfig provider: {}#<token ommitted>'.format(url))
if "?" in cfg:
url, token = cfg.split("?", 1)
summary.append('Liveconfig provider: {}?<token ommitted>'.format(url))
else:
summary.append('Liveconfig file: {}'.format(cfg))
return summary
@@ -263,26 +261,3 @@ def lp():
def step(self, msg):
print("-" * 5, "step " + msg, "-" * 5)
return Printer()
@pytest.fixture
def make_plugin_recorder():
@contextmanager
def make_plugin_recorder(account):
class HookImpl:
def __init__(self):
self.calls_member_added = []
@account_hookimpl
def member_added(self, chat, contact):
self.calls_member_added.append(dict(chat=chat, contact=contact))
def get_first(self, name):
val = getattr(self, "calls_" + name, None)
if val is not None:
return val.pop(0)
with account.temp_plugin(HookImpl()) as plugin:
yield plugin
return make_plugin_recorder