mirror of
https://github.com/chatmail/core.git
synced 2026-04-05 15:02:11 +03:00
The motivation is to reduce code complexity and to get rid of the extra IMAP connection. It also removes the cases where the Inbox and Sentbox loops add messages to chats in parallel, which leads to various message sorting bugs; in particular, outgoing messages break the sorting of incoming ones that are fetched later but may have a smaller "Date".
724 lines
25 KiB
Python
724 lines
25 KiB
Python
import fnmatch
|
|
import io
|
|
import os
|
|
import pathlib
|
|
import queue
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import weakref
|
|
import random
|
|
from queue import Queue
|
|
from typing import Callable, Dict, List, Optional, Set
|
|
|
|
import pytest
|
|
from _pytest._code import Source
|
|
|
|
import deltachat
|
|
|
|
from . import Account, account_hookimpl, const, get_core_info
|
|
from .events import FFIEventLogger, FFIEventTracker
|
|
|
|
#: The number of info messages added to new e2ee chats.
#: Currently this is "End-to-end encryption available".
E2EE_INFO_MSGS = 1
|
|
|
|
|
|
def pytest_addoption(parser):
    """Register the command line options of the deltachat test plugin.

    All options are added to the "deltachat testplugin options" group of
    *parser*: ``--chatmail`` stores a value (default ``None``), the other
    four options are ``store_true`` switches.
    """
    options = parser.getgroup("deltachat testplugin options")
    options.addoption(
        "--chatmail",
        action="store",
        default=None,
        help="chatmail server domain name",
    )

    # The remaining options are plain boolean switches, registered in this order.
    switches = (
        ("--ignored", "Also run tests marked with the ignored marker"),
        ("--strict-tls", "Never accept invalid TLS certificates for test accounts"),
        ("--extra-info", "show more info on failures (imap server state, config)"),
        ("--debug-setup", "show events during configure and start io phases of online accounts"),
    )
    for flag, description in switches:
        options.addoption(flag, action="store_true", help=description)
|
|
|
|
|
|
def pytest_configure(config):
    """Resolve the chatmail domain and install the per-phase logging plugin.

    The ``--chatmail`` option takes precedence; when it is unset the
    ``CHATMAIL_DOMAIN`` environment variable is consulted.  A non-empty result
    is stored in ``config.option.chatmail``.  Afterwards one ``LoggingAspect``
    instance is registered with pytest's plugin manager and as a global
    deltachat plugin.
    """
    chatmail_domain = config.getoption("--chatmail") or os.getenv("CHATMAIL_DOMAIN")
    if chatmail_domain:
        config.option.chatmail = chatmail_domain

    # Threads that keep running could garble the output.  Therefore every
    # account ever created is remembered in a weak set (so objects are not kept
    # alive unnecessarily) and its logging is switched on/off around each
    # pytest test phase (setup/call/teardown).  The logging default of the
    # "acfactory" fixture is switched alongside.

    def _set_factory_default(item, enabled):
        """Propagate the logging default to the item's acfactory, if it uses one."""
        factory = item.funcargs.get("acfactory")
        if factory:
            factory.set_logging_default(enabled)

    class LoggingAspect:
        def __init__(self) -> None:
            self._accounts: weakref.WeakSet = weakref.WeakSet()

        @deltachat.global_hookimpl
        def dc_account_init(self, account):
            # Remember each account reported through this hook.
            self._accounts.add(account)

        def disable_logging(self, item):
            """Turn logging off for all tracked accounts and for the acfactory default."""
            for tracked in self._accounts:
                tracked.disable_logging()
            _set_factory_default(item, False)

        def enable_logging(self, item):
            """Turn logging on for all tracked accounts and for the acfactory default."""
            for tracked in self._accounts:
                tracked.enable_logging()
            _set_factory_default(item, True)

        @pytest.hookimpl(hookwrapper=True)
        def pytest_runtest_setup(self, item):
            # Tests carrying the "ignored" marker only run when --ignored was given.
            if item.get_closest_marker("ignored") and not item.config.getvalue("ignored"):
                pytest.skip("use --ignored to run this test")
            self.enable_logging(item)
            yield
            self.disable_logging(item)

        @pytest.hookimpl(hookwrapper=True)
        def pytest_pyfunc_call(self, pyfuncitem):
            self.enable_logging(pyfuncitem)
            yield
            self.disable_logging(pyfuncitem)

        @pytest.hookimpl(hookwrapper=True)
        def pytest_runtest_teardown(self, item):
            # Teardown is only logged when --extra-info was requested.
            extra_info = item.config.getoption("--extra-info")
            if extra_info:
                self.enable_logging(item)
            yield
            if extra_info:
                self.disable_logging(item)

    aspect = LoggingAspect()
    config.pluginmanager.register(aspect)
    deltachat.register_global_plugin(aspect)
