mirror of
https://github.com/chatmail/core.git
synced 2026-04-26 01:46:34 +03:00
Allow Delta Chat core to work with chatmail servers running on underscore-prefixed domains (e.g. _alice.localchat) which use self-signed TLS certificates. This mirrors related work on chatmail relays (https://github.com/chatmail/relay/pull/855). Underscore domains with self-signed TLS certs can be used by LXC test containers where obtaining real certificates is not practical. When the domain starts with '_', certificate verification is automatically relaxed for IMAP/SMTP connections, dcaccount QR code handling, and iroh relay endpoints. The Python test suite is adapted to also work against such underscore-domain servers, including cross-core tests with older Delta Chat versions. Note: this PR does not support HTTPS requests with underscore domains. They are not currently needed for working with LXC test containers. 14 files changed, +102/-31 lines (excluding Cargo.lock). Cargo.lock: +606/-11 lines from enabling iroh features needed for connecting to iroh relay endpoint on underscore domains. The added dependencies are unfortunate but best considered when finally upgrading to iroh 1.0 (tm).
211 lines
7.0 KiB
Python
211 lines
7.0 KiB
Python
from __future__ import annotations
|
|
|
|
import imaplib
|
|
import io
|
|
import logging
|
|
import pathlib
|
|
import ssl
|
|
from contextlib import contextmanager
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
from imap_tools import AND, Header, MailBox, MailMessage, MailMessageFlags, errors
|
|
|
|
if TYPE_CHECKING:
|
|
from deltachat_rpc_client import Account
|
|
|
|
# Byte marker that IdleManager.wait_for_seen looks for in raw IDLE responses.
FLAGS = b"FLAGS"
|
|
# Byte marker that IdleManager.wait_for_seen looks for in raw IDLE responses.
FETCH = b"FETCH"
|
|
# Not referenced by name in the code visible here; presumably the IMAP
# sequence set selecting every message in a folder — confirm before relying on it.
ALL = "1:*"
|
|
|
|
|
|
class DirectImap:
    """Internal Python-level IMAP handling.

    Opens its own IMAP connection using the address and password stored in
    the configuration of the given account, independent of the core.
    """

    def __init__(self, account: Account) -> None:
        """Remember *account* and connect to its IMAP server right away."""
        self.account = account
        # Label for log output and dump directories.  Note the fallback is an
        # int (``id(account)``), so string contexts must convert it.
        self.logid = account.get_config("displayname") or id(account)
        # Guard flag checked by the assertions below.
        # NOTE(review): nothing in the code visible here ever sets it to True.
        self._idling = False
        self.connect()

    def connect(self):
        """Log in to the IMAP server of the account and select INBOX.

        The host is taken from the domain part of the configured address.
        Certificate verification is disabled for hosts starting with ``_``.
        """
        # Assume the testing server supports TLS on port 993.
        port = 993

        user = self.account.get_config("addr")
        host = user.rsplit("@")[-1]
        pw = self.account.get_config("mail_pw")

        ssl_context = ssl.create_default_context()
        if host.startswith("_"):
            # Underscore-prefixed hosts are treated as using certificates
            # that cannot be verified, so verification is switched off.
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
        self.conn = MailBox(host, port, ssl_context=ssl_context)
        self.conn.login(user, pw)

        self.select_folder("INBOX")

    def shutdown(self):
        """Log out of the connection; a failure is only logged as a warning."""
        try:
            self.conn.logout()
        except (OSError, imaplib.IMAP4.abort):
            logging.warning("Could not logout direct_imap conn")

    def create_folder(self, foldername):
        """Create *foldername*; a creation error is logged, not raised."""
        try:
            self.conn.folder.create(foldername)
        except errors.MailboxFolderCreateError as e:
            logging.warning(f"Cannot create '{foldername}', probably it already exists: {str(e)}")

    def select_folder(self, foldername: str) -> tuple:
        """Select *foldername* and return the result of the select call."""
        assert not self._idling
        return self.conn.folder.set(foldername)

    def select_config_folder(self, config_name: str):
        """Return info about selected folder if it is
        configured, otherwise None.

        A *config_name* without an underscore is expanded to
        ``configured_<config_name>_folder`` before the lookup.
        """
        if "_" not in config_name:
            config_name = f"configured_{config_name}_folder"
        foldername = self.account.get_config(config_name)
        if foldername:
            return self.select_folder(foldername)
        return None

    def list_folders(self) -> list[str]:
        """return list of all existing folder names."""
        assert not self._idling
        return [folder.name for folder in self.conn.folder.list()]

    def delete(self, uid_list: str, expunge=True):
        """delete a range of messages (imap-syntax).
        If expunge is true, perform the expunge-operation
        to make sure the messages are really gone and not
        just flagged as deleted.
        """
        self.conn.client.uid("STORE", uid_list, "+FLAGS", r"(\Deleted)")
        if expunge:
            self.conn.expunge()

    def get_all_messages(self) -> list[MailMessage]:
        """Return every message of the selected folder without marking it seen."""
        assert not self._idling
        return list(self.conn.fetch(mark_seen=False))

    def get_unread_messages(self) -> list[str]:
        """Return the UIDs of all unseen messages in the selected folder."""
        assert not self._idling
        return [msg.uid for msg in self.conn.fetch(AND(seen=False), mark_seen=False)]

    def mark_all_read(self):
        """Set the SEEN flag on every currently unseen message."""
        messages = self.get_unread_messages()
        if messages:
            res = self.conn.flag(messages, MailMessageFlags.SEEN, True)
            logging.info(f"Marked seen: {messages} {res}")

    def get_unread_cnt(self) -> int:
        """Return the number of unseen messages in the selected folder."""
        return len(self.get_unread_messages())

    def dump_imap_structures(self, dir, logfile):
        """Write a report about all folders and their messages to *logfile*.

        Message bodies that could be extracted are stored below
        ``<dir>/IMAP/<logid>/<folder>/<uid>``.  The last folder visited
        remains selected afterwards.
        """
        assert not self._idling
        stream = io.StringIO()

        def log(*args, **kwargs):
            kwargs["file"] = stream
            print(*args, **kwargs)

        # logid falls back to an int, which pathlib refuses as a path
        # segment, so it is converted explicitly.
        dump_root = pathlib.Path(str(dir)).joinpath("IMAP", str(self.logid))

        empty_folders = []
        for imapfolder in self.list_folders():
            self.select_folder(imapfolder)
            # Fetched with mark_seen=False so dumping does not alter flags;
            # the same list serves for counting and for iterating.
            messages = self.get_all_messages()
            if not messages:
                empty_folders.append(imapfolder)
                continue

            log("---------", imapfolder, len(messages), "messages ---------")
            for msg in messages:
                body = getattr(msg.obj, "text", None) or getattr(msg.obj, "html", None)
                if not body:
                    log("Message", msg.uid, "has empty body")
                    continue

                path = dump_root.joinpath(imapfolder)
                path.mkdir(parents=True, exist_ok=True)
                fn = path.joinpath(str(msg.uid))
                # write_bytes() rejects str, so text bodies are encoded first.
                fn.write_bytes(body.encode("utf-8") if isinstance(body, str) else body)
                log("Message", msg.uid, fn)
                log(
                    "Message",
                    msg.uid,
                    msg.flags,
                    "Message-Id:",
                    msg.obj.get("Message-Id"),
                )

        if empty_folders:
            log("--------- EMPTY FOLDERS:", empty_folders)

        print(stream.getvalue(), file=logfile)

    @contextmanager
    def idle(self):
        """return Idle ContextManager.

        The yielded ``IdleManager`` is always stopped on exit.
        """
        idle_manager = IdleManager(self)
        try:
            yield idle_manager
        finally:
            idle_manager.done()

    def append(self, folder: str, msg: str):
        """Upload a message to *folder*.

        A linebreak at the very beginning and leading whitespace on every
        line are removed automatically, so indented triple-quoted strings
        can be passed.  Lines are joined with ``\\n`` and the result must be
        pure ASCII, otherwise ``UnicodeEncodeError`` is raised.
        """
        if msg.startswith("\n"):
            msg = msg[1:]
        msg = "\n".join([s.lstrip() for s in msg.splitlines()])
        self.conn.append(bytes(msg, encoding="ascii"), folder)

    def get_uid_by_message_id(self, message_id) -> str:
        """Return the UID of the first message whose Message-ID matches.

        Only the currently selected folder is searched.  Raises
        ``Exception`` when no message matches.
        """
        # NOTE(review): unlike the other fetch calls in this class, this one
        # does not pass mark_seen=False — confirm that is intended.
        msgs = [msg.uid for msg in self.conn.fetch(AND(header=Header("MESSAGE-ID", message_id)))]
        if not msgs:
            raise Exception("Did not find message " + message_id + ", maybe you forgot to select the correct folder?")
        return msgs[0]
|
|
|
|
|
|
class IdleManager:
    """Run an IMAP IDLE session on the connection of a ``DirectImap``."""

    def __init__(self, direct_imap) -> None:
        """Enter idle mode on the connection held by *direct_imap*."""
        self.direct_imap = direct_imap
        mailbox = direct_imap.conn
        # fetch latest messages before starting idle so that it only
        # returns messages that arrive anew
        # NOTE(review): the result is discarded; if fetch() is a lazy
        # generator this call does no work — confirm.
        mailbox.fetch("1:*")
        mailbox.idle.start()

    def check(self, timeout=None) -> list[bytes]:
        """(blocking) wait for next idle message from server."""
        idle_session = self.direct_imap.conn.idle
        return idle_session.poll(timeout=timeout)

    def wait_for_new_message(self) -> bytes:
        """Poll until a response mentions EXISTS or RECENT and return it."""
        markers = (b"EXISTS", b"RECENT")
        while True:
            for response in self.check():
                if any(marker in response for marker in markers):
                    return response

    def wait_for_seen(self, timeout=None) -> int:
        """Return first message with SEEN flag from a running idle-stream."""
        while True:
            for response in self.check(timeout=timeout):
                is_seen_update = FETCH in response and FLAGS in response and rb"\Seen" in response
                if not is_seen_update:
                    continue
                # The number is the second space-separated field.
                fields = response.split(b" ")
                return int(fields[1])

    def done(self):
        """send idle-done to server if we are currently in idle mode."""
        idle_session = self.direct_imap.conn.idle
        return idle_session.stop()
|
|
|
|
|
|
@pytest.fixture
def direct_imap():
    """Provide the ``DirectImap`` class itself (not an instance) to tests."""
    helper_class = DirectImap
    return helper_class
|