mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 15:02:11 +03:00
Also cleaned up test_connectivity()
which tested that state does not flicker to WORKING
when there are no messages to be fetched.
The state is expected to flicker to WORKING
when checking for new messages,
so the tests were outdated since
change 3b0b2379b8
334 lines
12 KiB
Python
334 lines
12 KiB
Python
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from contextlib import contextmanager
|
|
from queue import Empty, Queue
|
|
|
|
from . import const
|
|
from .account import Account
|
|
from .capi import ffi, lib
|
|
from .cutil import from_optional_dc_charpointer
|
|
from .hookspec import account_hookimpl
|
|
from .message import map_system_message
|
|
|
|
|
|
def get_dc_event_name(integer, _DC_EVENTNAME_MAP={}):
    """Return the ``DC_EVENT_*`` constant name whose value is ``integer``.

    The default ``_DC_EVENTNAME_MAP`` dict is intentionally shared between
    calls: whenever it is found empty it is filled from the ``const`` module
    and serves as a lookup cache afterwards.

    Raises ``KeyError`` if the map has no entry for ``integer``.
    """
    if not _DC_EVENTNAME_MAP:
        # Build the value -> name table in one pass over the const module.
        event_attrs = (attr for attr in dir(const) if attr.startswith("DC_EVENT_"))
        _DC_EVENTNAME_MAP.update((getattr(const, attr), attr) for attr in event_attrs)
    return _DC_EVENTNAME_MAP[integer]
|
|
|
|
|
|
class FFIEvent:
    """Plain Python record of one ffi event.

    ``name`` is the event name (e.g. ``"DC_EVENT_INFO"``); ``data1`` and
    ``data2`` are the two event payloads, stored exactly as given.
    """

    def __init__(self, name: str, data1, data2) -> None:
        self.name, self.data1, self.data2 = name, data1, data2

    def __str__(self) -> str:
        """Render info/warning/error events as ``"<LABEL> <data2>"``;
        every other event as ``"<name> data1=<data1> data2=<data2>"``.
        """
        short_forms = (
            ("DC_EVENT_INFO", "INFO"),
            ("DC_EVENT_WARNING", "WARNING"),
            ("DC_EVENT_ERROR", "ERROR"),
        )
        for event_name, label in short_forms:
            if self.name == event_name:
                return f"{label} {self.data2}"
        return f"{self.name} data1={self.data1} data2={self.data2}"
|
|
|
|
|
|
class FFIEventLogger:
    """If you register an instance of this logger with an Account
    you'll get all ffi-events printed.
    """

    # to prevent garbled logging
    _loglock = threading.RLock()

    def __init__(self, account, logid=None, init_time=None) -> None:
        """Create a logger for ``account``.

        :param account: the account whose events are logged.
        :param logid: label appended to the thread name in each log line;
            falls back to the account's "displayname" config value.
        :param init_time: reference timestamp (seconds, as from
            ``time.time()``) for the elapsed-time column; defaults to now.
        """
        self.account = account
        self.logid = logid or self.account.get_config("displayname")
        if init_time is None:
            init_time = time.time()
        self.init_time = init_time

    @account_hookimpl
    def ac_process_ffi_event(self, ffi_event: FFIEvent) -> None:
        """Forward the string form of every ffi event to ``account.log``."""
        self.account.log(str(ffi_event))

    @account_hookimpl
    def ac_log_line(self, message):
        """Print ``message`` prefixed with elapsed time and thread/log id.

        On posix, warning and error lines are wrapped in ANSI color codes.
        """
        t = threading.current_thread()
        tname = getattr(t, "name", t)
        if tname == "MainThread":
            tname = "MAIN"
        elapsed = time.time() - self.init_time
        locname = tname
        if self.logid:
            locname += "-" + self.logid
        s = f"{elapsed:2.2f} [{locname}] {message}"

        if os.name == "posix":
            WARN = "\033[93m"
            ERROR = "\033[91m"
            ENDC = "\033[0m"
            # FFIEvent.__str__ renders warning/error events as "WARNING ..." /
            # "ERROR ...", so accept those short prefixes in addition to the
            # raw event names; otherwise the coloring never triggers for them.
            if message.startswith(("DC_EVENT_WARNING", "WARNING")):
                s = WARN + s + ENDC
            elif message.startswith(("DC_EVENT_ERROR", "ERROR")):
                s = ERROR + s + ENDC
        with self._loglock:
            print(s, flush=True)