|
|
|
|
|
|
def pytest_report_header(config):
    """Return the extra header lines shown at the top of the pytest report.

    The first line lists the core version, sqlite version and journal mode
    taken from ``get_core_info()``; a second line naming the chatmail
    provider is added when ``--chatmail`` is set.
    """
    core_info = get_core_info()
    info_keys = ("deltachat_core_version", "sqlite_version", "journal_mode")
    lines = [
        "Deltachat core={} sqlite={} journal_mode={}".format(
            *[core_info[key] for key in info_keys]
        ),
    ]

    provider = config.getoption("--chatmail")
    if provider:
        lines.append(f"Chatmail account provider: {provider}")

    return lines
|
|
|
|
|
|
@pytest.fixture(scope="session")
def testprocess(request):
    """Provide the session-wide manager of live account configurations.

    The fixture value is a :class:`TestProcess` built from the pytest config.
    """
    pytestconfig = request.config
    return TestProcess(pytestconfig=pytestconfig)
|
|
|
|
|
|
class TestProcess:
    """A pytest session-scoped instance to help with managing "live" account configurations.

    It hands out live account configs (address + password) for a chatmail
    domain and keeps an in-memory cache of account directories keyed by address.
    """

    # Maps an account address to the files of its account directory
    # (path relative to that directory -> file content).
    _addr2files: Dict[str, Dict[pathlib.Path, bytes]]

    def __init__(self, pytestconfig) -> None:
        self.pytestconfig = pytestconfig
        self._addr2files = {}
        # Live configs created so far; shared by all producers of this instance.
        self._configlist: List[Dict[str, str]] = []

    def get_liveconfig_producer(self):
        """provide live account configs, cached on a per-test-process scope
        so that test functions can reuse already known live configs.

        This is a generator: each step yields the config dict at the next
        index of the shared list, creating a new random ``ci-...`` user for
        the ``--chatmail`` domain when none exists at that index yet.
        The test is skipped (on first iteration) when ``--chatmail`` is not
        set, and failed once more than the maximum number of accounts is
        requested.
        """
        chatmail_opt = self.pytestconfig.getoption("--chatmail")
        if not chatmail_opt:
            pytest.skip(
                "specify CHATMAIL_DOMAIN or --chatmail to provide live accounts",
            )

        # Use a chatmail instance.
        domain = chatmail_opt
        MAX_LIVE_CREATED_ACCOUNTS = 10
        for index in range(MAX_LIVE_CREATED_ACCOUNTS):
            # The list only grows and indices are visited in order, so a
            # missing entry is always appended exactly at position `index`.
            if index >= len(self._configlist):
                part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
                username = f"ci-{part}"
                password = f"{username}${username}"
                addr = f"{username}@{domain}"
                config = {"addr": addr, "mail_pw": password}
                print("newtmpuser {}: addr={}".format(index, config["addr"]))
                self._configlist.append(config)
            yield self._configlist[index]
        pytest.fail(f"more than {MAX_LIVE_CREATED_ACCOUNTS} live accounts requested.")

    def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
        """Copy the cached files for *cache_addr* next to *db_target_path*.

        *db_target_path* must not exist yet.  Returns ``True`` when cached
        files were written into its parent directory, ``False`` when nothing
        is cached for the address.
        """
        db_target_path = pathlib.Path(db_target_path)
        assert not db_target_path.exists()

        try:
            filescache = self._addr2files[cache_addr]
        except KeyError:
            print("CACHE FAIL for", cache_addr)
            return False

        print("CACHE HIT for", cache_addr)
        write_dict_to_dir(filescache, db_target_path.parent)
        return True

    def cache_maybe_store_configured_db_files(self, acc):
        """Snapshot the account directory of the configured account *acc*.

        The directory is the parent of the account's blob directory.
        Returns ``True`` when a new cache entry was stored and ``False`` when
        an entry for the address already existed (it is never overwritten).
        """
        addr = acc.get_config("addr")
        assert acc.is_configured()
        # don't overwrite existing entries
        if addr in self._addr2files:
            return False
        print("storing cache for", addr)
        basedir = pathlib.Path(acc.get_blobdir()).parent
        self._addr2files[addr] = create_dict_from_files_in_path(basedir)
        return True
