move io thread handling into own module

This commit is contained in:
holger krekel
2020-02-23 17:22:27 +01:00
parent 5c8f558f60
commit 0d4b6f5627
3 changed files with 103 additions and 90 deletions

View File

@@ -2,10 +2,9 @@
from __future__ import print_function from __future__ import print_function
import atexit import atexit
import threading
from contextlib import contextmanager from contextlib import contextmanager
from threading import Event
import os import os
import time
from array import array from array import array
import deltachat import deltachat
from . import const from . import const
@@ -15,7 +14,7 @@ from .chat import Chat
from .message import Message from .message import Message
from .contact import Contact from .contact import Contact
from .tracker import ImexTracker from .tracker import ImexTracker
from . import hookspec from . import hookspec, iothreads
class MissingCredentials(ValueError): class MissingCredentials(ValueError):
@@ -48,7 +47,8 @@ class Account(object):
hook = hookspec.Global._get_plugin_manager().hook hook = hookspec.Global._get_plugin_manager().hook
hook.at_account_init(account=self, db_path=db_path) hook.at_account_init(account=self, db_path=db_path)
self._threads = IOThreads(self) self._shutdown_event = Event()
self._threads = iothreads.IOThreads(self)
# send all FFI events for this account to a plugin hook # send all FFI events for this account to a plugin hook
def _ll_event(ctx, evt_name, data1, data2): def _ll_event(ctx, evt_name, data1, data2):
@@ -559,6 +559,14 @@ class Account(object):
lib.dc_configure(self._dc_context) lib.dc_configure(self._dc_context)
self._threads.start() self._threads.start()
@hookspec.account_hookimpl
def after_shutdown(self):
self._shutdown_event.set()
def wait_shutdown(self):
""" wait until shutdown of this account has completed. """
self._shutdown_event.wait()
def shutdown(self, wait=True): def shutdown(self, wait=True):
""" shutdown account, stop threads and close and remove """ shutdown account, stop threads and close and remove
underlying dc_context and callbacks. """ underlying dc_context and callbacks. """
@@ -574,89 +582,6 @@ class Account(object):
self._pm.hook.after_shutdown() self._pm.hook.after_shutdown()
class IOThreads:
def __init__(self, account):
self.account = account
self._dc_context = account._dc_context
self._thread_quitflag = False
self._name2thread = {}
def is_started(self):
return len(self._name2thread) > 0
def start(self):
assert not self.is_started()
self._start_one_thread("inbox", self.imap_thread_run)
self._start_one_thread("smtp", self.smtp_thread_run)
if int(self.account.get_config("mvbox_watch")):
self._start_one_thread("mvbox", self.mvbox_thread_run)
if int(self.account.get_config("sentbox_watch")):
self._start_one_thread("sentbox", self.sentbox_thread_run)
def _start_one_thread(self, name, func):
self._name2thread[name] = t = threading.Thread(target=func, name=name)
t.setDaemon(1)
t.start()
@contextmanager
def log_execution(self, message):
self.account.log_line(message + " START")
yield
self.account.log_line(message + " FINISHED")
def stop(self, wait=False):
self._thread_quitflag = True
# Workaround for a race condition. Make sure that thread is
# not in between checking for quitflag and entering idle.
time.sleep(0.5)
lib.dc_interrupt_imap_idle(self._dc_context)
lib.dc_interrupt_smtp_idle(self._dc_context)
if "mvbox" in self._name2thread:
lib.dc_interrupt_mvbox_idle(self._dc_context)
if "sentbox" in self._name2thread:
lib.dc_interrupt_sentbox_idle(self._dc_context)
if wait:
for name, thread in self._name2thread.items():
thread.join()
def imap_thread_run(self):
with self.log_execution("INBOX THREAD START"):
while not self._thread_quitflag:
lib.dc_perform_imap_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_idle(self._dc_context)
def mvbox_thread_run(self):
with self.log_execution("MVBOX THREAD"):
while not self._thread_quitflag:
lib.dc_perform_mvbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_idle(self._dc_context)
def sentbox_thread_run(self):
with self.log_execution("SENTBOX THREAD"):
while not self._thread_quitflag:
lib.dc_perform_sentbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_idle(self._dc_context)
def smtp_thread_run(self):
with self.log_execution("SMTP THREAD"):
while not self._thread_quitflag:
lib.dc_perform_smtp_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_smtp_idle(self._dc_context)
def _destroy_dc_context(dc_context, dc_context_unref=lib.dc_context_unref): def _destroy_dc_context(dc_context, dc_context_unref=lib.dc_context_unref):
# destructor for dc_context # destructor for dc_context
dc_context_unref(dc_context) dc_context_unref(dc_context)

