mirror of
https://github.com/chatmail/core.git
synced 2026-05-20 07:16:31 +03:00
test(python): replace legacy tmpdir fixture with tmp_path
This commit is contained in:
@@ -8,7 +8,7 @@ import pytest
|
|||||||
import deltachat
|
import deltachat
|
||||||
|
|
||||||
|
|
||||||
def test_db_busy_error(acfactory, tmpdir):
|
def test_db_busy_error(acfactory):
|
||||||
starttime = time.time()
|
starttime = time.time()
|
||||||
log_lock = threading.RLock()
|
log_lock = threading.RLock()
|
||||||
|
|
||||||
|
|||||||
@@ -494,7 +494,7 @@ def test_multidevice_sync_seen(acfactory, lp):
|
|||||||
assert "Expires: " in ac1_clone_message.get_message_info()
|
assert "Expires: " in ac1_clone_message.get_message_info()
|
||||||
|
|
||||||
|
|
||||||
def test_see_new_verified_member_after_going_online(acfactory, tmpdir, lp):
|
def test_see_new_verified_member_after_going_online(acfactory, tmp_path, lp):
|
||||||
"""The test for the bug #3836:
|
"""The test for the bug #3836:
|
||||||
- Alice has two devices, the second is offline.
|
- Alice has two devices, the second is offline.
|
||||||
- Alice creates a verified group and sends a QR invitation to Bob.
|
- Alice creates a verified group and sends a QR invitation to Bob.
|
||||||
@@ -507,9 +507,10 @@ def test_see_new_verified_member_after_going_online(acfactory, tmpdir, lp):
|
|||||||
for ac in [ac1, ac1_offl]:
|
for ac in [ac1, ac1_offl]:
|
||||||
ac.set_config("bcc_self", "1")
|
ac.set_config("bcc_self", "1")
|
||||||
acfactory.bring_accounts_online()
|
acfactory.bring_accounts_online()
|
||||||
dir = tmpdir.mkdir("exportdir")
|
dir = tmp_path / "exportdir"
|
||||||
ac1.export_self_keys(dir.strpath)
|
dir.mkdir()
|
||||||
ac1_offl.import_self_keys(dir.strpath)
|
ac1.export_self_keys(str(dir))
|
||||||
|
ac1_offl.import_self_keys(str(dir))
|
||||||
ac1_offl.stop_io()
|
ac1_offl.stop_io()
|
||||||
|
|
||||||
lp.sec("ac1: create verified-group QR, ac2 scans and joins")
|
lp.sec("ac1: create verified-group QR, ac2 scans and joins")
|
||||||
@@ -541,7 +542,7 @@ def test_see_new_verified_member_after_going_online(acfactory, tmpdir, lp):
|
|||||||
ac1.set_config("bcc_self", "0")
|
ac1.set_config("bcc_self", "0")
|
||||||
|
|
||||||
|
|
||||||
def test_use_new_verified_group_after_going_online(acfactory, tmpdir, lp):
|
def test_use_new_verified_group_after_going_online(acfactory, tmp_path, lp):
|
||||||
"""Another test for the bug #3836:
|
"""Another test for the bug #3836:
|
||||||
- Bob has two devices, the second is offline.
|
- Bob has two devices, the second is offline.
|
||||||
- Alice creates a verified group and sends a QR invitation to Bob.
|
- Alice creates a verified group and sends a QR invitation to Bob.
|
||||||
@@ -556,9 +557,10 @@ def test_use_new_verified_group_after_going_online(acfactory, tmpdir, lp):
|
|||||||
for ac in [ac2, ac2_offl]:
|
for ac in [ac2, ac2_offl]:
|
||||||
ac.set_config("bcc_self", "1")
|
ac.set_config("bcc_self", "1")
|
||||||
acfactory.bring_accounts_online()
|
acfactory.bring_accounts_online()
|
||||||
dir = tmpdir.mkdir("exportdir")
|
dir = tmp_path / "exportdir"
|
||||||
ac2.export_self_keys(dir.strpath)
|
dir.mkdir()
|
||||||
ac2_offl.import_self_keys(dir.strpath)
|
ac2.export_self_keys(str(dir))
|
||||||
|
ac2_offl.import_self_keys(str(dir))
|
||||||
ac2_offl.stop_io()
|
ac2_offl.stop_io()
|
||||||
|
|
||||||
lp.sec("ac1: create verified-group QR, ac2 scans and joins")
|
lp.sec("ac1: create verified-group QR, ac2 scans and joins")
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from deltachat.message import Message
|
|||||||
from deltachat.tracker import ImexTracker
|
from deltachat.tracker import ImexTracker
|
||||||
|
|
||||||
|
|
||||||
def test_basic_imap_api(acfactory, tmpdir):
|
def test_basic_imap_api(acfactory, tmp_path):
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
chat12 = acfactory.get_accepted_chat(ac1, ac2)
|
chat12 = acfactory.get_accepted_chat(ac1, ac2)
|
||||||
|
|
||||||
@@ -28,7 +28,7 @@ def test_basic_imap_api(acfactory, tmpdir):
|
|||||||
imap2.mark_all_read()
|
imap2.mark_all_read()
|
||||||
assert imap2.get_unread_cnt() == 0
|
assert imap2.get_unread_cnt() == 0
|
||||||
|
|
||||||
imap2.dump_imap_structures(tmpdir, logfile=sys.stdout)
|
imap2.dump_imap_structures(tmp_path, logfile=sys.stdout)
|
||||||
imap2.shutdown()
|
imap2.shutdown()
|
||||||
|
|
||||||
|
|
||||||
@@ -72,35 +72,37 @@ def test_configure_canceled(acfactory):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def test_configure_unref(tmpdir):
|
def test_configure_unref(tmp_path):
|
||||||
"""Test that removing the last reference to the context during ongoing configuration
|
"""Test that removing the last reference to the context during ongoing configuration
|
||||||
does not result in use-after-free."""
|
does not result in use-after-free."""
|
||||||
from deltachat.capi import ffi, lib
|
from deltachat.capi import ffi, lib
|
||||||
|
|
||||||
path = tmpdir.mkdir("test_configure_unref").join("dc.db").strpath
|
path = tmp_path / "test_configure_unref"
|
||||||
dc_context = lib.dc_context_new(ffi.NULL, path.encode("utf8"), ffi.NULL)
|
path.mkdir()
|
||||||
|
dc_context = lib.dc_context_new(ffi.NULL, str(path / "dc.db").encode("utf8"), ffi.NULL)
|
||||||
lib.dc_set_config(dc_context, "addr".encode("utf8"), "foo@x.testrun.org".encode("utf8"))
|
lib.dc_set_config(dc_context, "addr".encode("utf8"), "foo@x.testrun.org".encode("utf8"))
|
||||||
lib.dc_set_config(dc_context, "mail_pw".encode("utf8"), "abc".encode("utf8"))
|
lib.dc_set_config(dc_context, "mail_pw".encode("utf8"), "abc".encode("utf8"))
|
||||||
lib.dc_configure(dc_context)
|
lib.dc_configure(dc_context)
|
||||||
lib.dc_context_unref(dc_context)
|
lib.dc_context_unref(dc_context)
|
||||||
|
|
||||||
|
|
||||||
def test_export_import_self_keys(acfactory, tmpdir, lp):
|
def test_export_import_self_keys(acfactory, tmp_path, lp):
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
|
|
||||||
dir = tmpdir.mkdir("exportdir")
|
dir = tmp_path / "exportdir"
|
||||||
export_files = ac1.export_self_keys(dir.strpath)
|
dir.mkdir()
|
||||||
|
export_files = ac1.export_self_keys(str(dir))
|
||||||
assert len(export_files) == 2
|
assert len(export_files) == 2
|
||||||
for x in export_files:
|
for x in export_files:
|
||||||
assert x.startswith(dir.strpath)
|
assert x.startswith(str(dir))
|
||||||
(key_id,) = ac1._evtracker.get_info_regex_groups(r".*xporting.*KeyId\((.*)\).*")
|
(key_id,) = ac1._evtracker.get_info_regex_groups(r".*xporting.*KeyId\((.*)\).*")
|
||||||
ac1._evtracker.consume_events()
|
ac1._evtracker.consume_events()
|
||||||
|
|
||||||
lp.sec("exported keys (private and public)")
|
lp.sec("exported keys (private and public)")
|
||||||
for name in os.listdir(dir.strpath):
|
for name in dir.iterdir():
|
||||||
lp.indent(dir.strpath + os.sep + name)
|
lp.indent(str(dir / name))
|
||||||
lp.sec("importing into existing account")
|
lp.sec("importing into existing account")
|
||||||
ac2.import_self_keys(dir.strpath)
|
ac2.import_self_keys(str(dir))
|
||||||
(key_id2,) = ac2._evtracker.get_info_regex_groups(r".*stored.*KeyId\((.*)\).*")
|
(key_id2,) = ac2._evtracker.get_info_regex_groups(r".*stored.*KeyId\((.*)\).*")
|
||||||
assert key_id2 == key_id
|
assert key_id2 == key_id
|
||||||
|
|
||||||
@@ -156,20 +158,19 @@ def test_one_account_send_bcc_setting(acfactory, lp):
|
|||||||
assert len(list(ac1.direct_imap.conn.fetch(AND(seen=True)))) == 1
|
assert len(list(ac1.direct_imap.conn.fetch(AND(seen=True)))) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_send_file_twice_unicode_filename_mangling(tmpdir, acfactory, lp):
|
def test_send_file_twice_unicode_filename_mangling(tmp_path, acfactory, lp):
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
chat = acfactory.get_accepted_chat(ac1, ac2)
|
chat = acfactory.get_accepted_chat(ac1, ac2)
|
||||||
|
|
||||||
basename = "somedäüta.html.zip"
|
basename = "somedäüta.html.zip"
|
||||||
p = os.path.join(tmpdir.strpath, basename)
|
p = tmp_path / basename
|
||||||
with open(p, "w") as f:
|
p.write_text("some data")
|
||||||
f.write("some data")
|
|
||||||
|
|
||||||
def send_and_receive_message():
|
def send_and_receive_message():
|
||||||
lp.sec("ac1: prepare and send attachment + text to ac2")
|
lp.sec("ac1: prepare and send attachment + text to ac2")
|
||||||
msg1 = Message.new_empty(ac1, "file")
|
msg1 = Message.new_empty(ac1, "file")
|
||||||
msg1.set_text("withfile")
|
msg1.set_text("withfile")
|
||||||
msg1.set_file(p)
|
msg1.set_file(str(p))
|
||||||
chat.send_msg(msg1)
|
chat.send_msg(msg1)
|
||||||
|
|
||||||
lp.sec("ac2: receive message")
|
lp.sec("ac2: receive message")
|
||||||
@@ -189,21 +190,20 @@ def test_send_file_twice_unicode_filename_mangling(tmpdir, acfactory, lp):
|
|||||||
assert msg.filename != msg2.filename
|
assert msg.filename != msg2.filename
|
||||||
|
|
||||||
|
|
||||||
def test_send_file_html_attachment(tmpdir, acfactory, lp):
|
def test_send_file_html_attachment(tmp_path, acfactory, lp):
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
chat = acfactory.get_accepted_chat(ac1, ac2)
|
chat = acfactory.get_accepted_chat(ac1, ac2)
|
||||||
|
|
||||||
basename = "test.html"
|
basename = "test.html"
|
||||||
content = "<html><body>text</body>data"
|
content = "<html><body>text</body>data"
|
||||||
|
|
||||||
p = os.path.join(tmpdir.strpath, basename)
|
p = tmp_path / basename
|
||||||
with open(p, "w") as f:
|
# write wrong html to see if core tries to parse it
|
||||||
# write wrong html to see if core tries to parse it
|
# (it shouldn't as it's a file attachment)
|
||||||
# (it shouldn't as it's a file attachment)
|
p.write_text(content)
|
||||||
f.write(content)
|
|
||||||
|
|
||||||
lp.sec("ac1: prepare and send attachment + text to ac2")
|
lp.sec("ac1: prepare and send attachment + text to ac2")
|
||||||
chat.send_file(p, mime_type="text/html")
|
chat.send_file(str(p), mime_type="text/html")
|
||||||
|
|
||||||
lp.sec("ac2: receive message")
|
lp.sec("ac2: receive message")
|
||||||
ev = ac2._evtracker.get_matching("DC_EVENT_INCOMING_MSG")
|
ev = ac2._evtracker.get_matching("DC_EVENT_INCOMING_MSG")
|
||||||
@@ -1207,7 +1207,7 @@ def test_quote_encrypted(acfactory, lp):
|
|||||||
assert msg_in.is_encrypted() == quoted_msg.is_encrypted()
|
assert msg_in.is_encrypted() == quoted_msg.is_encrypted()
|
||||||
|
|
||||||
|
|
||||||
def test_quote_attachment(tmpdir, acfactory, lp):
|
def test_quote_attachment(tmp_path, acfactory, lp):
|
||||||
"""Test that replies with an attachment and a quote are received correctly."""
|
"""Test that replies with an attachment and a quote are received correctly."""
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
|
|
||||||
@@ -1222,15 +1222,14 @@ def test_quote_attachment(tmpdir, acfactory, lp):
|
|||||||
assert received_message.text == "hi"
|
assert received_message.text == "hi"
|
||||||
|
|
||||||
basename = "attachment.txt"
|
basename = "attachment.txt"
|
||||||
p = os.path.join(tmpdir.strpath, basename)
|
p = tmp_path / basename
|
||||||
with open(p, "w") as f:
|
p.write_text("data to send")
|
||||||
f.write("data to send")
|
|
||||||
|
|
||||||
lp.sec("ac2 sends a reply to ac1")
|
lp.sec("ac2 sends a reply to ac1")
|
||||||
chat2 = received_message.create_chat()
|
chat2 = received_message.create_chat()
|
||||||
reply = Message.new_empty(ac2, "file")
|
reply = Message.new_empty(ac2, "file")
|
||||||
reply.set_text("message reply")
|
reply.set_text("message reply")
|
||||||
reply.set_file(p)
|
reply.set_file(str(p))
|
||||||
reply.quote = received_message
|
reply.quote = received_message
|
||||||
chat2.send_msg(reply)
|
chat2.send_msg(reply)
|
||||||
|
|
||||||
@@ -1337,7 +1336,7 @@ def test_send_and_receive_image(acfactory, lp, data):
|
|||||||
assert m == msg_in
|
assert m == msg_in
|
||||||
|
|
||||||
|
|
||||||
def test_reaction_to_partially_fetched_msg(acfactory, lp, tmpdir):
|
def test_reaction_to_partially_fetched_msg(acfactory, lp, tmp_path):
|
||||||
"""See https://github.com/deltachat/deltachat-core-rust/issues/3688 "Partially downloaded
|
"""See https://github.com/deltachat/deltachat-core-rust/issues/3688 "Partially downloaded
|
||||||
messages are received out of order".
|
messages are received out of order".
|
||||||
|
|
||||||
@@ -1372,10 +1371,9 @@ def test_reaction_to_partially_fetched_msg(acfactory, lp, tmpdir):
|
|||||||
lp.sec("sending small+large messages from ac1 to ac2")
|
lp.sec("sending small+large messages from ac1 to ac2")
|
||||||
msgs = []
|
msgs = []
|
||||||
msgs.append(chat.send_text("hi"))
|
msgs.append(chat.send_text("hi"))
|
||||||
path = tmpdir.join("large")
|
path = tmp_path / "large"
|
||||||
with open(path, "wb") as fout:
|
path.write_bytes(os.urandom(download_limit + 1))
|
||||||
fout.write(os.urandom(download_limit + 1))
|
msgs.append(chat.send_file(str(path)))
|
||||||
msgs.append(chat.send_file(path.strpath))
|
|
||||||
|
|
||||||
lp.sec("sending a reaction to the large message from ac1 to ac2")
|
lp.sec("sending a reaction to the large message from ac1 to ac2")
|
||||||
react_str = "\N{THUMBS UP SIGN}"
|
react_str = "\N{THUMBS UP SIGN}"
|
||||||
@@ -1434,7 +1432,7 @@ def test_reactions_for_a_reordering_move(acfactory, lp):
|
|||||||
assert reactions.get_by_contact(contacts[0]) == react_str
|
assert reactions.get_by_contact(contacts[0]) == react_str
|
||||||
|
|
||||||
|
|
||||||
def test_import_export_online_all(acfactory, tmpdir, data, lp):
|
def test_import_export_online_all(acfactory, tmp_path, data, lp):
|
||||||
(ac1,) = acfactory.get_online_accounts(1)
|
(ac1,) = acfactory.get_online_accounts(1)
|
||||||
|
|
||||||
lp.sec("create some chat content")
|
lp.sec("create some chat content")
|
||||||
@@ -1446,10 +1444,10 @@ def test_import_export_online_all(acfactory, tmpdir, data, lp):
|
|||||||
chat1.send_image(original_image_path)
|
chat1.send_image(original_image_path)
|
||||||
|
|
||||||
# Add another 100KB file that ensures that the progress is smooth enough
|
# Add another 100KB file that ensures that the progress is smooth enough
|
||||||
path = tmpdir.join("attachment.txt")
|
path = tmp_path / "attachment.txt"
|
||||||
with open(path, "w") as file:
|
with path.open("w") as file:
|
||||||
file.truncate(100000)
|
file.truncate(100000)
|
||||||
chat1.send_file(path.strpath)
|
chat1.send_file(str(path))
|
||||||
|
|
||||||
def assert_account_is_proper(ac):
|
def assert_account_is_proper(ac):
|
||||||
contacts = ac.get_contacts(query="some1")
|
contacts = ac.get_contacts(query="some1")
|
||||||
@@ -1467,12 +1465,13 @@ def test_import_export_online_all(acfactory, tmpdir, data, lp):
|
|||||||
|
|
||||||
assert_account_is_proper(ac1)
|
assert_account_is_proper(ac1)
|
||||||
|
|
||||||
backupdir = tmpdir.mkdir("backup")
|
backupdir = tmp_path / "backup"
|
||||||
|
backupdir.mkdir()
|
||||||
|
|
||||||
lp.sec(f"export all to {backupdir}")
|
lp.sec(f"export all to {backupdir}")
|
||||||
with ac1.temp_plugin(ImexTracker()) as imex_tracker:
|
with ac1.temp_plugin(ImexTracker()) as imex_tracker:
|
||||||
ac1.stop_io()
|
ac1.stop_io()
|
||||||
ac1.imex(backupdir.strpath, const.DC_IMEX_EXPORT_BACKUP)
|
ac1.imex(str(backupdir), const.DC_IMEX_EXPORT_BACKUP)
|
||||||
|
|
||||||
# check progress events for export
|
# check progress events for export
|
||||||
assert imex_tracker.wait_progress(1, progress_upper_limit=249)
|
assert imex_tracker.wait_progress(1, progress_upper_limit=249)
|
||||||
@@ -1490,7 +1489,7 @@ def test_import_export_online_all(acfactory, tmpdir, data, lp):
|
|||||||
ac2 = acfactory.get_unconfigured_account()
|
ac2 = acfactory.get_unconfigured_account()
|
||||||
|
|
||||||
lp.sec("get latest backup file")
|
lp.sec("get latest backup file")
|
||||||
path2 = ac2.get_latest_backupfile(backupdir.strpath)
|
path2 = ac2.get_latest_backupfile(str(backupdir))
|
||||||
assert path2 == path
|
assert path2 == path
|
||||||
|
|
||||||
lp.sec("import backup and check it's proper")
|
lp.sec("import backup and check it's proper")
|
||||||
@@ -1508,10 +1507,10 @@ def test_import_export_online_all(acfactory, tmpdir, data, lp):
|
|||||||
|
|
||||||
lp.sec(f"Second-time export all to {backupdir}")
|
lp.sec(f"Second-time export all to {backupdir}")
|
||||||
ac1.stop_io()
|
ac1.stop_io()
|
||||||
path2 = ac1.export_all(backupdir.strpath)
|
path2 = ac1.export_all(str(backupdir))
|
||||||
assert os.path.exists(path2)
|
assert os.path.exists(path2)
|
||||||
assert path2 != path
|
assert path2 != path
|
||||||
assert ac2.get_latest_backupfile(backupdir.strpath) == path2
|
assert ac2.get_latest_backupfile(str(backupdir)) == path2
|
||||||
|
|
||||||
|
|
||||||
def test_ac_setup_message(acfactory, lp):
|
def test_ac_setup_message(acfactory, lp):
|
||||||
|
|||||||
@@ -30,25 +30,26 @@ def wait_msgs_changed(account, msgs_list):
|
|||||||
|
|
||||||
|
|
||||||
class TestOnlineInCreation:
|
class TestOnlineInCreation:
|
||||||
def test_increation_not_blobdir(self, tmpdir, acfactory, lp):
|
def test_increation_not_blobdir(self, tmp_path, acfactory, lp):
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
chat = ac1.create_chat(ac2)
|
chat = ac1.create_chat(ac2)
|
||||||
|
|
||||||
lp.sec("Creating in-creation file outside of blobdir")
|
lp.sec("Creating in-creation file outside of blobdir")
|
||||||
assert tmpdir.strpath != ac1.get_blobdir()
|
assert str(tmp_path) != ac1.get_blobdir()
|
||||||
src = tmpdir.join("file.txt").ensure(file=1)
|
src = tmp_path / "file.txt"
|
||||||
|
src.touch()
|
||||||
with pytest.raises(Exception):
|
with pytest.raises(Exception):
|
||||||
chat.prepare_message_file(src.strpath)
|
chat.prepare_message_file(str(src))
|
||||||
|
|
||||||
def test_no_increation_copies_to_blobdir(self, tmpdir, acfactory, lp):
|
def test_no_increation_copies_to_blobdir(self, tmp_path, acfactory, lp):
|
||||||
ac1, ac2 = acfactory.get_online_accounts(2)
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
||||||
chat = ac1.create_chat(ac2)
|
chat = ac1.create_chat(ac2)
|
||||||
|
|
||||||
lp.sec("Creating file outside of blobdir")
|
lp.sec("Creating file outside of blobdir")
|
||||||
assert tmpdir.strpath != ac1.get_blobdir()
|
assert str(tmp_path) != ac1.get_blobdir()
|
||||||
src = tmpdir.join("file.txt")
|
src = tmp_path / "file.txt"
|
||||||
src.write("hello there\n")
|
src.write_text("hello there\n")
|
||||||
chat.send_file(src.strpath)
|
chat.send_file(str(src))
|
||||||
|
|
||||||
blob_src = os.path.join(ac1.get_blobdir(), "file.txt")
|
blob_src = os.path.join(ac1.get_blobdir(), "file.txt")
|
||||||
assert os.path.exists(blob_src), "file.txt not copied to blobdir"
|
assert os.path.exists(blob_src), "file.txt not copied to blobdir"
|
||||||
|
|||||||
@@ -52,18 +52,18 @@ def test_parse_system_add_remove(msgtext, res):
|
|||||||
|
|
||||||
|
|
||||||
class TestOfflineAccountBasic:
|
class TestOfflineAccountBasic:
|
||||||
def test_wrong_db(self, tmpdir):
|
def test_wrong_db(self, tmp_path):
|
||||||
p = tmpdir.join("hello.db")
|
p = tmp_path / "hello.db"
|
||||||
p.write("123")
|
p.write_text("123")
|
||||||
account = Account(p.strpath)
|
account = Account(str(p))
|
||||||
assert not account.is_open()
|
assert not account.is_open()
|
||||||
|
|
||||||
def test_os_name(self, tmpdir):
|
def test_os_name(self, tmp_path):
|
||||||
p = tmpdir.join("hello.db")
|
p = tmp_path / "hello.db"
|
||||||
# we can't easily test if os_name is used in X-Mailer
|
# we can't easily test if os_name is used in X-Mailer
|
||||||
# outgoing messages without a full Online test
|
# outgoing messages without a full Online test
|
||||||
# but we at least check Account accepts the arg
|
# but we at least check Account accepts the arg
|
||||||
ac1 = Account(p.strpath, os_name="solarpunk")
|
ac1 = Account(str(p), os_name="solarpunk")
|
||||||
ac1.get_info()
|
ac1.get_info()
|
||||||
|
|
||||||
def test_preconfigure_keypair(self, acfactory, data):
|
def test_preconfigure_keypair(self, acfactory, data):
|
||||||
@@ -496,22 +496,22 @@ class TestOfflineChat:
|
|||||||
contact = msg.get_sender_contact()
|
contact = msg.get_sender_contact()
|
||||||
assert contact == ac1.get_self_contact()
|
assert contact == ac1.get_self_contact()
|
||||||
|
|
||||||
def test_import_export_on_unencrypted_acct(self, acfactory, tmpdir):
|
def test_import_export_on_unencrypted_acct(self, acfactory, tmp_path):
|
||||||
backupdir = tmpdir.mkdir("backup")
|
backupdir = tmp_path / "backup"
|
||||||
|
backupdir.mkdir()
|
||||||
ac1 = acfactory.get_pseudo_configured_account()
|
ac1 = acfactory.get_pseudo_configured_account()
|
||||||
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
||||||
# send a text message
|
# send a text message
|
||||||
msg = chat.send_text("msg1")
|
msg = chat.send_text("msg1")
|
||||||
# send a binary file
|
# send a binary file
|
||||||
bin = tmpdir.join("some.bin")
|
bin = tmp_path / "some.bin"
|
||||||
with bin.open("w") as f:
|
bin.write_bytes(b"\00123" * 10000)
|
||||||
f.write("\00123" * 10000)
|
msg = chat.send_file(str(bin))
|
||||||
msg = chat.send_file(bin.strpath)
|
|
||||||
contact = msg.get_sender_contact()
|
contact = msg.get_sender_contact()
|
||||||
assert contact == ac1.get_self_contact()
|
assert contact == ac1.get_self_contact()
|
||||||
assert not backupdir.listdir()
|
assert not list(backupdir.iterdir())
|
||||||
ac1.stop_io()
|
ac1.stop_io()
|
||||||
path = ac1.export_all(backupdir.strpath)
|
path = ac1.export_all(str(backupdir))
|
||||||
assert os.path.exists(path)
|
assert os.path.exists(path)
|
||||||
ac2 = acfactory.get_unconfigured_account()
|
ac2 = acfactory.get_unconfigured_account()
|
||||||
ac2.import_all(path)
|
ac2.import_all(path)
|
||||||
@@ -525,27 +525,27 @@ class TestOfflineChat:
|
|||||||
assert messages[0].text == "msg1"
|
assert messages[0].text == "msg1"
|
||||||
assert os.path.exists(messages[1].filename)
|
assert os.path.exists(messages[1].filename)
|
||||||
|
|
||||||
def test_import_export_on_encrypted_acct(self, acfactory, tmpdir):
|
def test_import_export_on_encrypted_acct(self, acfactory, tmp_path):
|
||||||
passphrase1 = "passphrase1"
|
passphrase1 = "passphrase1"
|
||||||
passphrase2 = "passphrase2"
|
passphrase2 = "passphrase2"
|
||||||
backupdir = tmpdir.mkdir("backup")
|
backupdir = tmp_path / "backup"
|
||||||
|
backupdir.mkdir()
|
||||||
ac1 = acfactory.get_pseudo_configured_account(passphrase=passphrase1)
|
ac1 = acfactory.get_pseudo_configured_account(passphrase=passphrase1)
|
||||||
|
|
||||||
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
||||||
# send a text message
|
# send a text message
|
||||||
msg = chat.send_text("msg1")
|
msg = chat.send_text("msg1")
|
||||||
# send a binary file
|
# send a binary file
|
||||||
bin = tmpdir.join("some.bin")
|
bin = tmp_path / "some.bin"
|
||||||
with bin.open("w") as f:
|
bin.write_bytes(b"\00123" * 10000)
|
||||||
f.write("\00123" * 10000)
|
msg = chat.send_file(str(bin))
|
||||||
msg = chat.send_file(bin.strpath)
|
|
||||||
contact = msg.get_sender_contact()
|
contact = msg.get_sender_contact()
|
||||||
assert contact == ac1.get_self_contact()
|
assert contact == ac1.get_self_contact()
|
||||||
|
|
||||||
assert not backupdir.listdir()
|
assert not list(backupdir.iterdir())
|
||||||
ac1.stop_io()
|
ac1.stop_io()
|
||||||
|
|
||||||
path = ac1.export_all(backupdir.strpath)
|
path = ac1.export_all(str(backupdir))
|
||||||
assert os.path.exists(path)
|
assert os.path.exists(path)
|
||||||
|
|
||||||
ac2 = acfactory.get_unconfigured_account(closed=True)
|
ac2 = acfactory.get_unconfigured_account(closed=True)
|
||||||
@@ -580,27 +580,27 @@ class TestOfflineChat:
|
|||||||
assert messages[0].text == "msg1"
|
assert messages[0].text == "msg1"
|
||||||
assert os.path.exists(messages[1].filename)
|
assert os.path.exists(messages[1].filename)
|
||||||
|
|
||||||
def test_import_export_with_passphrase(self, acfactory, tmpdir):
|
def test_import_export_with_passphrase(self, acfactory, tmp_path):
|
||||||
passphrase = "test_passphrase"
|
passphrase = "test_passphrase"
|
||||||
wrong_passphrase = "wrong_passprase"
|
wrong_passphrase = "wrong_passprase"
|
||||||
backupdir = tmpdir.mkdir("backup")
|
backupdir = tmp_path / "backup"
|
||||||
|
backupdir.mkdir()
|
||||||
ac1 = acfactory.get_pseudo_configured_account()
|
ac1 = acfactory.get_pseudo_configured_account()
|
||||||
|
|
||||||
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
||||||
# send a text message
|
# send a text message
|
||||||
msg = chat.send_text("msg1")
|
msg = chat.send_text("msg1")
|
||||||
# send a binary file
|
# send a binary file
|
||||||
bin = tmpdir.join("some.bin")
|
bin = tmp_path / "some.bin"
|
||||||
with bin.open("w") as f:
|
bin.write_bytes(b"\00123" * 10000)
|
||||||
f.write("\00123" * 10000)
|
msg = chat.send_file(str(bin))
|
||||||
msg = chat.send_file(bin.strpath)
|
|
||||||
contact = msg.get_sender_contact()
|
contact = msg.get_sender_contact()
|
||||||
assert contact == ac1.get_self_contact()
|
assert contact == ac1.get_self_contact()
|
||||||
|
|
||||||
assert not backupdir.listdir()
|
assert not list(backupdir.iterdir())
|
||||||
ac1.stop_io()
|
ac1.stop_io()
|
||||||
|
|
||||||
path = ac1.export_all(backupdir.strpath, passphrase)
|
path = ac1.export_all(str(backupdir), passphrase)
|
||||||
assert os.path.exists(path)
|
assert os.path.exists(path)
|
||||||
|
|
||||||
ac2 = acfactory.get_unconfigured_account()
|
ac2 = acfactory.get_unconfigured_account()
|
||||||
@@ -619,7 +619,7 @@ class TestOfflineChat:
|
|||||||
assert messages[0].text == "msg1"
|
assert messages[0].text == "msg1"
|
||||||
assert os.path.exists(messages[1].filename)
|
assert os.path.exists(messages[1].filename)
|
||||||
|
|
||||||
def test_import_encrypted_bak_into_encrypted_acct(self, acfactory, tmpdir):
|
def test_import_encrypted_bak_into_encrypted_acct(self, acfactory, tmp_path):
|
||||||
"""
|
"""
|
||||||
Test that account passphrase isn't lost if backup failed to be imported.
|
Test that account passphrase isn't lost if backup failed to be imported.
|
||||||
See https://github.com/deltachat/deltachat-core-rust/issues/3379
|
See https://github.com/deltachat/deltachat-core-rust/issues/3379
|
||||||
@@ -627,24 +627,24 @@ class TestOfflineChat:
|
|||||||
acct_passphrase = "passphrase1"
|
acct_passphrase = "passphrase1"
|
||||||
bak_passphrase = "passphrase2"
|
bak_passphrase = "passphrase2"
|
||||||
wrong_passphrase = "wrong_passprase"
|
wrong_passphrase = "wrong_passprase"
|
||||||
backupdir = tmpdir.mkdir("backup")
|
backupdir = tmp_path / "backup"
|
||||||
|
backupdir.mkdir()
|
||||||
|
|
||||||
ac1 = acfactory.get_pseudo_configured_account()
|
ac1 = acfactory.get_pseudo_configured_account()
|
||||||
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
chat = ac1.create_contact("some1 <some1@example.org>").create_chat()
|
||||||
# send a text message
|
# send a text message
|
||||||
msg = chat.send_text("msg1")
|
msg = chat.send_text("msg1")
|
||||||
# send a binary file
|
# send a binary file
|
||||||
bin = tmpdir.join("some.bin")
|
bin = tmp_path / "some.bin"
|
||||||
with bin.open("w") as f:
|
bin.write_bytes(b"\00123" * 10000)
|
||||||
f.write("\00123" * 10000)
|
msg = chat.send_file(str(bin))
|
||||||
msg = chat.send_file(bin.strpath)
|
|
||||||
contact = msg.get_sender_contact()
|
contact = msg.get_sender_contact()
|
||||||
assert contact == ac1.get_self_contact()
|
assert contact == ac1.get_self_contact()
|
||||||
|
|
||||||
assert not backupdir.listdir()
|
assert not list(backupdir.iterdir())
|
||||||
ac1.stop_io()
|
ac1.stop_io()
|
||||||
|
|
||||||
path = ac1.export_all(backupdir.strpath, bak_passphrase)
|
path = ac1.export_all(str(backupdir), bak_passphrase)
|
||||||
assert os.path.exists(path)
|
assert os.path.exists(path)
|
||||||
|
|
||||||
ac2 = acfactory.get_unconfigured_account(closed=True)
|
ac2 = acfactory.get_unconfigured_account(closed=True)
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import os
|
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
|
||||||
from deltachat import capi, const, cutil, register_global_plugin
|
from deltachat import capi, const, cutil, register_global_plugin
|
||||||
@@ -65,16 +64,17 @@ class TestACSetup:
|
|||||||
assert pc._account2state[ac1] == pc.IDLEREADY
|
assert pc._account2state[ac1] == pc.IDLEREADY
|
||||||
assert pc._account2state[ac2] == pc.IDLEREADY
|
assert pc._account2state[ac2] == pc.IDLEREADY
|
||||||
|
|
||||||
def test_store_and_retrieve_configured_account_cache(self, acfactory, tmpdir):
|
def test_store_and_retrieve_configured_account_cache(self, acfactory, tmp_path):
|
||||||
ac1 = acfactory.get_pseudo_configured_account()
|
ac1 = acfactory.get_pseudo_configured_account()
|
||||||
holder = acfactory._acsetup.testprocess
|
holder = acfactory._acsetup.testprocess
|
||||||
assert holder.cache_maybe_store_configured_db_files(ac1)
|
assert holder.cache_maybe_store_configured_db_files(ac1)
|
||||||
assert not holder.cache_maybe_store_configured_db_files(ac1)
|
assert not holder.cache_maybe_store_configured_db_files(ac1)
|
||||||
acdir = tmpdir.mkdir("newaccount")
|
acdir = tmp_path / "newaccount"
|
||||||
|
acdir.mkdir()
|
||||||
addr = ac1.get_config("addr")
|
addr = ac1.get_config("addr")
|
||||||
target_db_path = acdir.join("db").strpath
|
target_db_path = acdir / "db"
|
||||||
assert holder.cache_maybe_retrieve_configured_db_files(addr, target_db_path)
|
assert holder.cache_maybe_retrieve_configured_db_files(addr, str(target_db_path))
|
||||||
assert len(os.listdir(acdir)) >= 2
|
assert sum(1 for _ in acdir.iterdir()) >= 2
|
||||||
|
|
||||||
|
|
||||||
def test_liveconfig_caching(acfactory, monkeypatch):
|
def test_liveconfig_caching(acfactory, monkeypatch):
|
||||||
@@ -112,20 +112,20 @@ def test_dc_close_events(acfactory):
|
|||||||
shutdowns.get(timeout=2)
|
shutdowns.get(timeout=2)
|
||||||
|
|
||||||
|
|
||||||
def test_wrong_db(tmpdir):
|
def test_wrong_db(tmp_path):
|
||||||
p = tmpdir.join("hello.db")
|
p = tmp_path / "hello.db"
|
||||||
# write an invalid database file
|
# write an invalid database file
|
||||||
p.write("x123" * 10)
|
p.write_bytes(b"x123" * 10)
|
||||||
|
|
||||||
context = lib.dc_context_new(ffi.NULL, p.strpath.encode("ascii"), ffi.NULL)
|
context = lib.dc_context_new(ffi.NULL, str(p).encode("ascii"), ffi.NULL)
|
||||||
assert not lib.dc_context_is_open(context)
|
assert not lib.dc_context_is_open(context)
|
||||||
|
|
||||||
|
|
||||||
def test_empty_blobdir(tmpdir):
|
def test_empty_blobdir(tmp_path):
|
||||||
db_fname = tmpdir.join("hello.db")
|
db_fname = tmp_path / "hello.db"
|
||||||
# Apparently some client code expects this to be the same as passing NULL.
|
# Apparently some client code expects this to be the same as passing NULL.
|
||||||
ctx = ffi.gc(
|
ctx = ffi.gc(
|
||||||
lib.dc_context_new(ffi.NULL, db_fname.strpath.encode("ascii"), b""),
|
lib.dc_context_new(ffi.NULL, str(db_fname).encode("ascii"), b""),
|
||||||
lib.dc_context_unref,
|
lib.dc_context_unref,
|
||||||
)
|
)
|
||||||
assert ctx != ffi.NULL
|
assert ctx != ffi.NULL
|
||||||
@@ -174,10 +174,10 @@ def test_provider_info_none():
|
|||||||
assert lib.dc_provider_new_from_email(ctx, cutil.as_dc_charpointer("email@unexistent.no")) == ffi.NULL
|
assert lib.dc_provider_new_from_email(ctx, cutil.as_dc_charpointer("email@unexistent.no")) == ffi.NULL
|
||||||
|
|
||||||
|
|
||||||
def test_get_info_open(tmpdir):
|
def test_get_info_open(tmp_path):
|
||||||
db_fname = tmpdir.join("test.db")
|
db_fname = tmp_path / "test.db"
|
||||||
ctx = ffi.gc(
|
ctx = ffi.gc(
|
||||||
lib.dc_context_new(ffi.NULL, db_fname.strpath.encode("ascii"), ffi.NULL),
|
lib.dc_context_new(ffi.NULL, str(db_fname).encode("ascii"), ffi.NULL),
|
||||||
lib.dc_context_unref,
|
lib.dc_context_unref,
|
||||||
)
|
)
|
||||||
info = cutil.from_dc_charpointer(lib.dc_get_info(ctx))
|
info = cutil.from_dc_charpointer(lib.dc_get_info(ctx))
|
||||||
@@ -218,10 +218,10 @@ def test_logged_ac_process_ffi_failure(acfactory):
|
|||||||
assert "Traceback" in res
|
assert "Traceback" in res
|
||||||
|
|
||||||
|
|
||||||
def test_jsonrpc_blocking_call(tmpdir):
|
def test_jsonrpc_blocking_call(tmp_path):
|
||||||
accounts_fname = tmpdir.join("accounts")
|
accounts_fname = tmp_path / "accounts"
|
||||||
accounts = ffi.gc(
|
accounts = ffi.gc(
|
||||||
lib.dc_accounts_new(ffi.NULL, accounts_fname.strpath.encode("ascii")),
|
lib.dc_accounts_new(ffi.NULL, str(accounts_fname).encode("ascii")),
|
||||||
lib.dc_accounts_unref,
|
lib.dc_accounts_unref,
|
||||||
)
|
)
|
||||||
jsonrpc = ffi.gc(lib.dc_jsonrpc_init(accounts), lib.dc_jsonrpc_unref)
|
jsonrpc = ffi.gc(lib.dc_jsonrpc_init(accounts), lib.dc_jsonrpc_unref)
|
||||||
|
|||||||
Reference in New Issue
Block a user