|
|
|
|
|
|
def create_dict_from_files_in_path(base):
    """Read every regular file below *base* into a dict.

    Keys are the file paths relative to *base*, values the file contents as
    bytes.  Directories themselves are not recorded.
    """
    return {
        entry.relative_to(base): entry.read_bytes()
        for entry in base.glob("**/*")
        if entry.is_file()
    }
|
|
|
|
|
|
def write_dict_to_dir(dic, target_dir):
    """Write the files described by *dic* below *target_dir*.

    *dic* maps relative paths to byte contents and must not be empty.
    Missing parent directories are created before a file is written.
    """
    assert dic
    for relative, payload in dic.items():
        destination = target_dir.joinpath(relative)
        parent = destination.parent
        if not parent.exists():
            os.makedirs(parent)
        destination.write_bytes(payload)
|
|
|
|
|
|
@pytest.fixture()
def data(request):
    """Test data."""
    test_dir = os.path.dirname(request.fspath.strpath)
    plugin_dir = os.path.dirname(__file__)

    class Data:
        def __init__(self) -> None:
            # The test data is located heuristically because the tests may be
            # run from a dev-setup with pytest directly, through tox, and
            # maybe also from deltachat-binding users like "deltabot".
            candidates = (
                os.path.join(test_dir, "data"),
                os.path.join(test_dir, "..", "..", "test-data"),
                os.path.join(plugin_dir, "..", "..", "..", "test-data"),
            )
            self.paths = [os.path.normpath(candidate) for candidate in candidates]

        def get_path(self, bn):
            """return path of file or None if it doesn't exist."""
            segments = bn.split("/")
            for directory in self.paths:
                candidate = os.path.join(directory, *segments)
                if os.path.exists(candidate):
                    return candidate
            # Only the last location that was tried is reported.
            print(f"WARNING: path does not exist: {candidate!r}")
            return None

        def read_path(self, bn, mode="r"):
            """Return the content of the data file *bn*, or None if it is not found."""
            found = self.get_path(bn)
            if found is None:
                return None
            with open(found, mode) as stream:
                return stream.read()

    return Data()
