mirror of
https://github.com/chatmail/core.git
synced 2026-04-06 07:32:12 +03:00
198 lines
5.8 KiB
Python
198 lines
5.8 KiB
Python
from __future__ import print_function
|
|
import threading
|
|
import time
|
|
from deltachat import capi, cutil, const, set_context_callback, clear_context_callback, py_dc_callback
|
|
from deltachat.capi import ffi
|
|
from deltachat.capi import lib
|
|
# from deltachat.account import EventLogger
|
|
|
|
|
|
class EventThread(threading.Thread):
    """Daemon thread that forwards queued context events to ``py_dc_callback``.

    ``run`` calls ``lib.dc_context_run`` once and then polls the context for
    events until ``stop`` clears the running flag.
    """

    def __init__(self, dc_context):
        """Remember *dc_context* and mark the thread as a daemon.

        :param dc_context: context handle passed to every ``lib.dc_*`` call
            made by this thread.
        """
        self.dc_context = dc_context
        super(EventThread, self).__init__()
        # Set the attribute directly: Thread.setDaemon() is deprecated.
        self.daemon = True
        self._running = True

    def run(self):
        """Poll for events and hand each one to ``py_dc_callback``."""
        lib.dc_context_run(self.dc_context)
        while self._running:
            if not lib.dc_has_next_event(self.dc_context):
                # Nothing queued: sleep briefly instead of spinning.
                time.sleep(0.05)
                continue
            event = lib.dc_get_next_event(self.dc_context)
            if event == ffi.NULL:
                continue
            try:
                py_dc_callback(
                    # The attribute set in __init__ is ``dc_context``; the
                    # previous ``self._dc_context`` raised AttributeError.
                    self.dc_context,
                    lib.dc_event_get_id(event),
                    lib.dc_event_get_data1(event),
                    lib.dc_event_get_data2(event),
                )
            finally:
                # Release the event even when the callback raises.
                lib.dc_event_unref(event)

    def stop(self):
        """Call ``lib.dc_context_shutdown`` and let the polling loop exit."""
        lib.dc_context_shutdown(self.dc_context)
        self._running = False
|
|
|
|
|
|
def test_empty_context():
    """Create a context from two NULL arguments and close it again."""
    null = capi.ffi.NULL
    context = capi.lib.dc_context_new(null, null)
    capi.lib.dc_close(context)
|
|
|
|
|
|
def test_callback_None2int():
    """Install a callback that returns None, close the context, remove it."""
    context = capi.lib.dc_context_new(ffi.NULL, ffi.NULL)

    def return_none(*args):
        # Accepts any arguments and deliberately returns None.
        return None

    set_context_callback(context, return_none)
    capi.lib.dc_close(context)
    clear_context_callback(context)
|
|
|
|
|
|
def test_start_stop_event_thread_basic():
    """Start an EventThread on a fresh context and stop it straight away.

    The numbered prints mark how far the test got.
    """
    print("1")
    context = capi.lib.dc_context_new(ffi.NULL, ffi.NULL)

    print("2")
    worker = EventThread(context)

    print("3 -- starting event thread")
    worker.start()

    print("4 -- stopping event thread")
    worker.stop()
|
|
|
|
|
|
# FIXME: EventLogger doesn't work without an account anymore
|
|
# def test_dc_close_events(tmpdir):
|
|
# ctx = ffi.gc(
|
|
# capi.lib.dc_context_new(ffi.NULL, ffi.NULL),
|
|
# lib.dc_context_unref,
|
|
# )
|
|
# evlog = EventLogger(ctx)
|
|
# evlog.set_timeout(5)
|
|
# set_context_callback(
|
|
# ctx,
|
|
# lambda ctx, evt_name, data1, data2: evlog(evt_name, data1, data2)
|
|
# )
|
|
# ev_thread = EventThread(ctx)
|
|
# ev_thread.start()
|
|
|
|
# p = tmpdir.join("hello.db")
|
|
# lib.dc_open(ctx, p.strpath.encode("ascii"), ffi.NULL)
|
|
# capi.lib.dc_close(ctx)
|
|
|
|
# def find(info_string):
|
|
# while 1:
|
|
# ev = evlog.get_matching("DC_EVENT_INFO", check_error=False)
|
|
# data2 = ev[2]
|
|
# if info_string in data2:
|
|
# return
|
|
# else:
|
|
# print("skipping event", *ev)
|
|
|
|
# find("disconnecting inbox-thread")
|
|
# find("disconnecting sentbox-thread")
|
|
# find("disconnecting mvbox-thread")
|
|
# find("disconnecting SMTP")
|
|
# find("Database closed")
|
|
# ev_thread.stop()
|
|
|
|
|
|
def test_wrong_db(tmpdir):
    """``dc_open`` must report failure for a file that is not a database."""
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    db_file = tmpdir.join("hello.db")
    # Fill the file with content that is not a valid database.
    db_file.write("x123" * 10)
    db_path = db_file.strpath.encode("ascii")
    opened = lib.dc_open(context, db_path, ffi.NULL)
    assert not opened
