mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
1385 lines
47 KiB
Python
1385 lines
47 KiB
Python
import base64
|
|
import concurrent.futures
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import subprocess
|
|
import time
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
from deltachat_rpc_client import EventType, events
|
|
from deltachat_rpc_client.const import DownloadState, MessageState
|
|
from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS
|
|
from deltachat_rpc_client.rpc import JsonRpcError, Rpc
|
|
|
|
|
|
def test_system_info(rpc) -> None:
|
|
system_info = rpc.get_system_info()
|
|
assert "arch" in system_info
|
|
assert "deltachat_core_version" in system_info
|
|
|
|
|
|
def test_sleep(rpc) -> None:
|
|
"""Test that long-running task does not block short-running task from completion."""
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
|
|
sleep_5_future = executor.submit(rpc.sleep, 5.0)
|
|
sleep_3_future = executor.submit(rpc.sleep, 3.0)
|
|
done, pending = concurrent.futures.wait(
|
|
[sleep_5_future, sleep_3_future],
|
|
return_when=concurrent.futures.FIRST_COMPLETED,
|
|
)
|
|
assert sleep_3_future in done
|
|
assert sleep_5_future in pending
|
|
|
|
|
|
def test_email_address_validity(rpc) -> None:
|
|
valid_addresses = [
|
|
"email@example.com",
|
|
"36aa165ae3406424e0c61af17700f397cad3fe8ab83d682d0bddf3338a5dd52e@yggmail@yggmail",
|
|
]
|
|
invalid_addresses = ["email@", "example.com", "emai221"]
|
|
|
|
for addr in valid_addresses:
|
|
assert rpc.check_email_validity(addr)
|
|
for addr in invalid_addresses:
|
|
assert not rpc.check_email_validity(addr)
|
|
|
|
|
|
def test_acfactory(acfactory) -> None:
|
|
account = acfactory.new_configured_account()
|
|
while True:
|
|
event = account.wait_for_event()
|
|
if event.kind == EventType.CONFIGURE_PROGRESS:
|
|
assert event.progress != 0 # Progress 0 indicates error.
|
|
if event.progress == 1000: # Success
|
|
break
|
|
else:
|
|
logging.info(event)
|
|
logging.info("Successful configuration")
|
|
|
|
|
|
def test_configure_starttls(acfactory) -> None:
|
|
addr, password = acfactory.get_credentials()
|
|
account = acfactory.get_unconfigured_account()
|
|
account.add_or_update_transport(
|
|
{
|
|
"addr": addr,
|
|
"password": password,
|
|
"imapSecurity": "starttls",
|
|
"smtpSecurity": "starttls",
|
|
},
|
|
)
|
|
assert account.is_configured()
|
|
|
|
|
|
def test_lowercase_address(acfactory) -> None:
|
|
addr, password = acfactory.get_credentials()
|
|
addr_upper = addr.upper()
|
|
account = acfactory.get_unconfigured_account()
|
|
account.add_or_update_transport(
|
|
{
|
|
"addr": addr_upper,
|
|
"password": password,
|
|
},
|
|
)
|
|
assert account.is_configured()
|
|
assert addr_upper != addr
|
|
assert account.get_config("configured_addr") == addr
|
|
assert account.list_transports()[0]["addr"] == addr
|
|
|
|
param = account.get_info()["used_transport_settings"]
|
|
assert addr in param
|
|
assert addr_upper not in param
|
|
|
|
|
|
def test_configure_ip(acfactory) -> None:
|
|
addr, password = acfactory.get_credentials()
|
|
account = acfactory.get_unconfigured_account()
|
|
ip_address = socket.gethostbyname(addr.rsplit("@")[-1])
|
|
|
|
with pytest.raises(JsonRpcError):
|
|
account.add_or_update_transport(
|
|
{
|
|
"addr": addr,
|
|
"password": password,
|
|
# This should fail TLS check.
|
|
"imapServer": ip_address,
|
|
},
|
|
)
|
|
|
|
|
|
def test_configure_alternative_port(acfactory) -> None:
|
|
"""Test that configuration with alternative port 443 works."""
|
|
addr, password = acfactory.get_credentials()
|
|
account = acfactory.get_unconfigured_account()
|
|
account.add_or_update_transport(
|
|
{
|
|
"addr": addr,
|
|
"password": password,
|
|
"imapPort": 443,
|
|
"smtpPort": 443,
|
|
},
|
|
)
|
|
assert account.is_configured()
|
|
|
|
|
|
def test_list_transports(acfactory) -> None:
|
|
addr, password = acfactory.get_credentials()
|
|
account = acfactory.get_unconfigured_account()
|
|
account.add_or_update_transport(
|
|
{
|
|
"addr": addr,
|
|
"password": password,
|
|
"imapUser": addr,
|
|
},
|
|
)
|
|
transports = account.list_transports()
|
|
assert len(transports) == 1
|
|
params = transports[0]
|
|
assert params["addr"] == addr
|
|
assert params["password"] == password
|
|
assert params["imapUser"] == addr
|
|
|
|
|
|
def test_account(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob_addr = bob.get_config("addr")
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
event = bob.wait_for_incoming_msg_event()
|
|
chat_id = event.chat_id
|
|
msg_id = event.msg_id
|
|
|
|
message = bob.get_message_by_id(msg_id)
|
|
snapshot = message.get_snapshot()
|
|
assert snapshot.chat_id == chat_id
|
|
assert snapshot.text == "Hello!"
|
|
bob.mark_seen_messages([message])
|
|
|
|
assert alice != bob
|
|
assert repr(alice)
|
|
assert alice.get_info().level
|
|
assert alice.get_size()
|
|
assert alice.is_configured()
|
|
assert not alice.get_avatar()
|
|
# get_contact_by_addr() can lookup a key contact by address:
|
|
bob_contact = alice.get_contact_by_addr(bob_addr).get_snapshot()
|
|
assert bob_contact.display_name == "Bob"
|
|
assert bob_contact.is_key_contact
|
|
assert alice.get_contacts()
|
|
assert alice.get_contacts(snapshot=True)
|
|
assert alice.self_contact
|
|
assert alice.get_chatlist()
|
|
assert alice.get_chatlist(snapshot=True)
|
|
assert alice.get_qr_code()
|
|
assert alice.get_fresh_messages()
|
|
|
|
# Test sending empty message.
|
|
assert len(bob.wait_next_messages()) == 0
|
|
alice_chat_bob.send_text("")
|
|
messages = bob.wait_next_messages()
|
|
assert bob.get_next_messages() == messages
|
|
assert len(messages) == 1
|
|
message = messages[0]
|
|
snapshot = message.get_snapshot()
|
|
assert snapshot.text == ""
|
|
bob.mark_seen_messages([message])
|
|
|
|
group = alice.create_group("test group")
|
|
group.add_contact(alice_contact_bob)
|
|
group_msg = group.send_message(text="hello")
|
|
assert group_msg == alice.get_message_by_id(group_msg.id)
|
|
assert group == alice.get_chat_by_id(group.id)
|
|
alice.delete_messages([group_msg])
|
|
|
|
alice.set_config("selfstatus", "test")
|
|
assert alice.get_config("selfstatus") == "test"
|
|
alice.update_config(selfstatus="test2")
|
|
assert alice.get_config("selfstatus") == "test2"
|
|
|
|
assert not alice.get_blocked_contacts()
|
|
alice_contact_bob.block()
|
|
blocked_contacts = alice.get_blocked_contacts()
|
|
assert blocked_contacts
|
|
assert blocked_contacts[0].contact == alice_contact_bob
|
|
|
|
bob.remove()
|
|
alice.stop_io()
|
|
|
|
|
|
def test_chat(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
event = bob.wait_for_incoming_msg_event()
|
|
chat_id = event.chat_id
|
|
msg_id = event.msg_id
|
|
message = bob.get_message_by_id(msg_id)
|
|
snapshot = message.get_snapshot()
|
|
assert snapshot.chat_id == chat_id
|
|
assert snapshot.text == "Hello!"
|
|
bob_chat_alice = bob.get_chat_by_id(chat_id)
|
|
|
|
assert alice_chat_bob != bob_chat_alice
|
|
assert repr(alice_chat_bob)
|
|
alice_chat_bob.delete()
|
|
assert not bob_chat_alice.can_send()
|
|
bob_chat_alice.accept()
|
|
assert bob_chat_alice.can_send()
|
|
bob_chat_alice.block()
|
|
bob_chat_alice = snapshot.sender.create_chat()
|
|
bob_chat_alice.mute()
|
|
bob_chat_alice.unmute()
|
|
bob_chat_alice.pin()
|
|
bob_chat_alice.unpin()
|
|
bob_chat_alice.archive()
|
|
bob_chat_alice.unarchive()
|
|
with pytest.raises(JsonRpcError): # can't set name for 1:1 chats
|
|
bob_chat_alice.set_name("test")
|
|
bob_chat_alice.set_ephemeral_timer(300)
|
|
bob_chat_alice.get_encryption_info()
|
|
|
|
group = alice.create_group("test group")
|
|
to_resend = group.send_text("will be resent")
|
|
group.add_contact(alice_contact_bob)
|
|
group.get_qr_code()
|
|
|
|
snapshot = group.get_basic_snapshot()
|
|
assert snapshot.name == "test group"
|
|
group.set_name("new name")
|
|
snapshot = group.get_full_snapshot()
|
|
assert snapshot.name == "new name"
|
|
|
|
msg = group.send_message(text="hi")
|
|
assert (msg.get_snapshot()).text == "hi"
|
|
group.resend_messages([to_resend])
|
|
group.forward_messages([msg])
|
|
|
|
group.set_draft(text="test draft")
|
|
draft = group.get_draft()
|
|
assert draft.text == "test draft"
|
|
group.remove_draft()
|
|
assert not group.get_draft()
|
|
|
|
assert group.get_messages()
|
|
group.get_fresh_message_count()
|
|
group.mark_noticed()
|
|
assert group.get_fresh_message_count() == 0
|
|
group.mark_fresh()
|
|
assert group.get_fresh_message_count() > 0
|
|
assert group.get_contacts()
|
|
assert group.get_past_contacts() == []
|
|
group.remove_contact(alice_contact_bob)
|
|
assert len(group.get_past_contacts()) == 1
|
|
group.get_locations()
|
|
|
|
|
|
def test_contact(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob_addr = bob.get_config("addr")
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
|
|
assert alice_contact_bob == alice.get_contact_by_id(alice_contact_bob.id)
|
|
assert repr(alice_contact_bob)
|
|
alice_contact_bob.block()
|
|
alice_contact_bob.unblock()
|
|
alice_contact_bob.set_name("new name")
|
|
alice_contact_bob.get_encryption_info()
|
|
snapshot = alice_contact_bob.get_snapshot()
|
|
assert snapshot.address == bob_addr
|
|
alice_contact_bob.create_chat()
|
|
|
|
|
|
def test_message(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
event = bob.wait_for_incoming_msg_event()
|
|
chat_id = event.chat_id
|
|
msg_id = event.msg_id
|
|
|
|
message = bob.get_message_by_id(msg_id)
|
|
snapshot = message.get_snapshot()
|
|
assert snapshot.chat_id == chat_id
|
|
assert snapshot.text == "Hello!"
|
|
assert not snapshot.is_bot
|
|
assert repr(message)
|
|
|
|
with pytest.raises(JsonRpcError): # chat is not accepted
|
|
snapshot.chat.send_text("hi")
|
|
snapshot.chat.accept()
|
|
snapshot.chat.send_text("hi")
|
|
|
|
message.mark_seen()
|
|
message.send_reaction("😎")
|
|
reactions = message.get_reactions()
|
|
assert reactions
|
|
snapshot = message.get_snapshot()
|
|
assert reactions == snapshot.reactions
|
|
|
|
|
|
def test_receive_imf_failure(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
|
|
bob.set_config("simulate_receive_imf_error", "1")
|
|
alice_chat_bob.send_text("Hello!")
|
|
event = bob.wait_for_event(EventType.MSGS_CHANGED)
|
|
assert event.chat_id == bob.get_device_chat().id
|
|
msg_id = event.msg_id
|
|
message = bob.get_message_by_id(msg_id)
|
|
snapshot = message.get_snapshot()
|
|
version = bob.get_info()["deltachat_core_version"]
|
|
assert (
|
|
snapshot.text == "❌ Failed to receive a message:"
|
|
" Condition failed: `!context.get_config_bool(Config::SimulateReceiveImfError).await?`."
|
|
f" Core version {version}."
|
|
" Please report this bug to delta@merlinux.eu or https://support.delta.chat/."
|
|
)
|
|
|
|
# The failed message doesn't break the IMAP loop.
|
|
bob.set_config("simulate_receive_imf_error", "0")
|
|
alice_chat_bob.send_text("Hello again!")
|
|
message = bob.wait_for_incoming_msg()
|
|
snapshot = message.get_snapshot()
|
|
assert snapshot.text == "Hello again!"
|
|
assert snapshot.error is None
|
|
|
|
|
|
def test_selfavatar_sync(acfactory, data, log) -> None:
|
|
alice = acfactory.get_online_account()
|
|
|
|
log.section("Alice adds a second device")
|
|
alice2 = alice.clone()
|
|
|
|
log.section("Second device goes online")
|
|
alice2.start_io()
|
|
|
|
log.section("First device changes avatar")
|
|
image = data.get_path("image/avatar1000x1000.jpg")
|
|
alice.set_config("selfavatar", image)
|
|
avatar_config = alice.get_config("selfavatar")
|
|
avatar_hash = os.path.basename(avatar_config)
|
|
logging.info(f"Avatar hash is {avatar_hash}")
|
|
|
|
log.section("First device receives avatar change")
|
|
alice2.wait_for_event(EventType.SELFAVATAR_CHANGED)
|
|
avatar_config2 = alice2.get_config("selfavatar")
|
|
avatar_hash2 = os.path.basename(avatar_config2)
|
|
logging.info(f"Avatar hash on second device is {avatar_hash2}")
|
|
assert avatar_hash == avatar_hash2
|
|
assert avatar_config != avatar_config2
|
|
|
|
|
|
def test_dont_move_sync_msgs(acfactory, direct_imap):
|
|
addr, password = acfactory.get_credentials()
|
|
ac1 = acfactory.get_unconfigured_account()
|
|
ac1.set_config("bcc_self", "1")
|
|
ac1.set_config("fix_is_chatmail", "1")
|
|
ac1.add_or_update_transport({"addr": addr, "password": password})
|
|
ac1.start_io()
|
|
ac1_direct_imap = direct_imap(ac1)
|
|
|
|
# Sync messages may also be sent during configuration.
|
|
ac1.wait_for_event(EventType.MSG_DELIVERED)
|
|
ac1_direct_imap.select_folder("Inbox")
|
|
while True:
|
|
if len(ac1_direct_imap.get_all_messages()) == 1:
|
|
break
|
|
time.sleep(1)
|
|
|
|
ac1.set_config("displayname", "Alice")
|
|
ac1.wait_for_event(EventType.MSG_DELIVERED)
|
|
ac1.set_config("displayname", "Bob")
|
|
ac1.wait_for_event(EventType.MSG_DELIVERED)
|
|
|
|
# Message may not be delivered to IMAP immediately
|
|
# after sending over SMTP,
|
|
# retry until they are delivered to IMAP.
|
|
while True:
|
|
if len(ac1_direct_imap.get_all_messages()) == 3:
|
|
break
|
|
time.sleep(1)
|
|
|
|
|
|
def test_reaction_seen_on_another_dev(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
alice2 = alice.clone()
|
|
alice2.start_io()
|
|
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
event = bob.wait_for_incoming_msg_event()
|
|
msg_id = event.msg_id
|
|
|
|
message = bob.get_message_by_id(msg_id)
|
|
snapshot = message.get_snapshot()
|
|
snapshot.chat.accept()
|
|
message.send_reaction("😎")
|
|
for a in [alice, alice2]:
|
|
a.wait_for_event(EventType.INCOMING_REACTION)
|
|
|
|
alice2.clear_all_events()
|
|
alice_chat_bob.mark_noticed()
|
|
chat_id = alice2.wait_for_event(EventType.MSGS_NOTICED).chat_id
|
|
alice2_chat_bob = alice2.create_chat(bob)
|
|
assert chat_id == alice2_chat_bob.id
|
|
|
|
|
|
def test_is_bot(acfactory) -> None:
|
|
"""Test that we can recognize messages submitted by bots."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
|
|
# Alice becomes a bot.
|
|
alice.set_config("bot", "1")
|
|
alice_chat_bob.send_text("Hello!")
|
|
|
|
snapshot = bob.wait_for_incoming_msg().get_snapshot()
|
|
assert snapshot.text == "Hello!"
|
|
assert snapshot.is_bot
|
|
|
|
|
|
def test_bot(acfactory) -> None:
|
|
mock = MagicMock()
|
|
user = (acfactory.get_online_accounts(1))[0]
|
|
bot = acfactory.new_configured_bot()
|
|
bot2 = acfactory.new_configured_bot()
|
|
|
|
assert bot.is_configured()
|
|
assert bot.account.get_config("bot") == "1"
|
|
|
|
hook = lambda e: mock.hook(e.msg_id) and None, events.RawEvent(EventType.INCOMING_MSG)
|
|
bot.add_hook(*hook)
|
|
event = acfactory.process_message(from_account=user, to_client=bot, text="Hello!")
|
|
snapshot = bot.account.get_message_by_id(event.msg_id).get_snapshot()
|
|
assert not snapshot.is_bot
|
|
mock.hook.assert_called_once_with(event.msg_id)
|
|
bot.remove_hook(*hook)
|
|
|
|
def track(e):
|
|
mock.hook(e.message_snapshot.id)
|
|
|
|
mock.hook.reset_mock()
|
|
hook = track, events.NewMessage(r"hello")
|
|
bot.add_hook(*hook)
|
|
bot.add_hook(track, events.NewMessage(command="/help"))
|
|
event = acfactory.process_message(from_account=user, to_client=bot, text="hello")
|
|
mock.hook.assert_called_with(event.msg_id)
|
|
event = acfactory.process_message(from_account=user, to_client=bot, text="hello!")
|
|
mock.hook.assert_called_with(event.msg_id)
|
|
acfactory.process_message(from_account=bot2.account, to_client=bot, text="hello")
|
|
assert len(mock.hook.mock_calls) == 2 # bot messages are ignored between bots
|
|
acfactory.process_message(from_account=user, to_client=bot, text="hey!")
|
|
assert len(mock.hook.mock_calls) == 2
|
|
bot.remove_hook(*hook)
|
|
|
|
mock.hook.reset_mock()
|
|
acfactory.process_message(from_account=user, to_client=bot, text="hello")
|
|
event = acfactory.process_message(from_account=user, to_client=bot, text="/help")
|
|
mock.hook.assert_called_once_with(event.msg_id)
|
|
|
|
|
|
def test_wait_next_messages(acfactory) -> None:
|
|
alice = acfactory.get_online_account()
|
|
|
|
# Create a bot account so it does not receive device messages in the beginning.
|
|
addr, password = acfactory.get_credentials()
|
|
bot = acfactory.get_unconfigured_account()
|
|
bot.set_config("bot", "1")
|
|
bot.add_or_update_transport({"addr": addr, "password": password})
|
|
assert bot.is_configured()
|
|
bot.bring_online()
|
|
|
|
# There are no old messages and the call returns immediately.
|
|
assert not bot.wait_next_messages()
|
|
|
|
# Bot starts waiting for messages.
|
|
next_messages_task = bot.wait_next_messages.future()
|
|
|
|
alice_contact_bot = alice.create_contact(bot, "Bot")
|
|
alice_chat_bot = alice_contact_bot.create_chat()
|
|
alice_chat_bot.send_text("Hello!")
|
|
|
|
next_messages = next_messages_task()
|
|
|
|
if len(next_messages) == E2EE_INFO_MSGS:
|
|
next_messages += bot.wait_next_messages()
|
|
|
|
assert len(next_messages) == 1 + E2EE_INFO_MSGS
|
|
snapshot = next_messages[0 + E2EE_INFO_MSGS].get_snapshot()
|
|
assert snapshot.text == "Hello!"
|
|
|
|
|
|
def test_import_export_backup(acfactory, tmp_path) -> None:
|
|
alice = acfactory.new_configured_account()
|
|
alice.export_backup(tmp_path)
|
|
|
|
files = list(tmp_path.glob("*.tar"))
|
|
alice2 = acfactory.get_unconfigured_account()
|
|
alice2.import_backup(files[0])
|
|
|
|
assert alice2.manager.get_system_info()
|
|
|
|
|
|
def test_import_export_online_all(acfactory, tmp_path, data, log) -> None:
|
|
(ac1, some1) = acfactory.get_online_accounts(2)
|
|
|
|
log.section("create some chat content")
|
|
some1_addr = some1.get_config("addr")
|
|
chat1 = ac1.create_contact(some1).create_chat()
|
|
chat1.send_text("msg1")
|
|
assert len(ac1.get_contacts()) == 1
|
|
|
|
original_image_path = data.get_path("image/avatar64x64.png")
|
|
chat1.send_file(str(original_image_path))
|
|
|
|
# Add another 100KB file that ensures that the progress is smooth enough
|
|
path = tmp_path / "attachment.txt"
|
|
with path.open("w") as file:
|
|
file.truncate(100000)
|
|
chat1.send_file(str(path))
|
|
|
|
def assert_account_is_proper(ac):
|
|
contacts = ac.get_contacts()
|
|
assert len(contacts) == 1
|
|
contact2 = contacts[0]
|
|
assert contact2.get_snapshot().address == some1_addr
|
|
chat2 = contact2.create_chat()
|
|
messages = chat2.get_messages()
|
|
assert len(messages) == 3 + E2EE_INFO_MSGS
|
|
assert messages[0 + E2EE_INFO_MSGS].get_snapshot().text == "msg1"
|
|
snapshot = messages[1 + E2EE_INFO_MSGS].get_snapshot()
|
|
assert snapshot.file_mime == "image/png"
|
|
assert os.stat(snapshot.file).st_size == os.stat(original_image_path).st_size
|
|
ac.set_config("displayname", "new displayname")
|
|
assert ac.get_config("displayname") == "new displayname"
|
|
|
|
assert_account_is_proper(ac1)
|
|
|
|
backupdir = tmp_path / "backup"
|
|
backupdir.mkdir()
|
|
|
|
log.section(f"export all to {backupdir}")
|
|
ac1.stop_io()
|
|
ac1.export_backup(backupdir)
|
|
progress = 0
|
|
files_written = []
|
|
while True:
|
|
event = ac1.wait_for_event()
|
|
if event.kind == EventType.IMEX_PROGRESS:
|
|
assert event.progress > 0 # Progress 0 indicates error.
|
|
assert event.progress < progress + 250
|
|
progress = event.progress
|
|
if progress == 1000:
|
|
break
|
|
elif event.kind == EventType.IMEX_FILE_WRITTEN:
|
|
files_written.append(event.path)
|
|
else:
|
|
logging.info(event)
|
|
assert len(files_written) == 1
|
|
assert os.path.exists(files_written[0])
|
|
ac1.start_io()
|
|
|
|
log.section("get fresh empty account")
|
|
ac2 = acfactory.get_unconfigured_account()
|
|
|
|
log.section("import backup and check it's proper")
|
|
ac2.import_backup(files_written[0])
|
|
progress = 0
|
|
while True:
|
|
event = ac2.wait_for_event()
|
|
if event.kind == EventType.IMEX_PROGRESS:
|
|
assert event.progress > 0 # Progress 0 indicates error.
|
|
assert event.progress < progress + 250
|
|
progress = event.progress
|
|
if progress == 1000:
|
|
break
|
|
else:
|
|
logging.info(event)
|
|
assert_account_is_proper(ac1)
|
|
assert_account_is_proper(ac2)
|
|
|
|
log.section(f"Second-time export all to {backupdir}")
|
|
ac1.stop_io()
|
|
ac1.export_backup(backupdir)
|
|
while True:
|
|
event = ac1.wait_for_event()
|
|
if event.kind == EventType.IMEX_PROGRESS:
|
|
assert event.progress > 0
|
|
if event.progress == 1000:
|
|
break
|
|
elif event.kind == EventType.IMEX_FILE_WRITTEN:
|
|
files_written.append(event.path)
|
|
else:
|
|
logging.info(event)
|
|
assert len(files_written) == 2
|
|
assert os.path.exists(files_written[1])
|
|
assert files_written[1] != files_written[0]
|
|
assert len(list(backupdir.glob("*.tar"))) == 2
|
|
|
|
|
|
def test_import_export_keys(acfactory, tmp_path) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
alice_chat_bob.send_text("Hello Bob!")
|
|
|
|
snapshot = bob.wait_for_incoming_msg().get_snapshot()
|
|
assert snapshot.text == "Hello Bob!"
|
|
|
|
# Alice resetups account, but keeps the key.
|
|
alice_keys_path = tmp_path / "alice_keys"
|
|
alice_keys_path.mkdir()
|
|
alice.export_self_keys(alice_keys_path)
|
|
alice = acfactory.resetup_account(alice)
|
|
alice.import_self_keys(alice_keys_path)
|
|
|
|
snapshot.chat.accept()
|
|
snapshot.chat.send_text("Hello Alice!")
|
|
snapshot = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert snapshot.text == "Hello Alice!"
|
|
assert snapshot.show_padlock
|
|
|
|
|
|
def test_openrpc_command_line() -> None:
|
|
"""Test that "deltachat-rpc-server --openrpc" command returns an OpenRPC specification."""
|
|
out = subprocess.run(["deltachat-rpc-server", "--openrpc"], capture_output=True, check=True).stdout
|
|
openrpc = json.loads(out)
|
|
assert "openrpc" in openrpc
|
|
assert "methods" in openrpc
|
|
|
|
|
|
def test_early_failure(tmp_path) -> None:
|
|
"""Test that Rpc.start() raises on invalid accounts directories."""
|
|
# A file instead of a directory.
|
|
file_path = tmp_path / "not_a_dir"
|
|
file_path.write_text("I am a file, not a directory")
|
|
rpc = Rpc(accounts_dir=str(file_path))
|
|
with pytest.raises(JsonRpcError, match="(?i)directory"):
|
|
rpc.start()
|
|
|
|
# A non-empty directory that is not a deltachat accounts directory.
|
|
non_dc_dir = tmp_path / "invalid_dir"
|
|
non_dc_dir.mkdir()
|
|
(non_dc_dir / "some_file").write_text("content")
|
|
rpc = Rpc(accounts_dir=str(non_dc_dir))
|
|
with pytest.raises(JsonRpcError, match="invalid_dir"):
|
|
rpc.start()
|
|
|
|
|
|
def test_provider_info(rpc) -> None:
|
|
account_id = rpc.add_account()
|
|
|
|
provider_info = rpc.get_provider_info(account_id, "example.org")
|
|
assert provider_info["id"] == "example.com"
|
|
|
|
provider_info = rpc.get_provider_info(account_id, "uep7oiw4ahtaizuloith.org")
|
|
assert provider_info is None
|
|
|
|
# Test MX record resolution.
|
|
# This previously resulted in Gmail provider
|
|
# because MX record pointed to google.com domain,
|
|
# but MX record resolution has been removed.
|
|
provider_info = rpc.get_provider_info(account_id, "github.com")
|
|
assert provider_info is None
|
|
|
|
# Disable MX record resolution.
|
|
rpc.set_config(account_id, "proxy_enabled", "1")
|
|
provider_info = rpc.get_provider_info(account_id, "github.com")
|
|
assert provider_info is None
|
|
|
|
|
|
def test_mdn_doesnt_break_autocrypt(acfactory) -> None:
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_contact_bob = alice.create_contact(bob, "Bob")
|
|
|
|
# Bob creates chat manually so chat with Alice is accepted.
|
|
alice_chat_bob = alice_contact_bob.create_chat()
|
|
|
|
# Alice sends a message to Bob.
|
|
alice_chat_bob.send_text("Hello Bob!")
|
|
snapshot = bob.wait_for_incoming_msg().get_snapshot()
|
|
|
|
# Bob sends a message to Alice.
|
|
bob_chat_alice = snapshot.chat
|
|
bob_chat_alice.accept()
|
|
bob_chat_alice.send_text("Hello Alice!")
|
|
message = alice.wait_for_incoming_msg()
|
|
snapshot = message.get_snapshot()
|
|
assert snapshot.show_padlock
|
|
|
|
# Alice reads Bob's message.
|
|
message.mark_seen()
|
|
bob.wait_for_event(EventType.MSG_READ)
|
|
|
|
# Bob sends a message to Alice, it should also be encrypted.
|
|
bob_chat_alice.send_text("Hi Alice!")
|
|
snapshot = alice.wait_for_incoming_msg().get_snapshot()
|
|
assert snapshot.show_padlock
|
|
|
|
|
|
@pytest.mark.parametrize("n_accounts", [3, 2])
|
|
def test_download_limit_chat_assignment(acfactory, tmp_path, n_accounts):
|
|
download_limit = 300000
|
|
|
|
alice, *others = acfactory.get_online_accounts(n_accounts)
|
|
bob = others[0]
|
|
|
|
alice_group = alice.create_group("test group")
|
|
for account in others:
|
|
chat = account.create_chat(alice)
|
|
chat.send_text("Hello Alice!")
|
|
assert alice.wait_for_incoming_msg().get_snapshot().text == "Hello Alice!"
|
|
|
|
contact = alice.create_contact(account)
|
|
alice_group.add_contact(contact)
|
|
|
|
bob.set_config("download_limit", str(download_limit))
|
|
|
|
alice_group.send_text("hi")
|
|
snapshot = bob.wait_for_incoming_msg().get_snapshot()
|
|
assert snapshot.text == "hi"
|
|
bob_group = snapshot.chat
|
|
|
|
path = tmp_path / "large"
|
|
path.write_bytes(os.urandom(download_limit + 1))
|
|
|
|
n_done = 0
|
|
for i in range(10):
|
|
logging.info("Sending message %s", i)
|
|
alice_group.send_file(str(path))
|
|
snapshot = bob.wait_for_incoming_msg().get_snapshot()
|
|
if snapshot.download_state == DownloadState.DONE:
|
|
n_done += 1
|
|
# Work around lost and reordered pre-messages.
|
|
assert n_done <= 1
|
|
else:
|
|
assert snapshot.download_state == DownloadState.AVAILABLE
|
|
assert snapshot.chat == bob_group
|
|
|
|
|
|
def test_download_small_msg_first(acfactory, tmp_path):
|
|
download_limit = 70000
|
|
|
|
alice, bob0 = acfactory.get_online_accounts(2)
|
|
bob1 = bob0.clone()
|
|
bob1.set_config("download_limit", str(download_limit))
|
|
|
|
chat = alice.create_chat(bob0)
|
|
path = tmp_path / "large_enough"
|
|
path.write_bytes(os.urandom(download_limit + 1))
|
|
# Less than 140K, so sent w/o a pre-message.
|
|
chat.send_file(str(path))
|
|
chat.send_text("hi")
|
|
bob0.create_chat(alice)
|
|
assert bob0.wait_for_incoming_msg().get_snapshot().text == ""
|
|
assert bob0.wait_for_incoming_msg().get_snapshot().text == "hi"
|
|
|
|
bob1.start_io()
|
|
bob1.create_chat(alice)
|
|
assert bob1.wait_for_incoming_msg().get_snapshot().text == "hi"
|
|
assert bob1.wait_for_incoming_msg().get_snapshot().text == ""
|
|
|
|
|
|
@pytest.mark.parametrize("delete_chat", [False, True])
|
|
def test_delete_available_msg(acfactory, tmp_path, direct_imap, delete_chat):
|
|
"""
|
|
Tests `DownloadState.AVAILABLE` message deletion on the receiver side.
|
|
Also tests pre- and post-message deletion on the sender side.
|
|
"""
|
|
# Min. UI setting as of v2.35
|
|
download_limit = 163840
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
bob.set_config("download_limit", str(download_limit))
|
|
# Avoid immediate deletion from the server
|
|
alice.set_config("bcc_self", "1")
|
|
bob.set_config("bcc_self", "1")
|
|
|
|
chat_alice = alice.create_chat(bob)
|
|
path = tmp_path / "large"
|
|
path.write_bytes(os.urandom(download_limit + 1))
|
|
msg_alice = chat_alice.send_file(str(path))
|
|
msg_bob = bob.wait_for_incoming_msg()
|
|
msg_bob_snapshot = msg_bob.get_snapshot()
|
|
assert msg_bob_snapshot.download_state == DownloadState.AVAILABLE
|
|
chat_bob = bob.get_chat_by_id(msg_bob_snapshot.chat_id)
|
|
|
|
# Avoid DeleteMessages sync message
|
|
bob.set_config("bcc_self", "0")
|
|
if delete_chat:
|
|
chat_bob.delete()
|
|
else:
|
|
bob.delete_messages([msg_bob])
|
|
alice.wait_for_event(EventType.SMTP_MESSAGE_SENT)
|
|
alice.wait_for_event(EventType.SMTP_MESSAGE_SENT)
|
|
alice.set_config("bcc_self", "0")
|
|
if delete_chat:
|
|
chat_alice.delete()
|
|
else:
|
|
alice.delete_messages([msg_alice])
|
|
for acc in [bob, alice]:
|
|
if not delete_chat:
|
|
acc.wait_for_event(EventType.MSG_DELETED)
|
|
acc_direct_imap = direct_imap(acc)
|
|
# Messages may be deleted separately
|
|
while True:
|
|
acc.wait_for_event(EventType.IMAP_MESSAGE_DELETED)
|
|
while True:
|
|
event = acc.wait_for_event()
|
|
if event.kind == EventType.INFO and "Close/expunge succeeded." in event.msg:
|
|
break
|
|
if len(acc_direct_imap.get_all_messages()) == 0:
|
|
break
|
|
|
|
|
|
def test_delete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
# Avoid immediate deletion from the server
|
|
bob.set_config("bcc_self", "1")
|
|
|
|
chat_alice = alice.create_chat(bob)
|
|
path = tmp_path / "large"
|
|
# Big enough to be sent with a pre-message
|
|
path.write_bytes(os.urandom(300000))
|
|
chat_alice.send_file(str(path))
|
|
|
|
msg = bob.wait_for_incoming_msg()
|
|
msg_snapshot = msg.get_snapshot()
|
|
assert msg_snapshot.download_state == DownloadState.AVAILABLE
|
|
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
|
assert msgs_changed_event.msg_id == msg.id
|
|
msg_snapshot = msg.get_snapshot()
|
|
assert msg_snapshot.download_state == DownloadState.DONE
|
|
|
|
bob_direct_imap = direct_imap(bob)
|
|
assert len(bob_direct_imap.get_all_messages()) == 2
|
|
# Avoid DeleteMessages sync message
|
|
bob.set_config("bcc_self", "0")
|
|
bob.delete_messages([msg])
|
|
bob.wait_for_event(EventType.MSG_DELETED)
|
|
# Messages may be deleted separately
|
|
while True:
|
|
bob.wait_for_event(EventType.IMAP_MESSAGE_DELETED)
|
|
while True:
|
|
event = bob.wait_for_event()
|
|
if event.kind == EventType.INFO and "Close/expunge succeeded." in event.msg:
|
|
break
|
|
if len(bob_direct_imap.get_all_messages()) == 0:
|
|
break
|
|
|
|
|
|
def test_imap_autodelete_fully_downloaded_msg(acfactory, tmp_path, direct_imap):
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
chat_alice = alice.create_chat(bob)
|
|
path = tmp_path / "large"
|
|
# Big enough to be sent with a pre-message
|
|
path.write_bytes(os.urandom(300000))
|
|
chat_alice.send_file(str(path))
|
|
|
|
msg = bob.wait_for_incoming_msg()
|
|
msg_snapshot = msg.get_snapshot()
|
|
assert msg_snapshot.download_state == DownloadState.AVAILABLE
|
|
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
|
assert msgs_changed_event.msg_id == msg.id
|
|
msg_snapshot = msg.get_snapshot()
|
|
assert msg_snapshot.download_state == DownloadState.DONE
|
|
|
|
bob_direct_imap = direct_imap(bob)
|
|
# Messages may be deleted separately
|
|
while True:
|
|
if len(bob_direct_imap.get_all_messages()) == 0:
|
|
break
|
|
bob.wait_for_event(EventType.IMAP_MESSAGE_DELETED)
|
|
while True:
|
|
event = bob.wait_for_event()
|
|
if event.kind == EventType.INFO and "Close/expunge succeeded." in event.msg:
|
|
break
|
|
|
|
|
|
def test_markseen_contact_request(acfactory):
|
|
"""
|
|
Test that seen status is synchronized for contact request messages
|
|
even though read receipt is not sent.
|
|
"""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
# Bob sets up a second device.
|
|
bob2 = bob.clone()
|
|
bob2.start_io()
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
alice_chat_bob.send_text("Hello Bob!")
|
|
|
|
message = bob.wait_for_incoming_msg()
|
|
message2 = bob2.wait_for_incoming_msg()
|
|
assert message2.get_snapshot().state == MessageState.IN_FRESH
|
|
|
|
message.mark_seen()
|
|
bob2.wait_for_event(EventType.MSGS_NOTICED)
|
|
assert message2.get_snapshot().state == MessageState.IN_SEEN
|
|
|
|
|
|
@pytest.mark.parametrize("team_profile", [True, False])
|
|
def test_no_markseen_in_team_profile(team_profile, acfactory):
|
|
"""
|
|
Test that seen status is synchronized iff `team_profile` isn't set.
|
|
"""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
if team_profile:
|
|
bob.set_config("team_profile", "1")
|
|
|
|
# Bob sets up a second device.
|
|
bob2 = bob.clone()
|
|
bob2.start_io()
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
bob_chat_alice = bob.create_chat(alice)
|
|
bob2.create_chat(alice)
|
|
alice_chat_bob.send_text("Hello Bob!")
|
|
|
|
message = bob.wait_for_incoming_msg()
|
|
message2 = bob2.wait_for_incoming_msg()
|
|
assert message2.get_snapshot().state == MessageState.IN_FRESH
|
|
|
|
message.mark_seen()
|
|
|
|
# Send a message and wait until it arrives
|
|
# in order to wait until Bob2 gets the markseen message.
|
|
# This also tests that outgoing messages
|
|
# don't mark preceeding messages as seen in team profiles.
|
|
bob_chat_alice.send_text("Outgoing message")
|
|
while True:
|
|
outgoing = bob2.wait_for_msg(EventType.MSGS_CHANGED)
|
|
if outgoing.id != 0:
|
|
break
|
|
assert outgoing.get_snapshot().text == "Outgoing message"
|
|
|
|
if team_profile:
|
|
assert message2.get_snapshot().state == MessageState.IN_FRESH
|
|
else:
|
|
assert message2.get_snapshot().state == MessageState.IN_SEEN
|
|
|
|
|
|
def test_read_receipt(acfactory):
|
|
"""
|
|
Test sending a read receipt and ensure it is attributed to the correct contact.
|
|
"""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
alice_contact_bob = alice.create_contact(bob)
|
|
bob.create_chat(alice) # Accept the chat
|
|
|
|
alice_chat_bob.send_text("Hello Bob!")
|
|
msg = bob.wait_for_incoming_msg()
|
|
msg.mark_seen()
|
|
|
|
read_msg = alice.wait_for_msg(EventType.MSG_READ)
|
|
read_receipts = read_msg.get_read_receipts()
|
|
assert len(read_receipts) == 1
|
|
assert read_receipts[0].contact_id == alice_contact_bob.id
|
|
|
|
read_receipt_cnt = read_msg.get_read_receipt_count()
|
|
assert read_receipt_cnt == 1
|
|
|
|
|
|
def test_get_http_response(acfactory):
|
|
alice = acfactory.new_configured_account()
|
|
http_response = alice._rpc.get_http_response(alice.id, "https://example.org")
|
|
assert http_response["mimetype"] == "text/html"
|
|
assert b"<title>Example Domain</title>" in base64.b64decode((http_response["blob"] + "==").encode())
|
|
|
|
|
|
def test_configured_imap_certificate_checks(acfactory):
|
|
alice = acfactory.new_configured_account()
|
|
|
|
# Certificate checks should be configured (not None)
|
|
assert "cert_strict" in alice.get_info().used_transport_settings
|
|
|
|
# "cert_old_automatic" is the value old Delta Chat core versions used
|
|
# to mean user entered "imap_certificate_checks=0" (Automatic)
|
|
# and configuration failed to use strict TLS checks
|
|
# so it switched strict TLS checks off.
|
|
#
|
|
# New versions of Delta Chat are not disabling TLS checks
|
|
# unless users explicitly disables them
|
|
# or provider database says provider has invalid certificates.
|
|
#
|
|
# Core 1.142.4, 1.142.5 and 1.142.6 saved this value due to bug.
|
|
# This test is a regression test to prevent this happening again.
|
|
assert "cert_old_automatic" not in alice.get_info().used_transport_settings
|
|
|
|
|
|
def test_no_old_msg_is_fresh(acfactory):
|
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
|
ac1_clone = ac1.clone()
|
|
ac1_clone.start_io()
|
|
|
|
ac1.create_chat(ac2)
|
|
ac1_clone_chat = ac1_clone.create_chat(ac2)
|
|
|
|
ac1.get_device_chat().mark_noticed()
|
|
|
|
logging.info("Send a first message from ac2 to ac1 and check that it's 'fresh'")
|
|
first_msg = ac2.create_chat(ac1).send_text("Hi")
|
|
ac1.wait_for_incoming_msg_event()
|
|
assert ac1.create_chat(ac2).get_fresh_message_count() == 1
|
|
assert len(list(ac1.get_fresh_messages())) == 1
|
|
|
|
ac1.wait_for_event(EventType.IMAP_INBOX_IDLE)
|
|
|
|
logging.info("Send a message from ac1_clone to ac2 and check that ac1 marks the first message as 'noticed'")
|
|
ac1_clone_chat.send_text("Hi back")
|
|
ev = ac1.wait_for_msgs_noticed_event()
|
|
|
|
assert ev.chat_id == first_msg.get_snapshot().chat_id
|
|
assert ac1.create_chat(ac2).get_fresh_message_count() == 0
|
|
assert len(list(ac1.get_fresh_messages())) == 0
|
|
|
|
|
|
def test_rename_synchronization(acfactory):
|
|
"""Test synchronization of contact renaming."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
alice2 = alice.clone()
|
|
alice2.bring_online()
|
|
|
|
bob.set_config("displayname", "Bob")
|
|
bob.create_chat(alice).send_text("Hello!")
|
|
alice_msg = alice.wait_for_incoming_msg().get_snapshot()
|
|
alice2_msg = alice2.wait_for_incoming_msg().get_snapshot()
|
|
|
|
assert alice2_msg.sender.get_snapshot().display_name == "Bob"
|
|
alice_msg.sender.set_name("Bobby")
|
|
alice2.wait_for_event(EventType.CONTACTS_CHANGED)
|
|
assert alice2_msg.sender.get_snapshot().display_name == "Bobby"
|
|
|
|
|
|
def test_rename_group(acfactory):
|
|
"""Test renaming the group."""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_group = alice.create_group("Test group")
|
|
alice_contact_bob = alice.create_contact(bob)
|
|
alice_group.add_contact(alice_contact_bob)
|
|
alice_group.send_text("Hello!")
|
|
|
|
bob_msg = bob.wait_for_incoming_msg()
|
|
bob_chat = bob_msg.get_snapshot().chat
|
|
assert bob_chat.get_basic_snapshot().name == "Test group"
|
|
bob.wait_for_event(EventType.CHATLIST_ITEM_CHANGED)
|
|
|
|
for name in ["Baz", "Foo bar", "Xyzzy"]:
|
|
alice_group.set_name(name)
|
|
bob.wait_for_event(EventType.CHATLIST_ITEM_CHANGED)
|
|
bob.wait_for_event(EventType.CHATLIST_ITEM_CHANGED)
|
|
assert bob_chat.get_basic_snapshot().name == name
|
|
|
|
|
|
def test_get_all_accounts_deadlock(rpc):
|
|
"""Regression test for get_all_accounts deadlock."""
|
|
for _ in range(100):
|
|
all_accounts = rpc.get_all_accounts.future()
|
|
rpc.add_account()
|
|
all_accounts()
|
|
|
|
|
|
@pytest.mark.parametrize("all_devices_online", [True, False])
|
|
def test_leave_broadcast(acfactory, all_devices_online):
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
bob2 = bob.clone()
|
|
|
|
if all_devices_online:
|
|
bob2.start_io()
|
|
|
|
logging.info("===================== Alice creates a broadcast =====================")
|
|
alice_chat = alice.create_broadcast("Broadcast channel!")
|
|
|
|
logging.info("===================== Bob joins the broadcast =====================")
|
|
qr_code = alice_chat.get_qr_code()
|
|
bob.secure_join(qr_code)
|
|
alice.wait_for_securejoin_inviter_success()
|
|
bob.wait_for_securejoin_joiner_success()
|
|
|
|
alice_bob_contact = alice.create_contact(bob)
|
|
alice_contacts = alice_chat.get_contacts()
|
|
assert len(alice_contacts) == 1 # 1 recipient
|
|
assert alice_contacts[0].id == alice_bob_contact.id
|
|
|
|
member_added_msg = bob.wait_for_incoming_msg()
|
|
assert member_added_msg.get_snapshot().text == "You joined the channel."
|
|
|
|
def get_broadcast(ac):
|
|
chat = ac.get_chatlist(query="Broadcast channel!")[0]
|
|
assert chat.get_basic_snapshot().name == "Broadcast channel!"
|
|
return chat
|
|
|
|
def check_account(ac, contact, inviter_side, please_wait_info_msg=False):
|
|
chat = get_broadcast(ac)
|
|
contact_snapshot = contact.get_snapshot()
|
|
chat_msgs = chat.get_messages()
|
|
|
|
encrypted_msg = chat_msgs.pop(0).get_snapshot()
|
|
assert encrypted_msg.text == "Messages are end-to-end encrypted."
|
|
assert encrypted_msg.is_info
|
|
|
|
if please_wait_info_msg:
|
|
first_msg = chat_msgs.pop(0).get_snapshot()
|
|
assert "invited you to join this channel" in first_msg.text
|
|
assert first_msg.is_info
|
|
|
|
member_added_msg = chat_msgs.pop(0).get_snapshot()
|
|
if inviter_side:
|
|
assert member_added_msg.text == f"Member {contact_snapshot.display_name} added."
|
|
else:
|
|
assert member_added_msg.text == "You joined the channel."
|
|
assert member_added_msg.is_info
|
|
|
|
if not inviter_side:
|
|
leave_msg = chat_msgs.pop(0).get_snapshot()
|
|
assert leave_msg.text == "You left the channel."
|
|
|
|
assert len(chat_msgs) == 0
|
|
|
|
chat_snapshot = chat.get_full_snapshot()
|
|
|
|
# On Alice's side, SELF is not in the list of contact ids
|
|
# because OutBroadcast chats never contain SELF in the list.
|
|
# On Bob's side, SELF is not in the list because he left.
|
|
if inviter_side:
|
|
assert len(chat_snapshot.contact_ids) == 0
|
|
else:
|
|
assert chat_snapshot.contact_ids == [contact.id]
|
|
|
|
logging.info("===================== Bob leaves the broadcast =====================")
|
|
bob_chat = get_broadcast(bob)
|
|
assert bob_chat.get_full_snapshot().self_in_group
|
|
assert len(bob_chat.get_contacts()) == 2 # Alice and Bob
|
|
|
|
bob_chat.leave()
|
|
assert not bob_chat.get_full_snapshot().self_in_group
|
|
# After Bob left, only Alice will be left in Bob's memberlist
|
|
assert len(bob_chat.get_contacts()) == 1
|
|
|
|
check_account(bob, bob.create_contact(alice), inviter_side=False, please_wait_info_msg=True)
|
|
|
|
logging.info("===================== Test Alice's device =====================")
|
|
while len(alice_chat.get_contacts()) != 0: # After Bob left, there will be 0 recipients
|
|
alice.wait_for_event(EventType.CHAT_MODIFIED)
|
|
|
|
check_account(alice, alice.create_contact(bob), inviter_side=True)
|
|
|
|
logging.info("===================== Test Bob's second device =====================")
|
|
# Start second Bob device, if it wasn't started already.
|
|
bob2.start_io()
|
|
|
|
member_added_msg = bob2.wait_for_incoming_msg()
|
|
assert member_added_msg.get_snapshot().text == "You joined the channel."
|
|
|
|
bob2_chat = get_broadcast(bob2)
|
|
|
|
# After Bob left, only Alice will be left in Bob's memberlist
|
|
while len(bob2_chat.get_contacts()) != 1:
|
|
bob2.wait_for_event(EventType.CHAT_MODIFIED)
|
|
|
|
check_account(bob2, bob2.create_contact(alice), inviter_side=False)
|
|
|
|
|
|
def test_leave_and_delete_group(acfactory, log):
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
log.section("Alice creates a group")
|
|
alice_chat = alice.create_group("Group")
|
|
alice_chat.add_contact(bob)
|
|
assert len(alice_chat.get_contacts()) == 2 # Alice and Bob
|
|
alice_chat.send_text("hello")
|
|
|
|
log.section("Bob sees the group, and leaves and deletes it")
|
|
msg = bob.wait_for_incoming_msg().get_snapshot()
|
|
assert msg.text == "hello"
|
|
msg.chat.accept()
|
|
|
|
msg.chat.leave()
|
|
# Bob deletes the chat. This must not prevent the leave message from being sent.
|
|
msg.chat.delete()
|
|
|
|
log.section("Alice receives the delete message")
|
|
# After Bob left, only Alice will be left in the group:
|
|
while len(alice_chat.get_contacts()) != 1:
|
|
alice.wait_for_event(EventType.CHAT_MODIFIED)
|
|
|
|
|
|
def test_immediate_autodelete(acfactory, direct_imap, log):
|
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
|
|
|
# "1" means delete immediately, while "0" means do not delete
|
|
ac2.set_config("delete_server_after", "1")
|
|
|
|
log.section("ac1: create chat with ac2")
|
|
chat1 = ac1.create_chat(ac2)
|
|
ac2.create_chat(ac1)
|
|
|
|
log.section("ac1: send message to ac2")
|
|
sent_msg = chat1.send_text("hello")
|
|
|
|
msg = ac2.wait_for_incoming_msg()
|
|
assert msg.get_snapshot().text == "hello"
|
|
|
|
log.section("ac2: wait for close/expunge on autodelete")
|
|
ac2.wait_for_event(EventType.IMAP_MESSAGE_DELETED)
|
|
while True:
|
|
event = ac2.wait_for_event()
|
|
if event.kind == EventType.INFO and "Close/expunge succeeded." in event.msg:
|
|
break
|
|
|
|
log.section("ac2: check that message was autodeleted on server")
|
|
ac2_direct_imap = direct_imap(ac2)
|
|
assert len(ac2_direct_imap.get_all_messages()) == 0
|
|
|
|
log.section("ac2: Mark deleted message as seen and check that read receipt arrives")
|
|
msg.mark_seen()
|
|
ev = ac1.wait_for_event(EventType.MSG_READ)
|
|
assert ev.chat_id == chat1.id
|
|
assert ev.msg_id == sent_msg.id
|
|
|
|
|
|
def test_background_fetch(acfactory, dc):
|
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
|
ac1.stop_io()
|
|
|
|
ac1_chat = ac1.create_chat(ac2)
|
|
|
|
ac2_chat = ac2.create_chat(ac1)
|
|
ac2_chat.send_text("Hello!")
|
|
|
|
while True:
|
|
dc.background_fetch(300)
|
|
messages = ac1_chat.get_messages()
|
|
snapshot = messages[-1].get_snapshot()
|
|
if snapshot.text == "Hello!":
|
|
break
|
|
|
|
# Stopping background fetch immediately after starting
|
|
# does not result in any errors.
|
|
background_fetch_future = dc.background_fetch.future(300)
|
|
dc.stop_background_fetch()
|
|
background_fetch_future()
|
|
|
|
# Starting background fetch with zero timeout is ok,
|
|
# it should terminate immediately.
|
|
dc.background_fetch(0)
|
|
|
|
# Background fetch can still be used to send and receive messages.
|
|
ac2_chat.send_text("Hello again!")
|
|
|
|
while True:
|
|
dc.background_fetch(300)
|
|
messages = ac1_chat.get_messages()
|
|
snapshot = messages[-1].get_snapshot()
|
|
if snapshot.text == "Hello again!":
|
|
break
|
|
|
|
|
|
def test_message_exists(acfactory):
|
|
ac1, ac2 = acfactory.get_online_accounts(2)
|
|
chat = ac1.create_chat(ac2)
|
|
message1 = chat.send_text("Hello!")
|
|
message2 = chat.send_text("Hello again!")
|
|
assert message1.exists()
|
|
assert message2.exists()
|
|
|
|
ac1.delete_messages([message1])
|
|
assert not message1.exists()
|
|
assert message2.exists()
|
|
|
|
# There is no error when checking if
|
|
# the message exists for deleted account.
|
|
ac1.remove()
|
|
assert not message1.exists()
|
|
assert not message2.exists()
|
|
|
|
|
|
def test_synchronize_member_list_on_group_rejoin(acfactory, log):
|
|
"""
|
|
Test that user recreates group member list when it joins the group again.
|
|
ac1 creates a group with two other accounts: ac2 and ac3
|
|
Then it removes ac2, removes ac3 and adds ac2 back.
|
|
ac2 did not see that ac3 is removed, so it should rebuild member list from scratch.
|
|
"""
|
|
log.section("setting up accounts, accepted with each other")
|
|
ac1, ac2, ac3 = accounts = acfactory.get_online_accounts(3)
|
|
|
|
log.section("ac1: creating group chat with 2 other members")
|
|
chat = ac1.create_group("title1")
|
|
chat.add_contact(ac2)
|
|
chat.add_contact(ac3)
|
|
|
|
log.section("ac1: send message to new group chat")
|
|
msg = chat.send_text("hello")
|
|
assert chat.num_contacts() == 3
|
|
|
|
log.section("checking that the chat arrived correctly")
|
|
for ac in accounts[1:]:
|
|
msg = ac.wait_for_incoming_msg().get_snapshot()
|
|
assert msg.text == "hello"
|
|
assert msg.chat.num_contacts() == 3
|
|
msg.chat.accept()
|
|
|
|
log.section("ac1: removing ac2")
|
|
chat.remove_contact(ac2)
|
|
|
|
log.section("ac2: wait for a message about removal from the chat")
|
|
ac2.wait_for_incoming_msg()
|
|
log.section("ac1: removing ac3")
|
|
chat.remove_contact(ac3)
|
|
|
|
log.section("ac1: adding ac2 back")
|
|
chat.add_contact(ac2)
|
|
|
|
log.section("ac2: check that ac3 is removed")
|
|
msg = ac2.wait_for_incoming_msg()
|
|
|
|
assert chat.num_contacts() == 2
|
|
assert msg.get_snapshot().chat.num_contacts() == 2
|
|
|
|
|
|
def test_large_message(acfactory) -> None:
|
|
"""
|
|
Test sending large message without download limit set,
|
|
so it is sent with pre-message but downloaded without user interaction.
|
|
"""
|
|
alice, bob = acfactory.get_online_accounts(2)
|
|
|
|
alice_chat_bob = alice.create_chat(bob)
|
|
alice_chat_bob.send_message(
|
|
"Hello World, this message is bigger than 5 bytes",
|
|
file="../test-data/image/screenshot.jpg",
|
|
)
|
|
|
|
msg = bob.wait_for_incoming_msg()
|
|
msgs_changed_event = bob.wait_for_msgs_changed_event()
|
|
assert msg.id == msgs_changed_event.msg_id
|
|
snapshot = msg.get_snapshot()
|
|
assert snapshot.text == "Hello World, this message is bigger than 5 bytes"
|