|
|
|
|
|
|
class ACSetup:
    """
    Accounts setup helper to deal with multiple configure-process
    and io & imap initialization phases.

    Each account is tracked in one of the states CONFIGURING, CONFIGURED or
    IDLEREADY.  From tests, use the higher level
    public ACFactory methods instead of its private helper class.
    """

    CONFIGURING = "CONFIGURING"
    CONFIGURED = "CONFIGURED"
    IDLEREADY = "IDLEREADY"

    # Receives one (account, success, comment) tuple per finished configure run.
    _configured_events: Queue

    def __init__(self, testprocess, init_time) -> None:
        self.testprocess = testprocess
        self.init_time = init_time
        self._configured_events = Queue()
        self._account2state: Dict[Account, str] = {}
        self._account2config: Dict[Account, Dict[str, str]] = {}
        # Addresses whose IMAP folders were already emptied by init_imap().
        self._imap_cleaned: Set[str] = set()

    def log(self, *args):
        """Print *args* prefixed with a tag and the seconds elapsed since init_time."""
        elapsed = time.time() - self.init_time
        print("[acsetup]", f"{elapsed:.3f}", *args)

    def add_configured(self, account):
        """add an already configured account."""
        assert account.is_configured()
        self._account2state[account] = self.CONFIGURED
        self.log("added already configured account", account, account.get_config("addr"))

    def start_configure(self, account):
        """add an account and start its configure process."""
        setup = self

        class PendingTracker:
            @account_hookimpl
            def ac_configure_completed(this, success: bool, comment: Optional[str]) -> None:
                # Hand the outcome over to whoever waits in _pop_config_success().
                setup._configured_events.put((account, success, comment))

        account.add_account_plugin(PendingTracker(), name="pending_tracker")
        self._account2state[account] = self.CONFIGURING
        account.configure()
        self.log("started configure on", account)

    def wait_one_configured(self, account):
        """wait until this account has successfully configured."""
        if self._account2state[account] != self.CONFIGURING:
            return
        # Results of other accounts popped here are marked CONFIGURED by
        # _pop_config_success() and otherwise skipped.
        while True:
            finished = self._pop_config_success()
            if finished == account:
                break
        self.init_imap(finished)
        self.init_logging(finished)
        finished._evtracker.consume_events()

    def bring_online(self):
        """Wait for all accounts to become ready to receive messages.

        This will initialize logging, start IO and the direct_imap attribute
        for each account which either is CONFIGURED already or which is CONFIGURING
        and successfully completing the configuration process.
        """
        print("wait_all_configured finds accounts=", self._account2state)
        # Only values of existing keys are replaced while iterating.
        for acc, state in self._account2state.items():
            if state != self.CONFIGURED:
                continue
            self._onconfigure_start_io(acc)
            self._account2state[acc] = self.IDLEREADY

        while self.CONFIGURING in self._account2state.values():
            finished = self._pop_config_success()
            self._onconfigure_start_io(finished)
            self._account2state[finished] = self.IDLEREADY
        print("finished, account2state", self._account2state)

    def _pop_config_success(self):
        """Block for the next configure result and return its account.

        A failed configure fails the test; otherwise the account is marked
        CONFIGURED and the config recorded for it (if any) is applied again.
        """
        acc, success, comment = self._configured_events.get()
        if not success:
            pytest.fail(f"configuring online account {acc} failed: {comment}")
        self._account2state[acc] = self.CONFIGURED
        recorded = self._account2config.get(acc)
        if recorded is not None:
            acc.update_config(recorded)
        return acc

    def _onconfigure_start_io(self, acc):
        """Set up imap and logging, start IO and wait for the inbox to be ready."""
        self.init_imap(acc)
        self.init_logging(acc)
        acc.start_io()
        print(acc._logid, "waiting for inbox IDLE to become ready")
        tracker = acc._evtracker
        tracker.wait_idle_inbox_ready()
        tracker.consume_events()
        acc.log("inbox IDLE ready")

    def init_logging(self, acc):
        """idempotent function for initializing logging (will replace existing logger)."""
        plugin_name = "logger-" + acc._logid
        event_logger = FFIEventLogger(acc, logid=acc._logid, init_time=self.init_time)
        acc.add_account_plugin(event_logger, name=plugin_name)

    def init_imap(self, acc):
        """initialize direct_imap and cleanup server state."""
        from deltachat.direct_imap import DirectImap

        assert acc.is_configured()
        if not hasattr(acc, "direct_imap"):
            acc.direct_imap = DirectImap(acc)

        addr = acc.get_config("addr")
        if addr in self._imap_cleaned:
            # The folders of this address were already cleaned.
            return

        imap = acc.direct_imap
        for folder in imap.list_folders():
            if folder.lower() in ("inbox", "deltachat"):
                # These two folders are kept and only emptied.
                assert imap.select_folder(folder)
                imap.delete("1:*", expunge=True)
            else:
                imap.conn.folder.delete(folder)
        acc.log(f"imap cleaned for addr {addr}")
        self._imap_cleaned.add(addr)
