fix test for events

This commit is contained in:
holger krekel
2019-07-15 23:31:30 +02:00
parent 9ad4c9a6fe
commit e892c5cf4d
3 changed files with 19 additions and 12 deletions

View File

@@ -44,6 +44,9 @@ def py_dc_callback(ctx, evt, data1, data2):
try:
ret = callback(ctx, evt_name, data1, data2)
if ret is None:
ret = 0
assert isinstance(ret, int), repr(res)
if event_sig_types & 4:
return ffi.cast('uintptr_t', ret)
elif event_sig_types & 8:

View File

@@ -45,6 +45,9 @@ class Account(object):
self._configkeys = self.get_config("sys.config_keys").split()
self._imex_completed = threading.Event()
def __del__(self):
lib.dc_close(self._dc_context)
def _check_config_key(self, name):
if name not in self._configkeys:
raise KeyError("{!r} not a valid config key, existing keys: {!r}".format(
@@ -414,7 +417,7 @@ class EventLogger:
raise ValueError("{}({!r},{!r})".format(*ev))
return ev
def get_matching(self, event_name_regex):
def get_matching(self, event_name_regex, check_error=True):
self._log("-- waiting for event with regex: {} --".format(event_name_regex))
rex = re.compile("(?:{}).*".format(event_name_regex))
while 1:

View File

@@ -1,25 +1,26 @@
from __future__ import print_function
import pytest
from deltachat import capi, Account, const, set_context_callback
from deltachat.capi import ffi
from deltachat.cutil import as_dc_charpointer
from queue import Queue
from deltachat.account import EventLogger
def test_empty_context():
ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL, capi.ffi.NULL)
capi.lib.dc_close(ctx)
def test_set_context():
ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL, capi.ffi.NULL)
q = Queue()
set_context_callback(ctx, lambda *args: q.put(args))
name = as_dc_charpointer("ein")
email = as_dc_charpointer("ein@kontakt.org")
contact_id = capi.lib.dc_create_contact(ctx, name, email)
def test_dc_close_events():
ctx = capi.lib.dc_context_new(capi.lib.py_dc_callback, ffi.NULL, ffi.NULL)
evlog = EventLogger(ctx)
evlog.set_timeout(5)
set_context_callback(ctx, lambda ctx, evt_name, data1, data2: evlog(evt_name, data1, data2))
capi.lib.dc_close(ctx)
q.get(timeout=10)
# test that we get events from dc_close
print(evlog.get_matching("DC_EVENT_INFO", check_error=False))
print(evlog.get_matching("DC_EVENT_INFO", check_error=False))
print(evlog.get_matching("DC_EVENT_INFO", check_error=False))
print(evlog.get_matching("DC_EVENT_INFO", check_error=False))
def test_wrong_db(tmpdir):