|
|
|
|
|
|
class FFIEventTracker:
    """Collects ffi events in a queue and offers blocking helpers that
    consume the queue until a wanted event shows up.

    Events consumed while searching for a match are dropped.
    """

    account: Account
    _event_queue: Queue

    def __init__(self, account: Account, timeout=None) -> None:
        """Track events of ``account``; ``timeout`` is the default number of
        seconds ``get()`` waits (``None`` waits without limit).
        """
        self._event_queue = Queue()
        self._timeout = timeout
        self.account = account

    @account_hookimpl
    def ac_process_ffi_event(self, ffi_event: FFIEvent):
        """Hook implementation: enqueue every ffi event."""
        self._event_queue.put(ffi_event)

    def set_timeout(self, timeout):
        """Replace the default timeout used by ``get()``."""
        self._timeout = timeout

    def consume_events(self, check_error=True):
        """Pull events via ``get()`` until the queue reports empty."""
        while not self._event_queue.empty():
            self.get(check_error=check_error)

    def get(self, timeout=None, check_error=True):
        """Return the next queued event.

        Uses the tracker's default timeout when ``timeout`` is None.
        Raises ``ValueError`` for a DC_EVENT_ERROR event if ``check_error``
        is true; the queue raises ``Empty`` when the timeout expires.
        """
        if timeout is None:
            timeout = self._timeout
        event = self._event_queue.get(timeout=timeout)
        if check_error and event.name == "DC_EVENT_ERROR":
            raise ValueError(f"unexpected event: {event}")
        return event

    def iter_events(self, timeout=None, check_error=True):
        """Endless generator yielding events fetched with ``get()``."""
        while True:
            event = self.get(timeout=timeout, check_error=check_error)
            yield event

    def get_matching(self, event_name_regex, check_error=True, timeout=None):
        """Return the first event whose whole name matches
        ``event_name_regex``, discarding the events seen before it.
        """
        name_pattern = re.compile(f"^(?:{event_name_regex})$")
        for event in self.iter_events(timeout=timeout, check_error=check_error):
            if name_pattern.match(event.name):
                return event

    def get_info_contains(self, regex: str) -> FFIEvent:
        """Return the next DC_EVENT_INFO event whose text contains a match
        for ``regex`` (searched anywhere in ``data2``).
        """
        text_pattern = re.compile(regex)
        event = self.get_matching("DC_EVENT_INFO")
        while not text_pattern.search(event.data2):
            event = self.get_matching("DC_EVENT_INFO")
        return event

    def get_info_regex_groups(self, regex, check_error=True):
        """Return the regex groups of the next DC_EVENT_INFO event whose
        text matches ``regex`` from its beginning.
        """
        text_pattern = re.compile(regex)
        while True:
            event = self.get_matching("DC_EVENT_INFO", check_error=check_error)
            found = text_pattern.match(event.data2)
            if found is None:
                continue
            return found.groups()

    def wait_for_connectivity(self, connectivity):
        """Block until the account reports the given connectivity.

        The value is only re-read after each connectivity-changed event,
        so a state that is left again very quickly can be missed.
        """
        while self.account.get_connectivity() != connectivity:
            self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")

    def wait_for_connectivity_change(self, previous, expected_next):
        """Block until connectivity becomes ``expected_next``.

        Raises an exception if it turns into anything other than
        ``previous`` or ``expected_next`` in the meantime.
        """
        while True:
            observed = self.account.get_connectivity()
            if observed == expected_next:
                return
            if observed != previous:
                raise Exception(f"Expected connectivity {expected_next!s} but got {observed!s}")

            self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")

    def ensure_event_not_queued(self, event_name_regex):
        """Drain the queue without blocking and assert that no drained
        event has a name starting with a match for ``event_name_regex``.
        """
        __tracebackhide__ = True
        name_pattern = re.compile(f"(?:{event_name_regex}).*")
        while True:
            try:
                event = self._event_queue.get(block=False)
            except Empty:
                return
            assert not name_pattern.match(event.name), f"event found {event}"

    def wait_securejoin_inviter_progress(self, target):
        """Wait for an inviter-progress event with ``data2 >= target``."""
        while True:
            progress = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS")
            if progress.data2 >= target:
                print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account)
                return

    def wait_securejoin_joiner_progress(self, target):
        """Wait for a joiner-progress event with ``data2 >= target``."""
        while True:
            progress = self.get_matching("DC_EVENT_SECUREJOIN_JOINER_PROGRESS")
            if progress.data2 >= target:
                return

    def wait_idle_inbox_ready(self):
        """Wait for the DC_EVENT_IMAP_INBOX_IDLE event.

        Has to be called after start_io() so that fetch_existing_msgs has
        run and new messages are not mistaken for old ones:

        - ac1 and ac2 are created
        - ac1 sends a message to ac2
        - ac2 is still fetching existing messages and treats it as an
          existing, old message
        - therefore no DC_EVENT_INCOMING_MSG is sent
        """
        self.get_matching("DC_EVENT_IMAP_INBOX_IDLE")

    def wait_next_incoming_message(self):
        """Wait for the next incoming-message event and return its message."""
        incoming = self.get_matching("DC_EVENT_INCOMING_MSG")
        msg_id = incoming.data2
        return self.account.get_message_by_id(msg_id)

    def wait_next_messages_changed(self):
        """Wait for the next messages-changed event and return its message,
        or None when the event carries no message id.
        """
        msg_id = self.get_matching("DC_EVENT_MSGS_CHANGED").data2
        return self.account.get_message_by_id(msg_id) if msg_id > 0 else None

    def wait_next_reactions_changed(self):
        """Wait for the next reactions-changed event and return its message."""
        changed = self.get_matching("DC_EVENT_REACTIONS_CHANGED")
        assert changed.data1 > 0
        msg_id = changed.data2
        return self.account.get_message_by_id(msg_id)

    def wait_msg_delivered(self, msg):
        """Wait for the next delivered event and assert it refers to ``msg``
        (chat id and message id) and that ``msg`` reports being delivered.
        """
        delivered = self.get_matching("DC_EVENT_MSG_DELIVERED")
        assert delivered.data1 == msg.chat.id
        assert delivered.data2 == msg.id
        assert msg.is_out_delivered()