View File

@@ -0,0 +1,90 @@
import threading
import time
from contextlib import contextmanager
from .capi import lib
class IOThreads:
def __init__(self, account):
self.account = account
self._dc_context = account._dc_context
self._thread_quitflag = False
self._name2thread = {}
def is_started(self):
return len(self._name2thread) > 0
def start(self):
assert not self.is_started()
self._start_one_thread("inbox", self.imap_thread_run)
self._start_one_thread("smtp", self.smtp_thread_run)
if int(self.account.get_config("mvbox_watch")):
self._start_one_thread("mvbox", self.mvbox_thread_run)
if int(self.account.get_config("sentbox_watch")):
self._start_one_thread("sentbox", self.sentbox_thread_run)
def _start_one_thread(self, name, func):
self._name2thread[name] = t = threading.Thread(target=func, name=name)
t.setDaemon(1)
t.start()
@contextmanager
def log_execution(self, message):
self.account.log_line(message + " START")
yield
self.account.log_line(message + " FINISHED")
def stop(self, wait=False):
self._thread_quitflag = True
# Workaround for a race condition. Make sure that thread is
# not in between checking for quitflag and entering idle.
time.sleep(0.5)
lib.dc_interrupt_imap_idle(self._dc_context)
lib.dc_interrupt_smtp_idle(self._dc_context)
if "mvbox" in self._name2thread:
lib.dc_interrupt_mvbox_idle(self._dc_context)
if "sentbox" in self._name2thread:
lib.dc_interrupt_sentbox_idle(self._dc_context)
if wait:
for name, thread in self._name2thread.items():
thread.join()
def imap_thread_run(self):
with self.log_execution("INBOX THREAD START"):
while not self._thread_quitflag:
lib.dc_perform_imap_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_imap_idle(self._dc_context)
def mvbox_thread_run(self):
with self.log_execution("MVBOX THREAD"):
while not self._thread_quitflag:
lib.dc_perform_mvbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_mvbox_idle(self._dc_context)
def sentbox_thread_run(self):
with self.log_execution("SENTBOX THREAD"):
while not self._thread_quitflag:
lib.dc_perform_sentbox_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_fetch(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_sentbox_idle(self._dc_context)
def smtp_thread_run(self):
with self.log_execution("SMTP THREAD"):
while not self._thread_quitflag:
lib.dc_perform_smtp_jobs(self._dc_context)
if not self._thread_quitflag:
lib.dc_perform_smtp_idle(self._dc_context)

View File

@@ -355,13 +355,11 @@ class TestOfflineChat:
contact = msg.get_sender_contact() contact = msg.get_sender_contact()
assert contact == ac1.get_self_contact() assert contact == ac1.get_self_contact()
def test_basic_configure_ok_addr_setting_forbidden(self, ac1): def test_set_config_after_configure_is_forbidden(self, ac1):
assert ac1.get_config("mail_pw") assert ac1.get_config("mail_pw")
assert ac1.is_configured() assert ac1.is_configured()
with pytest.raises(ValueError): with pytest.raises(ValueError):
ac1.set_config("addr", "123@example.org") ac1.set_config("addr", "123@example.org")
with pytest.raises(ValueError):
ac1.configure(addr="123@example.org")
def test_import_export_one_contact(self, acfactory, tmpdir): def test_import_export_one_contact(self, acfactory, tmpdir):
backupdir = tmpdir.mkdir("backup") backupdir = tmpdir.mkdir("backup")