mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 15:02:11 +03:00
332 lines
12 KiB
Python
332 lines
12 KiB
Python
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from contextlib import contextmanager
|
|
from queue import Empty, Queue
|
|
|
|
from . import const
|
|
from .capi import ffi, lib
|
|
from .cutil import from_optional_dc_charpointer
|
|
from .hookspec import account_hookimpl
|
|
from .message import map_system_message
|
|
from .account import Account
|
|
|
|
|
|
def get_dc_event_name(integer, _DC_EVENTNAME_MAP={}):
|
|
if not _DC_EVENTNAME_MAP:
|
|
for name in dir(const):
|
|
if name.startswith("DC_EVENT_"):
|
|
_DC_EVENTNAME_MAP[getattr(const, name)] = name
|
|
return _DC_EVENTNAME_MAP[integer]
|
|
|
|
|
|
class FFIEvent:
|
|
def __init__(self, name: str, data1, data2) -> None:
|
|
self.name = name
|
|
self.data1 = data1
|
|
self.data2 = data2
|
|
|
|
def __str__(self) -> str:
|
|
if self.name == "DC_EVENT_INFO":
|
|
return f"INFO {self.data2}"
|
|
if self.name == "DC_EVENT_WARNING":
|
|
return f"WARNING {self.data2}"
|
|
if self.name == "DC_EVENT_ERROR":
|
|
return f"ERROR {self.data2}"
|
|
return "{name} data1={data1} data2={data2}".format(**self.__dict__)
|
|
|
|
|
|
class FFIEventLogger:
|
|
"""If you register an instance of this logger with an Account
|
|
you'll get all ffi-events printed.
|
|
"""
|
|
|
|
# to prevent garbled logging
|
|
_loglock = threading.RLock()
|
|
|
|
def __init__(self, account, logid=None, init_time=None) -> None:
|
|
self.account = account
|
|
self.logid = logid or self.account.get_config("displayname")
|
|
if init_time is None:
|
|
init_time = time.time()
|
|
self.init_time = init_time
|
|
|
|
@account_hookimpl
|
|
def ac_process_ffi_event(self, ffi_event: FFIEvent) -> None:
|
|
self.account.log(str(ffi_event))
|
|
|
|
@account_hookimpl
|
|
def ac_log_line(self, message):
|
|
t = threading.current_thread()
|
|
tname = getattr(t, "name", t)
|
|
if tname == "MainThread":
|
|
tname = "MAIN"
|
|
elapsed = time.time() - self.init_time
|
|
locname = tname
|
|
if self.logid:
|
|
locname += "-" + self.logid
|
|
s = f"{elapsed:2.2f} [{locname}] {message}"
|
|
|
|
if os.name == "posix":
|
|
WARN = "\033[93m"
|
|
ERROR = "\033[91m"
|
|
ENDC = "\033[0m"
|
|
if message.startswith("DC_EVENT_WARNING"):
|
|
s = WARN + s + ENDC
|
|
if message.startswith("DC_EVENT_ERROR"):
|
|
s = ERROR + s + ENDC
|
|
with self._loglock:
|
|
print(s, flush=True)
|
|
|
|
|
|
class FFIEventTracker:
|
|
account: Account
|
|
_event_queue: Queue
|
|
|
|
def __init__(self, account: Account, timeout=None) -> None:
|
|
self.account = account
|
|
self._timeout = timeout
|
|
self._event_queue = Queue()
|
|
|
|
@account_hookimpl
|
|
def ac_process_ffi_event(self, ffi_event: FFIEvent):
|
|
self._event_queue.put(ffi_event)
|
|
|
|
def set_timeout(self, timeout):
|
|
self._timeout = timeout
|
|
|
|
def consume_events(self, check_error=True):
|
|
while not self._event_queue.empty():
|
|
self.get(check_error=check_error)
|
|
|
|
def get(self, timeout=None, check_error=True):
|
|
timeout = timeout if timeout is not None else self._timeout
|
|
ev = self._event_queue.get(timeout=timeout)
|
|
if check_error and ev.name == "DC_EVENT_ERROR":
|
|
raise ValueError(f"unexpected event: {ev}")
|
|
return ev
|
|
|
|
def iter_events(self, timeout=None, check_error=True):
|
|
while True:
|
|
yield self.get(timeout=timeout, check_error=check_error)
|
|
|
|
def get_matching(self, event_name_regex, check_error=True, timeout=None):
|
|
rex = re.compile(f"^(?:{event_name_regex})$")
|
|
for ev in self.iter_events(timeout=timeout, check_error=check_error):
|
|
if rex.match(ev.name):
|
|
return ev
|
|
|
|
def get_info_contains(self, regex: str) -> FFIEvent:
|
|
rex = re.compile(regex)
|
|
while True:
|
|
ev = self.get_matching("DC_EVENT_INFO")
|
|
if rex.search(ev.data2):
|
|
return ev
|
|
|
|
def get_info_regex_groups(self, regex, check_error=True):
|
|
rex = re.compile(regex)
|
|
while True:
|
|
ev = self.get_matching("DC_EVENT_INFO", check_error=check_error)
|
|
m = rex.match(ev.data2)
|
|
if m is not None:
|
|
return m.groups()
|
|
|
|
def wait_for_connectivity(self, connectivity):
|
|
"""Wait for the specified connectivity.
|
|
This only works reliably if the connectivity doesn't change
|
|
again too quickly, otherwise we might miss it.
|
|
"""
|
|
while True:
|
|
if self.account.get_connectivity() == connectivity:
|
|
return
|
|
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
|
|
|
|
def wait_for_connectivity_change(self, previous, expected_next):
|
|
"""Wait until the connectivity changes to `expected_next`.
|
|
Fails the test if it changes to something else.
|
|
"""
|
|
while True:
|
|
current = self.account.get_connectivity()
|
|
if current == expected_next:
|
|
return
|
|
if current != previous:
|
|
raise Exception("Expected connectivity " + str(expected_next) + " but got " + str(current))
|
|
|
|
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
|
|
|
|
def wait_for_all_work_done(self):
|
|
while True:
|
|
if self.account.all_work_done():
|
|
return
|
|
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
|
|
|
|
def ensure_event_not_queued(self, event_name_regex):
|
|
__tracebackhide__ = True
|
|
rex = re.compile(f"(?:{event_name_regex}).*")
|
|
while True:
|
|
try:
|
|
ev = self._event_queue.get(False)
|
|
except Empty:
|
|
break
|
|
else:
|
|
assert not rex.match(ev.name), f"event found {ev}"
|
|
|
|
def wait_securejoin_inviter_progress(self, target):
|
|
while True:
|
|
event = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS")
|
|
if event.data2 >= target:
|
|
print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account)
|
|
break
|
|
|
|
def wait_idle_inbox_ready(self):
|
|
"""Has to be called after start_io() to wait for fetch_existing_msgs to run
|
|
so that new messages are not mistaken for old ones:
|
|
- ac1 and ac2 are created
|
|
- ac1 sends a message to ac2
|
|
- ac2 is still running FetchExsistingMsgs job and thinks it's an existing, old message
|
|
- therefore no DC_EVENT_INCOMING_MSG is sent
|
|
"""
|
|
self.get_matching("DC_EVENT_IMAP_INBOX_IDLE")
|
|
|
|
def wait_next_incoming_message(self):
|
|
"""wait for and return next incoming message."""
|
|
ev = self.get_matching("DC_EVENT_INCOMING_MSG")
|
|
return self.account.get_message_by_id(ev.data2)
|
|
|
|
def wait_next_messages_changed(self):
|
|
"""wait for and return next message-changed message or None
|
|
if the event contains no msgid
|
|
"""
|
|
ev = self.get_matching("DC_EVENT_MSGS_CHANGED")
|
|
if ev.data2 > 0:
|
|
return self.account.get_message_by_id(ev.data2)
|
|
return None
|
|
|
|
def wait_next_reactions_changed(self):
|
|
"""wait for and return next reactions-changed message."""
|
|
ev = self.get_matching("DC_EVENT_REACTIONS_CHANGED")
|
|
assert ev.data1 > 0
|
|
return self.account.get_message_by_id(ev.data2)
|
|
|
|
def wait_msg_delivered(self, msg):
|
|
ev = self.get_matching("DC_EVENT_MSG_DELIVERED")
|
|
assert ev.data1 == msg.chat.id
|
|
assert ev.data2 == msg.id
|
|
assert msg.is_out_delivered()
|
|
|
|
|
|
class EventThread(threading.Thread):
|
|
"""Event Thread for an account.
|
|
|
|
With each Account init this callback thread is started.
|
|
"""
|
|
|
|
def __init__(self, account: Account) -> None:
|
|
self.account = account
|
|
super(EventThread, self).__init__(name="events")
|
|
self.daemon = True
|
|
self._marked_for_shutdown = False
|
|
self.start()
|
|
|
|
@contextmanager
|
|
def log_execution(self, message):
|
|
self.account.log(message + " START")
|
|
yield
|
|
self.account.log(message + " FINISHED")
|
|
|
|
def mark_shutdown(self) -> None:
|
|
self._marked_for_shutdown = True
|
|
|
|
def wait(self, timeout=None) -> None:
|
|
if self == threading.current_thread():
|
|
# we are in the callback thread and thus cannot
|
|
# wait for the thread-loop to finish.
|
|
return
|
|
self.join(timeout=timeout)
|
|
|
|
def run(self) -> None:
|
|
"""get and run events until shutdown."""
|
|
with self.log_execution("EVENT THREAD"):
|
|
event_emitter = ffi.gc(
|
|
lib.dc_get_event_emitter(self.account._dc_context),
|
|
lib.dc_event_emitter_unref,
|
|
)
|
|
while not self._marked_for_shutdown:
|
|
with self.swallow_and_log_exception("Unexpected error in event thread"):
|
|
event = lib.dc_get_next_event(event_emitter)
|
|
if event == ffi.NULL or self._marked_for_shutdown:
|
|
break
|
|
self._process_event(event)
|
|
|
|
def _process_event(self, event) -> None:
|
|
evt = lib.dc_event_get_id(event)
|
|
data1 = lib.dc_event_get_data1_int(event)
|
|
# the following code relates to the deltachat/_build.py's helper
|
|
# function which provides us signature info of an event call
|
|
evt_name = get_dc_event_name(evt)
|
|
if lib.dc_event_has_string_data(evt):
|
|
data2 = from_optional_dc_charpointer(lib.dc_event_get_data2_str(event))
|
|
else:
|
|
data2 = lib.dc_event_get_data2_int(event)
|
|
|
|
lib.dc_event_unref(event)
|
|
ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
|
|
with self.swallow_and_log_exception(f"ac_process_ffi_event {ffi_event}"):
|
|
self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
|
|
for name, kwargs in self._map_ffi_event(ffi_event):
|
|
hook = getattr(self.account._pm.hook, name)
|
|
info = f"call {name} kwargs={kwargs} failed"
|
|
with self.swallow_and_log_exception(info):
|
|
hook(**kwargs)
|
|
|
|
@contextmanager
|
|
def swallow_and_log_exception(self, info):
|
|
try:
|
|
yield
|
|
except Exception as ex:
|
|
logfile = io.StringIO()
|
|
traceback.print_exception(*sys.exc_info(), file=logfile)
|
|
self.account.log(f"{info}\nException {ex}\nTraceback:\n{logfile.getvalue()}")
|
|
|
|
def _map_ffi_event(self, ffi_event: FFIEvent):
|
|
name = ffi_event.name
|
|
account = self.account
|
|
if name == "DC_EVENT_CONFIGURE_PROGRESS":
|
|
data1 = ffi_event.data1
|
|
if data1 == 0 or data1 == 1000:
|
|
success = data1 == 1000
|
|
comment = ffi_event.data2
|
|
yield "ac_configure_completed", {"success": success, "comment": comment}
|
|
elif name == "DC_EVENT_INCOMING_MSG":
|
|
msg = account.get_message_by_id(ffi_event.data2)
|
|
if msg is not None:
|
|
yield map_system_message(msg) or ("ac_incoming_message", {"message": msg})
|
|
elif name == "DC_EVENT_MSGS_CHANGED":
|
|
if ffi_event.data2 != 0:
|
|
msg = account.get_message_by_id(ffi_event.data2)
|
|
if msg is not None:
|
|
if msg.is_outgoing():
|
|
res = map_system_message(msg)
|
|
if res and res[0].startswith("ac_member"):
|
|
yield res
|
|
yield "ac_outgoing_message", {"message": msg}
|
|
elif msg.is_in_fresh():
|
|
yield map_system_message(msg) or (
|
|
"ac_incoming_message",
|
|
{"message": msg},
|
|
)
|
|
elif name == "DC_EVENT_REACTIONS_CHANGED":
|
|
assert ffi_event.data1 > 0
|
|
msg = account.get_message_by_id(ffi_event.data2)
|
|
yield "ac_reactions_changed", {"message": msg}
|
|
elif name == "DC_EVENT_MSG_DELIVERED":
|
|
msg = account.get_message_by_id(ffi_event.data2)
|
|
yield "ac_message_delivered", {"message": msg}
|
|
elif name == "DC_EVENT_CHAT_MODIFIED":
|
|
chat = account.get_chat_by_id(ffi_event.data1)
|
|
yield "ac_chat_modified", {"chat": chat}
|