mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 23:22:11 +03:00
This adapts the FFI API to use the new Context::new() and Context::drop() Rust-style APIs while preserving the semantics of the existing FFI API. It introduces a wrapper around the context which keeps an optional reference to the actual context, changing dc_open() and dc_close() to create/destroy the actual reference. This changes the events emitted on closing slightly: they only get emitted when dc_close() was called while the context was open. While dc_close() remains idempotent in that it can still be called safely multiple times, the close events will only occur if the context is actually closed.
86 lines
2.7 KiB
Python
86 lines
2.7 KiB
Python
from __future__ import print_function
|
|
from deltachat import capi, const, set_context_callback, clear_context_callback
|
|
from deltachat.capi import ffi
|
|
from deltachat.capi import lib
|
|
from deltachat.account import EventLogger
|
|
|
|
|
|
def test_empty_context():
    """Create a context with all-NULL arguments and close it again.

    The context is never opened; the test only checks that creating and
    closing it through the C API does not raise.
    """
    null = capi.ffi.NULL
    ctx = capi.lib.dc_context_new(null, null, null)
    capi.lib.dc_close(ctx)
|
|
|
|
|
|
def test_callback_None2int():
    """Close a context whose registered Python callback returns ``None``.

    NOTE(review): judging by the test name, this presumably guards the
    conversion of a ``None`` callback result into an int for the C side;
    confirm against the callback glue in ``deltachat``.
    """

    def return_none(*args):
        # Accept whatever the C layer passes and deliberately return None.
        return None

    ctx = capi.lib.dc_context_new(capi.lib.py_dc_callback, ffi.NULL, ffi.NULL)
    set_context_callback(ctx, return_none)
    capi.lib.dc_close(ctx)
    # Unregister only after closing, so the callback is still installed
    # while dc_close() runs.
    clear_context_callback(ctx)
|
|
|
|
|
|
def test_dc_close_events(tmpdir):
    """Check the DC_EVENT_INFO messages logged when an open context is closed.

    A context is opened on a fresh database file under ``tmpdir`` and then
    closed; the expected info messages must show up in the event log in the
    order listed at the end of this test.
    """
    raw_ctx = capi.lib.dc_context_new(capi.lib.py_dc_callback, ffi.NULL, ffi.NULL)
    # Let cffi call dc_context_unref once the wrapper is garbage collected.
    ctx = ffi.gc(raw_ctx, lib.dc_context_unref)

    evlog = EventLogger(ctx)
    evlog.set_timeout(5)

    def forward_event(ctx, evt_name, data1, data2):
        # Drop the context argument and hand the rest to the event logger.
        return evlog(evt_name, data1, data2)

    set_context_callback(ctx, forward_event)

    db_path = tmpdir.join("hello.db")
    lib.dc_open(ctx, db_path.strpath.encode("ascii"), ffi.NULL)
    capi.lib.dc_close(ctx)

    def find(info_string):
        """Consume DC_EVENT_INFO events until one contains ``info_string``."""
        while True:
            ev = evlog.get_matching("DC_EVENT_INFO", check_error=False)
            if info_string in ev[2]:
                return
            print("skipping event", *ev)

    # The order matters: each find() consumes events up to its match.
    expected_messages = (
        "disconnecting INBOX-watch",
        "disconnecting sentbox-thread",
        "disconnecting mvbox-thread",
        "disconnecting SMTP",
        "Database closed",
    )
    for message in expected_messages:
        find(message)
|
|
|
|
|
|
def test_wrong_db(tmpdir):
    """Opening a file that is not a valid database must report failure."""
    raw_ctx = lib.dc_context_new(lib.py_dc_callback, ffi.NULL, ffi.NULL)
    # Let cffi call dc_context_unref once the wrapper is garbage collected.
    dc_context = ffi.gc(raw_ctx, lib.dc_context_unref)

    db_file = tmpdir.join("hello.db")
    # Fill the file with content that is not a valid database.
    db_file.write("x123" * 10)

    opened = lib.dc_open(dc_context, db_file.strpath.encode("ascii"), ffi.NULL)
    assert not opened
|
|
|
|
|
|
def test_event_defines():
    """Spot-check constants exposed by ``deltachat.const``."""
    info_event = const.DC_EVENT_INFO
    assert info_event == 100

    # Only truthiness is checked for the self-contact id.
    self_contact_id = const.DC_CONTACT_ID_SELF
    assert self_contact_id
|
|
|
|
|
|
def test_sig():
    """Each listed event must have signature type 2.

    The value comes from ``dc_get_event_signature_types``; its meaning is
    defined by the C API and is not interpreted here.
    """
    sig = capi.lib.dc_get_event_signature_types
    event_names = (
        "DC_EVENT_INFO",
        "DC_EVENT_WARNING",
        "DC_EVENT_ERROR",
        "DC_EVENT_SMTP_CONNECTED",
        "DC_EVENT_IMAP_CONNECTED",
        "DC_EVENT_SMTP_MESSAGE_SENT",
    )
    for event_name in event_names:
        # Look each constant up just before its check, in the listed order.
        assert sig(getattr(const, event_name)) == 2
|
|
|
|
|
|
def test_markseen_invalid_message_ids(acfactory):
    """Marking message id 9 as seen must not queue a warning or error event.

    NOTE(review): per the test name, id 9 is meant to be an invalid message
    id for this freshly configured account; confirm against the core.
    """
    account = acfactory.get_configured_offline_account()
    contact = account.create_contact(email="some1@example.com", name="some1")
    chat = account.create_chat_by_contact(contact)
    chat.send_text("one messae")
    # Wait until the sent message has been announced before going on.
    account._evlogger.get_matching("DC_EVENT_MSGS_CHANGED")

    msg_ids = [9]
    lib.dc_markseen_msgs(account._dc_context, msg_ids, len(msg_ids))
    account._evlogger.ensure_event_not_queued("DC_EVENT_WARNING|DC_EVENT_ERROR")
|