mirror of
https://github.com/chatmail/core.git
synced 2026-05-02 04:46:29 +03:00
This guarantees that the primary transport is never removed while being in use. At least this way it's obvious that there are no such corner cases.
329 lines
11 KiB
Python
329 lines
11 KiB
Python
import pytest
|
|
|
|
from deltachat_rpc_client import EventType
|
|
from deltachat_rpc_client.const import ChatType, DownloadState
|
|
from deltachat_rpc_client.rpc import JsonRpcError
|
|
|
|
|
|
def test_add_second_address(acfactory) -> None:
|
|
account = acfactory.new_configured_account()
|
|
assert len(account.list_transports()) == 1
|
|
|
|
assert account.get_config("show_emails") == "2"
|
|
|
|
qr = acfactory.get_account_qr()
|
|
account.add_transport_from_qr(qr)
|
|
assert len(account.list_transports()) == 2
|
|
|
|
account.add_transport_from_qr(qr)
|
|
assert len(account.list_transports()) == 3
|
|
|
|
first_addr = account.list_transports()[0]["addr"]
|
|
second_addr = account.list_transports()[1]["addr"]
|
|
|
|
# Cannot delete the first address.
|
|
with pytest.raises(JsonRpcError):
|
|
account.delete_transport(first_addr)
|
|
|
|
account.delete_transport(second_addr)
|
|
assert len(account.list_transports()) == 2
|
|
|
|
# show_emails does not matter for multi-relay, can be set to anything
|
|
account.set_config("show_emails", "0")
|
|
|
|
|
|
def test_second_transport_without_classic_emails(acfactory) -> None:
|
|
"""Test that second transport can be configured if classic emails are not fetched."""
|
|
account = acfactory.new_configured_account()
|
|
assert len(account.list_transports()) == 1
|
|
|
|
assert account.get_config("show_emails") == "2"
|
|
|
|
qr = acfactory.get_account_qr()
|
|
account.set_config("show_emails", "0")
|
|
|
|
account.add_transport_from_qr(qr)
|
|
|
|
|
|
def test_change_address(acfactory) -> None:
|
|
"""Test Alice configuring a second transport and setting it as a primary one."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob_addr = bob.get_config("configured_addr")
|
|
bob.create_chat(alice)
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
msg1 = bob.wait_for_incoming_msg().get_snapshot()
|
|
sender_addr1 = msg1.sender.get_snapshot().address
|
|
|
|
alice.stop_io()
|
|
old_alice_addr = alice.get_config("configured_addr")
|
|
alice_vcard = alice.self_contact.make_vcard()
|
|
assert old_alice_addr in alice_vcard
|
|
qr = acfactory.get_account_qr()
|
|
alice.add_transport_from_qr(qr)
|
|
new_alice_addr = alice.list_transports()[1]["addr"]
|
|
with pytest.raises(JsonRpcError):
|
|
# Cannot use the address that is not
|
|
# configured for any transport.
|
|
alice.set_config("configured_addr", bob_addr)
|
|
|
|
# Load old address so it is cached.
|
|
assert alice.get_config("configured_addr") == old_alice_addr
|
|
alice.set_config("configured_addr", new_alice_addr)
|
|
# Make sure that setting `configured_addr` invalidated the cache.
|
|
assert alice.get_config("configured_addr") == new_alice_addr
|
|
|
|
alice_vcard = alice.self_contact.make_vcard()
|
|
assert old_alice_addr not in alice_vcard
|
|
assert new_alice_addr in alice_vcard
|
|
with pytest.raises(JsonRpcError):
|
|
alice.delete_transport(new_alice_addr)
|
|
alice.start_io()
|
|
|
|
alice_chat_bob.send_text("Hello again!")
|
|
|
|
msg2 = bob.wait_for_incoming_msg().get_snapshot()
|
|
sender_addr2 = msg2.sender.get_snapshot().address
|
|
|
|
assert msg1.sender == msg2.sender
|
|
assert sender_addr1 != sender_addr2
|
|
assert sender_addr1 == old_alice_addr
|
|
assert sender_addr2 == new_alice_addr
|
|
|
|
|
|
def test_download_on_demand(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
alice.set_config("download_limit", "1")
|
|
|
|
alice.stop_io()
|
|
qr = acfactory.get_account_qr()
|
|
alice.add_transport_from_qr(qr)
|
|
alice.start_io()
|
|
|
|
alice.create_chat(bob)
|
|
chat_bob_alice = bob.create_chat(alice)
|
|
chat_bob_alice.send_message(file="../test-data/image/screenshot.jpg")
|
|
msg = alice.wait_for_incoming_msg()
|
|
snapshot = msg.get_snapshot()
|
|
assert snapshot.download_state == DownloadState.AVAILABLE
|
|
chat_id = snapshot.chat_id
|
|
# Actually the message isn't available yet. Wait somehow for the post-message to arrive.
|
|
chat_bob_alice.send_message("Now you can download my previous message")
|
|
alice.wait_for_incoming_msg()
|
|
alice._rpc.download_full_message(alice.id, msg.id)
|
|
for dstate in [DownloadState.IN_PROGRESS, DownloadState.DONE]:
|
|
event = alice.wait_for_event(EventType.MSGS_CHANGED)
|
|
assert event.chat_id == chat_id
|
|
assert event.msg_id == msg.id
|
|
assert msg.get_snapshot().download_state == dstate
|
|
|
|
|
|
def test_reconfigure_transport(acfactory) -> None:
|
|
"""Test that reconfiguring the transport works."""
|
|
account = acfactory.get_online_account()
|
|
|
|
[transport] = account.list_transports()
|
|
account.add_or_update_transport(transport)
|
|
|
|
|
|
def test_transport_synchronization(acfactory, log) -> None:
|
|
"""Test synchronization of transports between devices."""
|
|
|
|
def wait_for_io_started(ac):
|
|
while True:
|
|
ev = ac.wait_for_event(EventType.INFO)
|
|
if "scheduler is running" in ev.msg:
|
|
return
|
|
|
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
|
ac1_clone = ac1.clone()
|
|
ac1_clone.bring_online()
|
|
|
|
qr = acfactory.get_account_qr()
|
|
|
|
ac1.add_transport_from_qr(qr)
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1_clone)
|
|
assert len(ac1.list_transports()) == 2
|
|
assert len(ac1_clone.list_transports()) == 2
|
|
|
|
ac1_clone.add_transport_from_qr(qr)
|
|
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1)
|
|
assert len(ac1.list_transports()) == 3
|
|
assert len(ac1_clone.list_transports()) == 3
|
|
|
|
log.section("ac1 clone removes second transport")
|
|
[transport1, transport2, transport3] = ac1_clone.list_transports()
|
|
addr3 = transport3["addr"]
|
|
ac1_clone.delete_transport(transport2["addr"])
|
|
|
|
ac1.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1)
|
|
[transport1, transport3] = ac1.list_transports()
|
|
|
|
log.section("ac1 changes the primary transport")
|
|
ac1.set_config("configured_addr", transport3["addr"])
|
|
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
[transport1, transport3] = ac1_clone.list_transports()
|
|
assert ac1_clone.get_config("configured_addr") == addr3
|
|
|
|
log.section("ac1 removes the first transport")
|
|
ac1.delete_transport(transport1["addr"])
|
|
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
wait_for_io_started(ac1_clone)
|
|
[transport3] = ac1_clone.list_transports()
|
|
assert transport3["addr"] == addr3
|
|
assert ac1_clone.get_config("configured_addr") == addr3
|
|
|
|
ac2_chat = ac2.create_chat(ac1)
|
|
ac2_chat.send_text("Hello!")
|
|
|
|
assert ac1.wait_for_incoming_msg().get_snapshot().text == "Hello!"
|
|
assert ac1_clone.wait_for_incoming_msg().get_snapshot().text == "Hello!"
|
|
|
|
|
|
def test_transport_sync_new_as_primary(acfactory, log) -> None:
|
|
"""Test synchronization of new transport as primary between devices."""
|
|
ac1, bob = acfactory.get_online_accounts(2)
|
|
ac1_clone = ac1.clone()
|
|
ac1_clone.bring_online()
|
|
|
|
qr = acfactory.get_account_qr()
|
|
|
|
ac1.add_transport_from_qr(qr)
|
|
ac1_transports = ac1.list_transports()
|
|
assert len(ac1_transports) == 2
|
|
[transport1, transport2] = ac1_transports
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
assert len(ac1_clone.list_transports()) == 2
|
|
assert ac1_clone.get_config("configured_addr") == transport1["addr"]
|
|
|
|
log.section("ac1 changes the primary transport")
|
|
ac1.set_config("configured_addr", transport2["addr"])
|
|
|
|
ac1_clone.wait_for_event(EventType.TRANSPORTS_MODIFIED)
|
|
assert ac1_clone.get_config("configured_addr") == transport2["addr"]
|
|
|
|
log.section("ac1_clone receives a message via the new primary transport")
|
|
ac1_chat = ac1.create_chat(bob)
|
|
ac1_chat.send_text("Hello!")
|
|
bob_chat_id = bob.wait_for_incoming_msg_event().chat_id
|
|
bob_chat = bob.get_chat_by_id(bob_chat_id)
|
|
bob_chat.accept()
|
|
bob_chat.send_text("hello back")
|
|
assert ac1_clone.wait_for_incoming_msg().get_snapshot().text == "hello back"
|
|
|
|
|
|
def test_recognize_self_address(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob_chat = bob.create_chat(alice)
|
|
|
|
qr = acfactory.get_account_qr()
|
|
alice.add_transport_from_qr(qr)
|
|
|
|
new_alice_addr = alice.list_transports()[1]["addr"]
|
|
alice.set_config("configured_addr", new_alice_addr)
|
|
|
|
bob_chat.send_text("Hello!")
|
|
msg = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert msg.chat == alice.create_chat(bob)
|
|
|
|
|
|
def test_transport_limit(acfactory) -> None:
|
|
"""Test transports limit."""
|
|
account = acfactory.get_online_account()
|
|
qr = acfactory.get_account_qr()
|
|
|
|
limit = 5
|
|
|
|
for _ in range(1, limit):
|
|
account.add_transport_from_qr(qr)
|
|
|
|
assert len(account.list_transports()) == limit
|
|
|
|
with pytest.raises(JsonRpcError):
|
|
account.add_transport_from_qr(qr)
|
|
|
|
second_addr = account.list_transports()[1]["addr"]
|
|
account.delete_transport(second_addr)
|
|
|
|
# test that adding a transport after deleting one works again
|
|
account.add_transport_from_qr(qr)
|
|
|
|
|
|
def test_message_info_imap_urls(acfactory) -> None:
|
|
"""Test that message info contains IMAP URLs of where the message was received."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
qr = acfactory.get_account_qr()
|
|
for i in range(3):
|
|
alice.add_transport_from_qr(qr)
|
|
# Wait for all transports to go IDLE after adding each one.
|
|
for _ in range(i + 1):
|
|
alice.bring_online()
|
|
|
|
# Enable multi-device mode so messages are not deleted immediately.
|
|
alice.set_config("bcc_self", "1")
|
|
|
|
# Bob creates chat, learning about Alice's currently selected transport.
|
|
# This is where he will send the message.
|
|
bob_chat = bob.create_chat(alice)
|
|
|
|
# Alice switches to another transport and removes the rest of the transports.
|
|
new_alice_addr = alice.list_transports()[1]["addr"]
|
|
alice.set_config("configured_addr", new_alice_addr)
|
|
removed_addrs = []
|
|
for transport in alice.list_transports():
|
|
if transport["addr"] != new_alice_addr:
|
|
alice.delete_transport(transport["addr"])
|
|
removed_addrs.append(transport["addr"])
|
|
alice.stop_io()
|
|
alice.start_io()
|
|
|
|
bob_chat.send_text("Hello!")
|
|
|
|
msg = alice.wait_for_incoming_msg()
|
|
msg_info = msg.get_info()
|
|
assert new_alice_addr in msg_info
|
|
for removed_addr in removed_addrs:
|
|
assert removed_addr not in msg_info
|
|
assert f"{new_alice_addr}/INBOX" in msg_info
|
|
|
|
|
|
def test_remove_primary_transport(acfactory, log) -> None:
|
|
"""Test that after removing the primary relay, Alice can still receive messages."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
qr = acfactory.get_account_qr()
|
|
|
|
alice.add_transport_from_qr(qr)
|
|
alice.bring_online()
|
|
|
|
bob_chat = bob.create_chat(alice)
|
|
alice.create_chat(bob)
|
|
|
|
log.section("Alice sets up second transport")
|
|
[transport1, transport2] = alice.list_transports()
|
|
alice.set_config("configured_addr", transport2["addr"])
|
|
|
|
bob_chat.send_text("Hello!")
|
|
msg1 = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert msg1.text == "Hello!"
|
|
|
|
log.section("Alice removes the primary relay")
|
|
alice.delete_transport(transport1["addr"])
|
|
alice.stop_io()
|
|
alice.start_io()
|
|
|
|
bob_chat.send_text("Hello again!")
|
|
msg2 = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert msg2.text == "Hello again!"
|
|
assert msg2.chat.get_basic_snapshot().chat_type == ChatType.SINGLE
|
|
assert msg2.chat == alice.create_chat(bob)
|