Files
chatmail-core/python/tests/test_4_lowlevel.py
iequidoo 1f336f89a6 feat: Sync Config::Displayname across devices (#4893)
We already synchronise status/footer when we see a self-sent message with a Chat-Version
header. Would be nice to do the same for display name.

But let's do it the same way as for `Config::{MdnsEnabled,ShowEmails}`. Otherwise, if we sync the
display name using the "From" header, something like `Param::StatusTimestamp` is then needed to reject
outdated display names. Also this timestamp needs to be updated when `Config::Displayname` is set
locally. Also this wouldn't work if system time isn't synchronised on devices. Also using multiple
approaches to sync different config values would lead to more code and bugs while having almost no
value -- using "From" only saves some bytes and allows to sync some things w/o the synchronisation
itself to be enabled. But the latter also can be a downside -- if it's usual synchronisation, you can
(potentially) disable it and share the same email account across people in some organisation
allowing them to have different display names. With using "From" for synchronisation such a
capability definitely requires a new config option.
2023-12-01 21:41:58 -03:00

256 lines
8.7 KiB
Python

import json
from queue import Queue
import deltachat as dc
from deltachat import capi, cutil, register_global_plugin
from deltachat.capi import ffi, lib
from deltachat.hookspec import global_hookimpl
from deltachat.testplugin import (
ACSetup,
create_dict_from_files_in_path,
write_dict_to_dir,
)
from deltachat.cutil import from_optional_dc_charpointer
# from deltachat.account import EventLogger
class TestACSetup:
    """Tests for the ``ACSetup`` helper and the file-cache utilities of the test plugin."""

    @staticmethod
    def _ignore_kwargs(**kwargs):
        """Keyword-only no-op used to stub out ``configure``."""
        return None

    @staticmethod
    def _ignore_everything(*args, **kwargs):
        """No-op accepting any call signature, used to stub out setup steps."""
        return None

    def test_cache_writing(self, tmp_path):
        """A nested directory snapshot can be re-created under a new root."""
        src_root = tmp_path / "hello"
        src_root.mkdir()
        outer = src_root / "dir1"
        outer.mkdir()
        (outer / "file1").write_bytes(b"content1")
        inner = outer / "dir2"
        inner.mkdir()
        (inner / "file2").write_bytes(b"123")

        snapshot = create_dict_from_files_in_path(src_root)
        dst_root = tmp_path / "other"
        write_dict_to_dir(snapshot, dst_root)

        assert (dst_root / "dir1" / "dir2" / "file2").exists()
        assert (dst_root / "dir1" / "file1").exists()

    def test_basic_states(self, acfactory, monkeypatch, testprocess):
        """One account walks through CONFIGURING, CONFIGURED and IDLEREADY.

        The actual configure, IMAP init and IO start steps are replaced by
        no-ops so that only the state bookkeeping is exercised.
        """
        setup = ACSetup(init_time=0.0, testprocess=testprocess)
        account = acfactory.get_unconfigured_account()

        monkeypatch.setattr(account, "configure", self._ignore_kwargs)
        setup.start_configure(account)
        assert setup._account2state[account] == setup.CONFIGURING

        # Feed the "configured" result by hand instead of running configure.
        setup._configured_events.put((account, True, None))
        monkeypatch.setattr(setup, "init_imap", self._ignore_everything)
        setup.wait_one_configured(account)
        assert setup._account2state[account] == setup.CONFIGURED

        monkeypatch.setattr(setup, "_onconfigure_start_io", self._ignore_everything)
        setup.bring_online()
        assert setup._account2state[account] == setup.IDLEREADY

    def test_two_accounts_one_waited_all_started(self, monkeypatch, acfactory, testprocess):
        """Waiting for one account leaves the other CONFIGURING; bring_online readies both."""
        setup = ACSetup(init_time=0.0, testprocess=testprocess)
        monkeypatch.setattr(setup, "init_imap", self._ignore_everything)
        monkeypatch.setattr(setup, "_onconfigure_start_io", self._ignore_everything)

        started = []
        for _ in range(2):
            account = acfactory.get_unconfigured_account()
            monkeypatch.setattr(account, "configure", self._ignore_kwargs)
            setup.start_configure(account)
            started.append(account)
        first, second = started

        assert setup._account2state[first] == setup.CONFIGURING
        assert setup._account2state[second] == setup.CONFIGURING

        setup._configured_events.put((first, True, None))
        setup.wait_one_configured(first)
        assert setup._account2state[first] == setup.CONFIGURED
        assert setup._account2state[second] == setup.CONFIGURING

        setup._configured_events.put((second, True, None))
        setup.bring_online()
        assert setup._account2state[first] == setup.IDLEREADY
        assert setup._account2state[second] == setup.IDLEREADY

    def test_store_and_retrieve_configured_account_cache(self, acfactory, tmp_path):
        """Configured db files are stored once and can be restored into a fresh dir."""
        account = acfactory.get_pseudo_configured_account()
        cache_holder = acfactory._acsetup.testprocess

        # The first store succeeds, the repeated one is expected to be refused.
        assert cache_holder.cache_maybe_store_configured_db_files(account)
        assert not cache_holder.cache_maybe_store_configured_db_files(account)

        account_dir = tmp_path / "newaccount"
        account_dir.mkdir()
        address = account.get_config("addr")
        db_target = account_dir / "db"
        assert cache_holder.cache_maybe_retrieve_configured_db_files(address, str(db_target))
        assert len(list(account_dir.iterdir())) >= 2
def test_liveconfig_caching(acfactory, monkeypatch):
    """A key added to one returned liveconfig must not show up in the next one."""
    configs = [{"addr": "1@example.org", "mail_pw": "123"}]

    acfactory._liveconfig_producer = iter(configs)
    first = acfactory.get_next_liveconfig()
    first["hello"] = "world"

    # Restart the producer so the same entry is requested a second time.
    acfactory._liveconfig_producer = iter(configs)
    second = acfactory.get_next_liveconfig()
    assert "hello" not in second
def test_empty_context():
    """A context created with all-NULL arguments can be unreferenced again."""
    null = capi.ffi.NULL
    context = capi.lib.dc_context_new(null, null, null)
    capi.lib.dc_context_unref(context)
def test_dc_close_events(acfactory):
    """Shutting down an account triggers the global ``dc_account_after_shutdown`` hook."""
    acct = acfactory.get_unconfigured_account()

    # Collects every account the hook below gets called with.
    seen_shutdowns = Queue()

    class ShutdownPlugin:
        @global_hookimpl
        def dc_account_after_shutdown(self, account):
            # By the time the hook runs the context must already be dropped.
            assert account._dc_context is None
            seen_shutdowns.put(account)

    # NOTE(review): the plugin is registered globally and never unregistered
    # in this test -- confirm that this is intended.
    plugin = ShutdownPlugin()
    register_global_plugin(plugin)

    assert hasattr(acct, "_dc_context")
    acct.shutdown()
    # Raises queue.Empty if the hook has not fired within two seconds.
    seen_shutdowns.get(timeout=2)
def test_wrong_db(tmp_path):
    """A context created on a file that is not a valid database is not open."""
    db_path = tmp_path / "hello.db"
    # write an invalid database file
    db_path.write_bytes(b"x123" * 10)

    # Attach the unref destructor so the context is released once the test
    # is done, matching how the other tests in this file manage contexts.
    context = ffi.gc(
        lib.dc_context_new(ffi.NULL, str(db_path).encode("ascii"), ffi.NULL),
        lib.dc_context_unref,
    )
    assert not lib.dc_context_is_open(context)
def test_empty_blobdir(tmp_path):
    """Creating a context with an empty blobdir string yields a non-NULL context."""
    db_path = str(tmp_path / "hello.db").encode("ascii")

    # Apparently some client code expects this to be the same as passing NULL.
    raw_ctx = lib.dc_context_new(ffi.NULL, db_path, b"")
    ctx = ffi.gc(raw_ctx, lib.dc_context_unref)
    assert ctx != ffi.NULL
def test_event_defines():
    """Spot-check two constants exposed through ``dc.const``."""
    consts = dc.const
    assert consts.DC_EVENT_INFO == 100
    assert consts.DC_CONTACT_ID_SELF
def test_sig():
    """Check ``dc_event_has_string_data`` for a selection of event ids."""
    has_string_data = capi.lib.dc_event_has_string_data

    assert not has_string_data(dc.const.DC_EVENT_MSGS_CHANGED)

    events_with_string_data = (
        "DC_EVENT_INFO",
        "DC_EVENT_WARNING",
        "DC_EVENT_ERROR",
        "DC_EVENT_SMTP_CONNECTED",
        "DC_EVENT_IMAP_CONNECTED",
        "DC_EVENT_SMTP_MESSAGE_SENT",
        "DC_EVENT_IMEX_FILE_WRITTEN",
    )
    for event_name in events_with_string_data:
        assert has_string_data(getattr(dc.const, event_name))
def test_markseen_invalid_message_ids(acfactory):
    """Marking message id 9 as seen must not queue a warning or an error event."""
    account = acfactory.get_pseudo_configured_account()
    contact = account.create_contact("some1@example.com", name="some1")
    contact.create_chat().send_text("one message")

    tracker = account._evtracker
    tracker.get_matching("DC_EVENT_MSGS_CHANGED")
    # Skip configuration-related warnings, but not errors.
    tracker.ensure_event_not_queued("DC_EVENT_ERROR")

    bogus_ids = [9]
    lib.dc_markseen_msgs(account._dc_context, bogus_ids, len(bogus_ids))
    account._evtracker.ensure_event_not_queued("DC_EVENT_WARNING|DC_EVENT_ERROR")
def test_get_special_message_id_returns_empty_message(acfactory):
    """Looking up message ids 1 through 9 yields messages whose ``id`` is 0."""
    account = acfactory.get_pseudo_configured_account()
    for special_id in range(1, 10):
        assert account.get_message_by_id(special_id).id == 0
def test_provider_info_none():
    """Provider lookup for the address below returns NULL."""
    raw_ctx = lib.dc_context_new(ffi.NULL, ffi.NULL, ffi.NULL)
    ctx = ffi.gc(raw_ctx, lib.dc_context_unref)

    email = cutil.as_dc_charpointer("email@unexistent.no")
    provider = lib.dc_provider_new_from_email(ctx, email)
    assert provider == ffi.NULL
def test_get_info_open(tmp_path):
    """``dc_get_info`` on a context with a database path reports core version and db dir."""
    db_path = str(tmp_path / "test.db").encode("ascii")
    raw_ctx = lib.dc_context_new(ffi.NULL, db_path, ffi.NULL)
    ctx = ffi.gc(raw_ctx, lib.dc_context_unref)

    info = cutil.from_dc_charpointer(lib.dc_get_info(ctx))
    for expected_key in ("deltachat_core_version", "database_dir"):
        assert expected_key in info
def test_logged_hook_failure(acfactory):
    """An exception inside ``swallow_and_log_exception`` is logged, not raised."""
    account = acfactory.get_pseudo_configured_account()
    logged = []
    account.log = logged.append

    with account._event_thread.swallow_and_log_exception("some"):
        0 / 0

    assert logged
    # The log must carry the given label, the exception name and a traceback.
    for needle in ("some", "ZeroDivisionError", "Traceback"):
        assert needle in str(logged)
def test_logged_ac_process_ffi_failure(acfactory):
    """A failing ``ac_process_ffi_event`` hook implementation gets logged."""
    from deltachat import account_hookimpl

    ac1 = acfactory.get_pseudo_configured_account()

    class FailPlugin:
        @account_hookimpl
        def ac_process_ffi_event(self, ffi_event):
            # ``self`` was missing before, which bound the plugin instance
            # to ``ffi_event`` instead of the actual event.
            0 / 0

    cap = Queue()
    ac1.log = cap.put
    ac1.add_account_plugin(FailPlugin())
    # cause any event eg contact added/changed
    ac1.create_contact("something@example.org")
    # Raises queue.Empty if nothing was logged within ten seconds.
    res = cap.get(timeout=10)
    assert "ac_process_ffi_event" in res
    assert "ZeroDivisionError" in res
    assert "Traceback" in res
def test_jsonrpc_blocking_call(tmp_path):
    """``check_email_validity`` over the blocking JSON-RPC call returns the expected replies."""
    accounts_dir = str(tmp_path / "accounts").encode("ascii")
    writable = True
    accounts = ffi.gc(lib.dc_accounts_new(accounts_dir, writable), lib.dc_accounts_unref)
    jsonrpc = ffi.gc(lib.dc_jsonrpc_init(accounts), lib.dc_jsonrpc_unref)

    def blocking_call(request):
        """Send one request dict and return the decoded JSON response."""
        payload = json.dumps(request).encode("utf-8")
        raw_response = lib.dc_jsonrpc_blocking_call(jsonrpc, payload)
        return json.loads(from_optional_dc_charpointer(raw_response))

    valid_reply = blocking_call(
        {"jsonrpc": "2.0", "method": "check_email_validity", "params": ["alice@example.org"], "id": "123"},
    )
    assert valid_reply == {"jsonrpc": "2.0", "id": "123", "result": True}

    invalid_reply = blocking_call(
        {"jsonrpc": "2.0", "method": "check_email_validity", "params": ["alice"], "id": "456"},
    )
    assert invalid_reply == {"jsonrpc": "2.0", "id": "456", "result": False}