|
|
|
|
|
|
class EventThread(threading.Thread):
    """Event Thread for an account.

    With each Account init this callback thread is started.
    """

    def __init__(self, account: Account) -> None:
        """Create the daemon thread for ``account`` and start it right away."""
        self.account = account
        super().__init__(name="events")
        self.daemon = True
        self._marked_for_shutdown = False
        self.start()

    @contextmanager
    def log_execution(self, message):
        """Log ``message`` with a START suffix on entry and a FINISHED
        suffix after the wrapped block completed without raising.
        """
        self.account.log(message + " START")
        yield
        self.account.log(message + " FINISHED")

    def mark_shutdown(self) -> None:
        """Ask the event loop to stop; checked before and after each event fetch."""
        self._marked_for_shutdown = True

    def wait(self, timeout=None) -> None:
        """Join the thread unless called from the event thread itself."""
        if self is threading.current_thread():
            # we are in the callback thread and thus cannot
            # wait for the thread-loop to finish.
            return
        self.join(timeout=timeout)

    def run(self) -> None:
        """get and run events until shutdown."""
        with self.log_execution("EVENT THREAD"):
            # ffi.gc ties dc_event_emitter_unref to the emitter's lifetime.
            event_emitter = ffi.gc(
                lib.dc_get_event_emitter(self.account._dc_context),
                lib.dc_event_emitter_unref,
            )
            while not self._marked_for_shutdown:
                # A failure while handling one event is logged and must not
                # terminate the loop.
                with self.swallow_and_log_exception("Unexpected error in event thread"):
                    event = lib.dc_get_next_event(event_emitter)
                    if event == ffi.NULL or self._marked_for_shutdown:
                        break
                    self._process_event(event)

    def _process_event(self, event) -> None:
        """Convert a C event into an FFIEvent, release the C event and
        dispatch the matching account hooks.
        """
        try:
            evt = lib.dc_event_get_id(event)
            data1 = lib.dc_event_get_data1_int(event)
            # the following code relates to the deltachat/_build.py's helper
            # function which provides us signature info of an event call
            evt_name = get_dc_event_name(evt)
            if lib.dc_event_has_string_data(evt):
                data2 = from_optional_dc_charpointer(lib.dc_event_get_data2_str(event))
            else:
                data2 = lib.dc_event_get_data2_int(event)
        finally:
            # Always release the C event, also when decoding fails (e.g. an
            # event id without a known DC_EVENT_* name); the caller swallows
            # the exception, so the event would otherwise never be freed.
            lib.dc_event_unref(event)

        ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
        with self.swallow_and_log_exception(f"ac_process_ffi_event {ffi_event}"):
            # Hand the Account to the hook, not this thread object.
            self.account._pm.hook.ac_process_ffi_event(account=self.account, ffi_event=ffi_event)
        for name, kwargs in self._map_ffi_event(ffi_event):
            hook = getattr(self.account._pm.hook, name)
            info = f"call {name} kwargs={kwargs} failed"
            with self.swallow_and_log_exception(info):
                hook(**kwargs)

    @contextmanager
    def swallow_and_log_exception(self, info):
        """Run the wrapped block; log any ``Exception`` together with
        ``info`` and its traceback instead of propagating it.
        """
        try:
            yield
        except Exception as ex:
            self.account.log(f"{info}\nException {ex}\nTraceback:\n{traceback.format_exc()}")

    def _map_ffi_event(self, ffi_event: FFIEvent):
        """Yield ``(hook_name, kwargs)`` pairs for the higher-level account
        hooks that correspond to ``ffi_event``; yields nothing for events
        without a mapping.
        """
        name = ffi_event.name
        account = self.account
        if name == "DC_EVENT_CONFIGURE_PROGRESS":
            data1 = ffi_event.data1
            # Only the two end states are reported: 0 as failure, 1000 as success.
            if data1 == 0 or data1 == 1000:
                success = data1 == 1000
                comment = ffi_event.data2
                yield "ac_configure_completed", {"success": success, "comment": comment}
        elif name == "DC_EVENT_INCOMING_MSG":
            msg = account.get_message_by_id(ffi_event.data2)
            if msg is not None:
                # A truthy map_system_message() result replaces the generic hook.
                yield map_system_message(msg) or ("ac_incoming_message", {"message": msg})
        elif name == "DC_EVENT_MSGS_CHANGED":
            if ffi_event.data2 != 0:
                msg = account.get_message_by_id(ffi_event.data2)
                if msg is not None:
                    if msg.is_outgoing():
                        res = map_system_message(msg)
                        # Member-related hooks are emitted in addition to
                        # the outgoing-message hook.
                        if res and res[0].startswith("ac_member"):
                            yield res
                        yield "ac_outgoing_message", {"message": msg}
                    elif msg.is_in_fresh():
                        yield map_system_message(msg) or (
                            "ac_incoming_message",
                            {"message": msg},
                        )
        elif name == "DC_EVENT_REACTIONS_CHANGED":
            assert ffi_event.data1 > 0
            msg = account.get_message_by_id(ffi_event.data2)
            if msg is not None:
                yield "ac_reactions_changed", {"message": msg}
        elif name == "DC_EVENT_MSG_DELIVERED":
            msg = account.get_message_by_id(ffi_event.data2)
            if msg is not None:
                yield "ac_message_delivered", {"message": msg}
        elif name == "DC_EVENT_CHAT_MODIFIED":
            chat = account.get_chat_by_id(ffi_event.data1)
            yield "ac_chat_modified", {"chat": chat}
|