|
|
|
|
|
|
class ACFactory:
    """Account factory

    Creates accounts below the test's temporary directory, drives their
    configuration through an :class:`ACSetup` helper and shuts everything
    down again in :meth:`finalize`, which is registered as a finalizer on
    the pytest request.
    """

    init_time: float
    _finalizers: List[Callable[[], None]]
    # An entry is replaced by None when run_bot_process() hands the account
    # over to a bot subprocess, so consumers must skip None entries.
    _accounts: List[Optional[Account]]
    _acsetup: ACSetup
    _preconfigured_keys: List[str]

    def __init__(self, request, testprocess, tmpdir, data) -> None:
        self.init_time = time.time()
        self.tmpdir = tmpdir
        self.pytestconfig = request.config
        self.data = data
        self.testprocess = testprocess
        self._liveconfig_producer = testprocess.get_liveconfig_producer()

        self._finalizers = []
        self._accounts = []
        self._acsetup = ACSetup(testprocess, self.init_time)
        self._preconfigured_keys = ["alice", "bob", "charlie", "dom", "elena", "fiona"]
        self.set_logging_default(False)
        request.addfinalizer(self.finalize)

    def log(self, *args):
        """Print *args* prefixed with a tag and the seconds elapsed since init_time."""
        print("[acfactory]", f"{time.time() - self.init_time:.3f}", *args)

    def finalize(self):
        """Run registered finalizers (last added first), then shut down all accounts."""
        while self._finalizers:
            fin = self._finalizers.pop()
            fin()

        while self._accounts:
            acc = self._accounts.pop()
            if acc is not None:
                imap = getattr(acc, "direct_imap", None)
                if imap is not None:
                    imap.shutdown()
                    del acc.direct_imap
                acc.shutdown()
                acc.disable_logging()

    def get_next_liveconfig(self):
        """
        Base function to get functional online configurations
        where we can make valid SMTP and IMAP connections with.

        Returns a copy of the next config from the live config producer,
        so callers may modify it freely.
        """
        configdict = next(self._liveconfig_producer).copy()

        if self.pytestconfig.getoption("--strict-tls"):
            # Enable strict certificate checks for online accounts
            configdict["imap_certificate_checks"] = str(const.DC_CERTCK_STRICT)
            configdict["smtp_certificate_checks"] = str(const.DC_CERTCK_STRICT)

        assert "addr" in configdict and "mail_pw" in configdict
        return configdict

    def _get_cached_account(self, addr) -> Optional[Account]:
        """Return an account restored from the file cache, or None if *addr* is not cached."""
        if addr in self.testprocess._addr2files:
            return self._getaccount(addr)
        return None

    def get_unconfigured_account(self, closed=False) -> Account:
        """Create and return a fresh, unconfigured account."""
        return self._getaccount(closed=closed)

    def _getaccount(self, try_cache_addr=None, closed=False) -> Account:
        """Create an account in a new ``ac<N>`` subdirectory of tmpdir.

        When *try_cache_addr* is given, cached files for that address (if
        any) are copied into the directory before the account is opened.
        """
        logid = f"ac{len(self._accounts) + 1}"
        # we need to use fixed database basename for maybe_cache_* functions to work
        path = self.tmpdir.mkdir(logid).join("dc.db")
        if try_cache_addr:
            self.testprocess.cache_maybe_retrieve_configured_db_files(try_cache_addr, path)
        ac = Account(path.strpath, logging=self._logging, closed=closed)
        ac._logid = logid  # later instantiated FFIEventLogger needs this
        ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
        if self.pytestconfig.getoption("--debug-setup"):
            self._acsetup.init_logging(ac)
        self._accounts.append(ac)
        return ac

    def set_logging_default(self, logging) -> None:
        """Set the ``logging`` flag passed to accounts created from now on."""
        self._logging = bool(logging)

    def remove_preconfigured_keys(self) -> None:
        """Forget all preconfigured key names so that no account gets one."""
        self._preconfigured_keys = []

    def _preconfigure_key(self, account):
        """Give *account* the next unused preconfigured key.

        Returns True when a key was set; otherwise None.  A warning is
        printed when a key name was available but its file could not be read.
        """
        # Only set a preconfigured key if we haven't used it yet for another account.
        try:
            keyname = self._preconfigured_keys.pop(0)
        except IndexError:
            pass
        else:
            fname_sec = self.data.read_path(f"key/{keyname}-secret.asc")
            if fname_sec:
                account._preconfigure_keypair(fname_sec)
                return True
            print("WARN: could not use preconfigured keys")

    def get_pseudo_configured_account(self, passphrase: Optional[str] = None) -> Account:
        """Return an offline account that only has an address and display name set.

        With a *passphrase* the account is created closed and then opened
        with that passphrase.
        """
        # do a pseudo-configured account
        ac = self.get_unconfigured_account(closed=bool(passphrase))
        if passphrase:
            ac.open(passphrase)
        acname = ac._logid
        addr = f"{acname}@offline.org"
        ac.update_config(
            {
                "configured_addr": addr,
                "displayname": acname,
            },
        )
        self._preconfigure_key(ac)
        self._acsetup.init_logging(ac)
        return ac

    def new_online_configuring_account(self, cloned_from=None, cache=False, **kwargs) -> Account:
        """Return an online account that is configured or being configured.

        The credentials come from the next live config, or from
        *cloned_from* when given; *kwargs* override individual config
        values.  With ``cache=True`` an account restored from the file cache
        is returned (already configured) if one exists for the address.
        """
        if cloned_from is None:
            configdict = self.get_next_liveconfig()
        else:
            # XXX we might want to transfer the key to the new account
            configdict = {
                "addr": cloned_from.get_config("addr"),
                "mail_pw": cloned_from.get_config("mail_pw"),
                "imap_certificate_checks": cloned_from.get_config("imap_certificate_checks"),
                "smtp_certificate_checks": cloned_from.get_config("smtp_certificate_checks"),
            }
        configdict.update(kwargs)
        ac = self._get_cached_account(addr=configdict["addr"]) if cache else None
        if ac is not None:
            # make sure we consume a preconfig key, as if we had created a fresh account.
            # The list may be empty (keys used up or removed); that is not
            # an error, exactly as in _preconfigure_key().
            if self._preconfigured_keys:
                self._preconfigured_keys.pop(0)
            self._acsetup.add_configured(ac)
            return ac
        ac = self.prepare_account_from_liveconfig(configdict)
        self._acsetup.start_configure(ac)
        return ac

    def prepare_account_from_liveconfig(self, configdict) -> Account:
        """Create an unconfigured account and apply *configdict* to it.

        Missing test defaults are added to *configdict* in place, and the
        dict is recorded so that ACSetup applies it again after configure.
        """
        ac = self.get_unconfigured_account()
        assert "addr" in configdict and "mail_pw" in configdict, configdict
        configdict.setdefault("bcc_self", False)
        configdict.setdefault("mvbox_move", False)
        configdict.setdefault("sync_msgs", False)
        configdict.setdefault("delete_server_after", 0)
        ac.update_config(configdict)
        self._acsetup._account2config[ac] = configdict
        self._preconfigure_key(ac)
        return ac

    def wait_configured(self, account) -> None:
        """Wait until the specified account has successfully completed configure."""
        self._acsetup.wait_one_configured(account)

    def bring_accounts_online(self) -> None:
        """Bring all accounts known to the setup helper online."""
        print("bringing accounts online")
        self._acsetup.bring_online()
        print("all accounts online")

    def get_online_accounts(self, num):
        """Return *num* online accounts, reusing and refilling the file cache."""
        accounts = [self.new_online_configuring_account(cache=True) for _ in range(num)]
        self.bring_accounts_online()
        # we cache fully configured and started accounts
        for acc in accounts:
            self.testprocess.cache_maybe_store_configured_db_files(acc)
        return accounts

    def run_bot_process(self, module, ffi=True):
        """Configure a fresh live account and run *module* as a bot on it.

        The module file is started as a subprocess with the account's
        credentials and database path.  Returns a :class:`BotProcess`; its
        ``kill`` is registered as a finalizer.
        """
        fn = module.__file__

        bot_cfg = self.get_next_liveconfig()
        bot_ac = self.prepare_account_from_liveconfig(bot_cfg)
        self._acsetup.start_configure(bot_ac)
        self.wait_configured(bot_ac)
        bot_ac.start_io()
        # Wait for DC_EVENT_IMAP_INBOX_IDLE so that all emails appeared in the bot's Inbox later are
        # considered new and not existing ones, and thus processed by the bot.
        print(bot_ac._logid, "waiting for inbox IDLE to become ready")
        bot_ac._evtracker.wait_idle_inbox_ready()
        bot_ac.stop_io()
        self._acsetup._account2state[bot_ac] = self._acsetup.IDLEREADY

        # Forget ac as it will be opened by the bot subprocess
        # but keep something in the list to not confuse account generation
        self._accounts[self._accounts.index(bot_ac)] = None

        args = [
            sys.executable,
            "-u",
            fn,
            "--email",
            bot_cfg["addr"],
            "--password",
            bot_cfg["mail_pw"],
            bot_ac.db_path,
        ]
        if ffi:
            # Insert before the trailing database path argument.
            args.insert(-1, "--show-ffi")
        print("$", " ".join(args))
        popen = subprocess.Popen(
            args=args,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # combine stdout/stderr in one stream
            bufsize=0,  # unbuffered
            close_fds=True,  # close all FDs other than 0/1/2
            universal_newlines=True,  # give back text
        )
        bot = BotProcess(popen, addr=bot_cfg["addr"])
        self._finalizers.append(bot.kill)
        return bot

    def dump_imap_summary(self, logfile):
        """Write account info and, where available, IMAP structures to *logfile*."""
        for ac in self._accounts:
            if ac is None:
                # Placeholder left by run_bot_process(); nothing to dump.
                continue
            ac.dump_account_info(logfile=logfile)
            imap = getattr(ac, "direct_imap", None)
            if imap is not None:
                imap.dump_imap_structures(self.tmpdir, logfile=logfile)

    def get_accepted_chat(self, ac1: Account, ac2: Account):
        """Create a chat in both directions and return ac1's chat with ac2."""
        ac2.create_chat(ac1)
        return ac1.create_chat(ac2)

    def introduce_each_other(self, accounts, sending=True):
        """Create chats between every pair of *accounts*.

        With *sending*, each pair exchanges "hi" / "hi back" and the method
        waits for one incoming message per message sent.
        """
        to_wait = []
        for i, acc in enumerate(accounts):
            for acc2 in accounts[i + 1 :]:
                chat = self.get_accepted_chat(acc, acc2)
                if sending:
                    chat.send_text("hi")
                    to_wait.append(acc2)
                    acc2.create_chat(acc).send_text("hi back")
                    to_wait.append(acc)
        for acc in to_wait:
            acc.log("waiting for incoming message")
            acc._evtracker.wait_next_incoming_message()
