mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
330 lines
11 KiB
Python
330 lines
11 KiB
Python
import pytest
|
|
|
|
from deltachat_rpc_client import EventType
|
|
from deltachat_rpc_client.const import DownloadState
|
|
from deltachat_rpc_client.rpc import JsonRpcError
|
|
|
|
|
|
def test_add_second_address(acfactory) -> None:
|
|
account = acfactory.new_configured_account()
|
|
assert len(account.list_transports()) == 1
|
|
|
|
assert account.get_config("show_emails") == "2"
|
|
|
|
qr = acfactory.get_account_qr()
|
|
account.add_transport_from_qr(qr)
|
|
assert len(account.list_transports()) == 2
|
|
|
|
account.add_transport_from_qr(qr)
|
|
assert len(account.list_transports()) == 3
|
|
|
|
first_addr = account.list_transports()[0]["addr"]
|
|
second_addr = account.list_transports()[1]["addr"]
|
|
|
|
# Cannot delete the first address.
|
|
with pytest.raises(JsonRpcError):
|
|
account.delete_transport(first_addr)
|
|
|
|
account.delete_transport(second_addr)
|
|
assert len(account.list_transports()) == 2
|
|
|
|
# show_emails does not matter for multi-relay, can be set to anything
|
|
account.set_config("show_emails", "0")
|
|
|
|
|
|
def test_second_transport_without_classic_emails(acfactory) -> None:
|
|
"""Test that second transport can be configured if classic emails are not fetched."""
|
|
account = acfactory.new_configured_account()
|
|
assert len(account.list_transports()) == 1
|
|
|
|
assert account.get_config("show_emails") == "2"
|
|
|
|
qr = acfactory.get_account_qr()
|
|
account.set_config("show_emails", "0")
|
|
|
|
account.add_transport_from_qr(qr)
|
|
|
|
|
|
def test_change_address(acfactory) -> None:
|
|
"""Test Alice configuring a second transport and setting it as a primary one."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob_addr = bob.get_config("configured_addr")
|
|
bob.create_chat(alice)
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
msg1 = bob.wait_for_incoming_msg().get_snapshot()
|
|
sender_addr1 = msg1.sender.get_snapshot().address
|
|
|
|
alice.stop_io()
|
|
old_alice_addr = alice.get_config("configured_addr")
|
|
alice_vcard = alice.self_contact.make_vcard()
|
|
assert old_alice_addr in alice_vcard
|
|
qr = acfactory.get_account_qr()
|
|
alice.add_transport_from_qr(qr)
|
|
new_alice_addr = alice.list_transports()[1]["addr"]
|
|
with pytest.raises(JsonRpcError):
|
|
# Cannot use the address that is not
|
|
# configured for any transport.
|
|
alice.set_config("configured_addr", bob_addr)
|
|
|
|
# Load old address so it is cached.
|
|
assert alice.get_config("configured_addr") == old_alice_addr
|
|
alice.set_config("configured_addr", new_alice_addr)
|
|
# Make sure that setting `configured_addr` invalidated the cache.
|
|
assert alice.get_config("configured_addr") == new_alice_addr
|
|
|
|
alice_vcard = alice.self_contact.make_vcard()
|
|
assert old_alice_addr not in alice_vcard
|
|
assert new_alice_addr in alice_vcard
|
|
with pytest.raises(JsonRpcError):
|
|
alice.delete_transport(new_alice_addr)
|
|
alice.start_io()
|
|
|
|
alice_chat_bob.send_text("Hello again!")
|
|
|
|
msg2 = bob.wait_for_incoming_msg().get_snapshot()
|
|
sender_addr2 = msg2.sender.get_snapshot().address
|
|
|
|
assert msg1.sender == msg2.sender
|
|
assert sender_addr1 != sender_addr2
|
|
assert sender_addr1 == old_alice_addr
|
|
assert sender_addr2 == new_alice_addr
|
|
|
|
|
|
def test_download_on_demand(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
alice.set_config("download_limit", "1")
|
|
|
|
alice.stop_io()
|
|
qr = acfactory.get_account_qr()
|
|
alice.add_transport_from_qr(qr)
|
|
alice.start_io()
|
|
|
|
alice.create_chat(bob)
|
|
chat_bob_alice = bob.create_chat(alice)
|
|
chat_bob_alice.send_message(file="../test-data/image/screenshot.jpg")
|
|
msg = alice.wait_for_incoming_msg()
|
|
snapshot = msg.get_snapshot()
|
|
assert snapshot.download_state == DownloadState.AVAILABLE
|
|
chat_id = snapshot.chat_id
|
|
# Actually the message isn't available yet. Wait somehow for the post-message to arrive.
|
|
chat_bob_alice.send_message("Now you can download my previous message")
|
|
alice.wait_for_incoming_msg()
|
|
alice._rpc.download_full_message(alice.id, msg.id)
|
|
for dstate in [DownloadState.IN_PROGRESS, DownloadState.DONE]:
|
|
event = alice.wait_for_event(EventType.MSGS_CHANGED)
|
|
assert event.chat_id == chat_id
|
|
assert event.msg_id == msg.id
|
|
assert msg.get_snapshot().download_state == dstate
|
|
|
|
|
|
def test_reconfigure_transport(acfactory) -> None:
|
|
"""Test that reconfiguring the transport works."""
|
|
account = acfactory.get_online_account()
|
|
|
|
[transport] = account.list_transports()
|
|
account.add_or_update_transport(transport)
|
|
|
|
|
|
def test_transport_synchronization(acfactory, log) -> None:
|
|
"""Test synchronization of transports between devices."""
|
|
|
|
def wait_for_io_started(ac):
|
|
while True:
|
|
ev = ac.wait_for_event(EventType.INFO)
|
|
if "scheduler is running" in ev.msg:
|
|
return
|
|
|
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
|
ac1_clone = ac1.clone()
|
|
ac1_clone.bring_online()
|
|
|
|
qr = acfactory.get_account_qr()
|
|
|
|
ac1.add_transport_from_qr(qr)
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1_clone)
|
|
assert len(ac1.list_transports()) == 2
|
|
assert len(ac1_clone.list_transports()) == 2
|
|
|
|
ac1_clone.add_transport_from_qr(qr)
|
|
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1)
|
|
assert len(ac1.list_transports()) == 3
|
|
assert len(ac1_clone.list_transports()) == 3
|
|
|
|
log.section("ac1 clone removes second transport")
|
|
[transport1, transport2, transport3] = ac1_clone.list_transports()
|
|
addr3 = transport3["addr"]
|
|
ac1_clone.delete_transport(transport2["addr"])
|
|
|
|
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1)
|
|
[transport1, transport3] = ac1.list_transports()
|
|
|
|
log.section("ac1 changes the primary transport")
|
|
ac1.set_config("configured_addr", transport3["addr"])
|
|
|
|
# One event for updated `add_timestamp` of the new primary transport,
|
|
# one event for the `configured_addr` update.
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
[transport1, transport3] = ac1_clone.list_transports()
|
|
assert ac1_clone.get_config("configured_addr") == addr3
|
|
|
|
log.section("ac1 removes the first transport")
|
|
ac1.delete_transport(transport1["addr"])
|
|
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1_clone)
|
|
[transport3] = ac1_clone.list_transports()
|
|
assert transport3["addr"] == addr3
|
|
assert ac1_clone.get_config("configured_addr") == addr3
|
|
|
|
ac2_chat = ac2.create_chat(ac1)
|
|
ac2_chat.send_text("Hello!")
|
|
|
|
assert ac1.wait_for_incoming_msg().get_snapshot().text == "Hello!"
|
|
assert ac1_clone.wait_for_incoming_msg().get_snapshot().text == "Hello!"
|
|
|
|
|
|
def test_transport_sync_new_as_primary(acfactory, log) -> None:
|
|
"""Test synchronization of new transport as primary between devices."""
|
|
ac1, bob = acfactory.get_online_accounts(2)
|
|
ac1_clone = ac1.clone()
|
|
ac1_clone.bring_online()
|
|
|
|
qr = acfactory.get_account_qr()
|
|
|
|
ac1.add_transport_from_qr(qr)
|
|
ac1_transports = ac1.list_transports()
|
|
assert len(ac1_transports) == 2
|
|
[transport1, transport2] = ac1_transports
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
assert len(ac1_clone.list_transports()) == 2
|
|
assert ac1_clone.get_config("configured_addr") == transport1["addr"]
|
|
|
|
log.section("ac1 changes the primary transport")
|
|
ac1.set_config("configured_addr", transport2["addr"])
|
|
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
assert ac1_clone.get_config("configured_addr") == transport2["addr"]
|
|
|
|
log.section("ac1_clone receives a message via the new primary transport")
|
|
ac1_chat = ac1.create_chat(bob)
|
|
ac1_chat.send_text("Hello!")
|
|
bob_chat_id = bob.wait_for_incoming_msg_event().chat_id
|
|
bob_chat = bob.get_chat_by_id(bob_chat_id)
|
|
bob_chat.accept()
|
|
bob_chat.send_text("hello back")
|
|
assert ac1_clone.wait_for_incoming_msg().get_snapshot().text == "hello back"
|
|
|
|
|
|
def test_recognize_self_address(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob_chat = bob.create_chat(alice)
|
|
|
|
qr = acfactory.get_account_qr()
|
|
alice.add_transport_from_qr(qr)
|
|
|
|
new_alice_addr = alice.list_transports()[1]["addr"]
|
|
alice.set_config("configured_addr", new_alice_addr)
|
|
|
|
bob_chat.send_text("Hello!")
|
|
msg = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert msg.chat == alice.create_chat(bob)
|
|
|
|
|
|
def test_transport_limit(acfactory) -> None:
|
|
"""Test transports limit."""
|
|
account = acfactory.get_online_account()
|
|
qr = acfactory.get_account_qr()
|
|
|
|
limit = 5
|
|
|
|
for _ in range(1, limit):
|
|
account.add_transport_from_qr(qr)
|
|
|
|
assert len(account.list_transports()) == limit
|
|
|
|
with pytest.raises(JsonRpcError):
|
|
account.add_transport_from_qr(qr)
|
|
|
|
second_addr = account.list_transports()[1]["addr"]
|
|
account.delete_transport(second_addr)
|
|
|
|
# test that adding a transport after deleting one works again
|
|
account.add_transport_from_qr(qr)
|
|
|
|
|
|
def test_message_info_imap_urls(acfactory) -> None:
|
|
"""Test that message info contains IMAP URLs of where the message was received."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
qr = acfactory.get_account_qr()
|
|
for i in range(3):
|
|
alice.add_transport_from_qr(qr)
|
|
# Wait for all transports to go IDLE after adding each one.
|
|
for _ in range(i + 1):
|
|
alice.bring_online()
|
|
|
|
# Enable multi-device mode so messages are not deleted immediately.
|
|
alice.set_config("bcc_self", "1")
|
|
|
|
# Bob creates chat, learning about Alice's currently selected transport.
|
|
# This is where he will send the message.
|
|
bob_chat = bob.create_chat(alice)
|
|
|
|
# Alice switches to another transport and removes the rest of the transports.
|
|
new_alice_addr = alice.list_transports()[1]["addr"]
|
|
alice.set_config("configured_addr", new_alice_addr)
|
|
removed_addrs = []
|
|
for transport in alice.list_transports():
|
|
if transport["addr"] != new_alice_addr:
|
|
alice.delete_transport(transport["addr"])
|
|
removed_addrs.append(transport["addr"])
|
|
alice.stop_io()
|
|
alice.start_io()
|
|
|
|
bob_chat.send_text("Hello!")
|
|
|
|
msg = alice.wait_for_incoming_msg()
|
|
msg_info = msg.get_info()
|
|
assert new_alice_addr in msg_info
|
|
for removed_addr in removed_addrs:
|
|
assert removed_addr not in msg_info
|
|
assert f"{new_alice_addr}/INBOX" in msg_info
|
|
|
|
|
|
def test_remove_primary_transport(acfactory) -> None:
|
|
"""Test that after removing the primary relay, Alice can still receive messages."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
qr = acfactory.get_account_qr()
|
|
|
|
alice.add_transport_from_qr(qr)
|
|
alice.bring_online()
|
|
|
|
bob_chat = bob.create_chat(alice)
|
|
alice.create_chat(bob)
|
|
|
|
# Alice changes the transport.
|
|
[transport1, transport2] = alice.list_transports()
|
|
alice.set_config("configured_addr", transport2["addr"])
|
|
|
|
bob_chat.send_text("Hello!")
|
|
msg1 = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert msg1.text == "Hello!"
|
|
|
|
# Alice deletes the first transport.
|
|
alice.delete_transport(transport1["addr"])
|
|
alice.stop_io()
|
|
alice.start_io()
|
|
|
|
bob_chat.send_text("Hello again!")
|
|
msg2 = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert msg2.text == "Hello again!"
|