|
|
|
|
|
|
def test_empty_blobdir(tmpdir):
    """``dc_open`` succeeds when the blobdir argument is an empty string."""
    # Some client code apparently relies on b"" behaving like NULL here.
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    db_path = tmpdir.join("hello.db").strpath.encode("ascii")
    opened = lib.dc_open(context, db_path, b"")
    assert opened
|
|
|
|
|
|
def test_event_defines():
    """Spot-check two constants exposed through ``const``."""
    info_event = const.DC_EVENT_INFO
    assert info_event == 100
    # Only truthiness is checked for the self-contact id.
    self_contact_id = const.DC_CONTACT_ID_SELF
    assert self_contact_id
|
|
|
|
|
|
def test_sig():
    """``dc_get_event_signature_types`` returns 2 for each listed event."""
    signature_of = capi.lib.dc_get_event_signature_types
    event_names = (
        "DC_EVENT_INFO",
        "DC_EVENT_WARNING",
        "DC_EVENT_ERROR",
        "DC_EVENT_SMTP_CONNECTED",
        "DC_EVENT_IMAP_CONNECTED",
        "DC_EVENT_SMTP_MESSAGE_SENT",
    )
    # Resolve each constant just before its check, in the listed order.
    for event_name in event_names:
        assert signature_of(getattr(const, event_name)) == 2
|
|
|
|
|
|
def test_markseen_invalid_message_ids(acfactory):
    """Marking message id 9 as seen must not queue a warning or error event."""
    account = acfactory.get_configured_offline_account()

    contact = account.create_contact(email="some1@example.com", name="some1")
    account.create_chat_by_contact(contact).send_text("one messae")
    # Wait for the event produced by the send before marking anything seen.
    account._evlogger.get_matching("DC_EVENT_MSGS_CHANGED")

    # The test name treats id 9 as invalid for this account.
    bogus_ids = [9]
    lib.dc_markseen_msgs(account._dc_context, bogus_ids, len(bogus_ids))
    account._evlogger.ensure_event_not_queued("DC_EVENT_WARNING|DC_EVENT_ERROR")
|
|
|
|
|
|
def test_get_special_message_id_returns_empty_message(acfactory):
    """Looking up message ids 1 through 9 yields messages whose id is 0."""
    account = acfactory.get_configured_offline_account()
    for message_id in range(1, 10):
        message = account.get_message_by_id(message_id)
        assert message.id == 0
|
|
|
|
|
|
def test_provider_info_none():
    """``dc_provider_new_from_email`` returns NULL for the given address."""
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    address = cutil.as_dc_charpointer("email@unexistent.no")
    provider = lib.dc_provider_new_from_email(context, address)
    assert provider == ffi.NULL
|
|
|
|
|
|
def test_get_info_closed():
    """Info for a context that was never opened omits ``database_dir``."""
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    raw_info = lib.dc_get_info(context)
    info = cutil.from_dc_charpointer(raw_info)
    assert 'deltachat_core_version' in info
    assert 'database_dir' not in info
|
|
|
|
|
|
def test_get_info_open(tmpdir):
    """Info for a context after ``dc_open`` includes ``database_dir``."""
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    db_path = tmpdir.join("test.db").strpath.encode("ascii")
    lib.dc_open(context, db_path, ffi.NULL)
    raw_info = lib.dc_get_info(context)
    info = cutil.from_dc_charpointer(raw_info)
    assert 'deltachat_core_version' in info
    assert 'database_dir' in info
|
|
|
|
|
|
def test_is_open_closed():
    """``dc_is_open`` returns 0 for a context that was never opened."""
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    is_open = lib.dc_is_open(context)
    assert is_open == 0
|
|
|
|
|
|
def test_is_open_actually_open(tmpdir):
    """``dc_is_open`` returns 1 once ``dc_open`` was called on the context."""
    context = ffi.gc(lib.dc_context_new(ffi.NULL, ffi.NULL), lib.dc_context_unref)
    db_path = tmpdir.join("test.db").strpath.encode("ascii")
    lib.dc_open(context, db_path, ffi.NULL)
    is_open = lib.dc_is_open(context)
    assert is_open == 1
|