|
|
|
|
|
|
@pytest.fixture()
def acfactory(request, tmpdir, testprocess, data):
    """Account factory."""
    factory = ACFactory(request=request, tmpdir=tmpdir, testprocess=testprocess, data=data)
    yield factory

    # After the test: dump extra state only for a failed call phase ...
    node = request.node
    if not (hasattr(node, "rep_call") and node.rep_call.failed):
        return
    # ... and only when --extra-info was requested.
    if not testprocess.pytestconfig.getoption("--extra-info"):
        return
    summary = io.StringIO()
    factory.dump_imap_summary(logfile=summary)
    print(summary.getvalue())
    # request.node.add_report_section("call", "imap-server-state", s)
|
|
|
|
|
|
class BotProcess:
    """Wrapper around a bot subprocess whose stdout is consumed line by line."""

    # Stripped stdout lines of the bot; a final None marks the end of the stream.
    stdout_queue: queue.Queue

    def __init__(self, popen, addr) -> None:
        self.popen = popen

        # The first thing the bot prints to stdout is an invite link.
        self.qr = self.popen.stdout.readline()
        self.addr = addr

        # stdout is read as quickly as possible in a daemon thread which makes
        # the (unicode) lines available for readers through a queue.
        self.stdout_queue = queue.Queue()
        reader = threading.Thread(target=self._run_stdout_thread, name="bot-stdout-thread")
        reader.daemon = True
        self.stdout_thread = reader
        reader.start()

    def _run_stdout_thread(self) -> None:
        """Forward stripped stdout lines to the queue until the stream ends."""
        stream = self.popen.stdout
        try:
            while True:
                raw = stream.readline()
                if not raw:
                    break
                text = raw.strip()
                self.stdout_queue.put(text)
                print("bot-stdout: ", text)
        finally:
            # Always signal termination to readers, even after an error.
            self.stdout_queue.put(None)

    def kill(self) -> None:
        """Kill the bot subprocess."""
        self.popen.kill()

    def wait(self, timeout=None) -> None:
        """Wait for the bot subprocess to exit, at most *timeout* seconds if given."""
        self.popen.wait(timeout=timeout)

    def fnmatch_lines(self, pattern_lines):
        """Consume stdout lines until each pattern has been matched in order.

        *pattern_lines* holds one fnmatch pattern per non-blank line.
        Lines not matching the current pattern are skipped.  Raises IOError
        when stdout terminates before all patterns were matched.
        """
        stripped = [entry.strip() for entry in Source(pattern_lines.rstrip()).lines]
        patterns = [entry for entry in stripped if entry]
        for next_pattern in patterns:
            print("+++FNMATCH:", next_pattern)
            ignored = []
            while True:
                line = self.stdout_queue.get()
                if line is None:
                    if ignored:
                        print("BOT stdout terminated after these lines")
                        for skipped in ignored:
                            print(skipped)
                    raise IOError("BOT stdout-thread terminated")
                if fnmatch.fnmatch(line, next_pattern):
                    print("+++MATCHED:", line)
                    break
                print("+++IGN:", line)
                ignored.append(line)
|
|
|
|
|
|
@pytest.fixture()
def tmp_db_path(tmpdir):
    """Return a path inside the temporary directory where the database can be created."""
    db_file = tmpdir.join("test.db")
    return db_file.strpath
|
|
|
|
|
|
@pytest.fixture()
def lp():
    """Log printer fixture."""

    class Printer:
        def sec(self, msg: str) -> None:
            """Print *msg* as a section headline after an empty line."""
            bar = "=" * 10
            print()
            print(bar, msg, bar)

        def step(self, msg: str) -> None:
            """Print *msg* as a step headline."""
            dashes = "-" * 5
            print(dashes, "step " + msg, dashes)

        def indent(self, msg: str) -> None:
            """Print *msg* with a leading space."""
            print(" " + msg)

    return Printer()
|
|
|
|
|
|
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Attach the report of each test phase to the test item.

    The report is stored as ``item.rep_<when>``, where ``<when>`` is the
    report's phase ("setup", "call" or "teardown").
    """
    # Let all other hooks run first to obtain the report object.
    outcome = yield
    report = outcome.get_result()

    attribute = "rep_" + report.when
    setattr(item, attribute, report)
|