mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 09:26:29 +03:00
factor out imex tracking
This commit is contained in:
@@ -6,9 +6,7 @@ import threading
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
from array import array
|
from array import array
|
||||||
from queue import Queue, Empty
|
from queue import Queue
|
||||||
|
|
||||||
from pluggy import PluginManager
|
|
||||||
|
|
||||||
import deltachat
|
import deltachat
|
||||||
from . import const
|
from . import const
|
||||||
@@ -52,7 +50,6 @@ class Account(object):
|
|||||||
|
|
||||||
self.pluggy = get_plugin_manager()
|
self.pluggy = get_plugin_manager()
|
||||||
self.pluggy.register(self._evlogger)
|
self.pluggy.register(self._evlogger)
|
||||||
self.pluggy.register(self)
|
|
||||||
deltachat.set_context_callback(self._dc_context, _ll_event)
|
deltachat.set_context_callback(self._dc_context, _ll_event)
|
||||||
|
|
||||||
# open database
|
# open database
|
||||||
@@ -61,20 +58,11 @@ class Account(object):
|
|||||||
if not lib.dc_open(self._dc_context, db_path, ffi.NULL):
|
if not lib.dc_open(self._dc_context, db_path, ffi.NULL):
|
||||||
raise ValueError("Could not dc_open: {}".format(db_path))
|
raise ValueError("Could not dc_open: {}".format(db_path))
|
||||||
self._configkeys = self.get_config("sys.config_keys").split()
|
self._configkeys = self.get_config("sys.config_keys").split()
|
||||||
self._imex_events = Queue()
|
|
||||||
atexit.register(self.shutdown)
|
atexit.register(self.shutdown)
|
||||||
|
|
||||||
# def __del__(self):
|
# def __del__(self):
|
||||||
# self.shutdown()
|
# self.shutdown()
|
||||||
|
|
||||||
@hookimpl
|
|
||||||
def process_low_level_event(self, account, event_name, data1, data2):
|
|
||||||
# there could be multiple accounts instantiated
|
|
||||||
if account != self:
|
|
||||||
method = getattr(self, "on_" + event_name.lower(), None)
|
|
||||||
if method is not None:
|
|
||||||
method(data1, data2)
|
|
||||||
|
|
||||||
def _check_config_key(self, name):
|
def _check_config_key(self, name):
|
||||||
if name not in self._configkeys:
|
if name not in self._configkeys:
|
||||||
raise KeyError("{!r} not a valid config key, existing keys: {!r}".format(
|
raise KeyError("{!r} not a valid config key, existing keys: {!r}".format(
|
||||||
@@ -393,21 +381,14 @@ class Account(object):
|
|||||||
raise RuntimeError("found more than one new file")
|
raise RuntimeError("found more than one new file")
|
||||||
return export_files[0]
|
return export_files[0]
|
||||||
|
|
||||||
def _imex_events_clear(self):
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
self._imex_events.get_nowait()
|
|
||||||
except Empty:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _export(self, path, imex_cmd):
|
def _export(self, path, imex_cmd):
|
||||||
self._imex_events_clear()
|
with ImexTracker(self) as imex_tracker:
|
||||||
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
|
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
|
||||||
if not self._threads.is_started():
|
if not self._threads.is_started():
|
||||||
lib.dc_perform_imap_jobs(self._dc_context)
|
lib.dc_perform_imap_jobs(self._dc_context)
|
||||||
files_written = []
|
files_written = []
|
||||||
while True:
|
while True:
|
||||||
ev = self._imex_events.get()
|
ev = imex_tracker.get()
|
||||||
if isinstance(ev, str):
|
if isinstance(ev, str):
|
||||||
files_written.append(ev)
|
files_written.append(ev)
|
||||||
elif isinstance(ev, bool):
|
elif isinstance(ev, bool):
|
||||||
@@ -431,11 +412,11 @@ class Account(object):
|
|||||||
self._import(path, imex_cmd=12)
|
self._import(path, imex_cmd=12)
|
||||||
|
|
||||||
def _import(self, path, imex_cmd):
|
def _import(self, path, imex_cmd):
|
||||||
self._imex_events_clear()
|
with ImexTracker(self) as imex_tracker:
|
||||||
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
|
lib.dc_imex(self._dc_context, imex_cmd, as_dc_charpointer(path), ffi.NULL)
|
||||||
if not self._threads.is_started():
|
if not self._threads.is_started():
|
||||||
lib.dc_perform_imap_jobs(self._dc_context)
|
lib.dc_perform_imap_jobs(self._dc_context)
|
||||||
if not self._imex_events.get():
|
if not imex_tracker.get():
|
||||||
raise ValueError("import from path '{}' failed".format(path))
|
raise ValueError("import from path '{}' failed".format(path))
|
||||||
|
|
||||||
def initiate_key_transfer(self):
|
def initiate_key_transfer(self):
|
||||||
@@ -539,15 +520,7 @@ class Account(object):
|
|||||||
deltachat.clear_context_callback(self._dc_context)
|
deltachat.clear_context_callback(self._dc_context)
|
||||||
del self._dc_context
|
del self._dc_context
|
||||||
atexit.unregister(self.shutdown)
|
atexit.unregister(self.shutdown)
|
||||||
|
self.pluggy.unregister(self)
|
||||||
def on_dc_event_imex_progress(self, data1, data2):
|
|
||||||
if data1 == 1000:
|
|
||||||
self._imex_events.put(True)
|
|
||||||
elif data1 == 0:
|
|
||||||
self._imex_events.put(False)
|
|
||||||
|
|
||||||
def on_dc_event_imex_file_written(self, data1, data2):
|
|
||||||
self._imex_events.put(data1)
|
|
||||||
|
|
||||||
def set_location(self, latitude=0.0, longitude=0.0, accuracy=0.0):
|
def set_location(self, latitude=0.0, longitude=0.0, accuracy=0.0):
|
||||||
"""set a new location. It effects all chats where we currently
|
"""set a new location. It effects all chats where we currently
|
||||||
@@ -564,6 +537,41 @@ class Account(object):
|
|||||||
raise ValueError("no chat is streaming locations")
|
raise ValueError("no chat is streaming locations")
|
||||||
|
|
||||||
|
|
||||||
|
class ImexTracker:
|
||||||
|
def __init__(self, account):
|
||||||
|
self._imex_events = Queue()
|
||||||
|
self.account = account
|
||||||
|
|
||||||
|
@hookimpl
|
||||||
|
def process_low_level_event(self, account, event_name, data1, data2):
|
||||||
|
# there could be multiple accounts instantiated
|
||||||
|
if self.account is not account:
|
||||||
|
return
|
||||||
|
method = getattr(self, "on_" + event_name.lower(), None)
|
||||||
|
if method is not None:
|
||||||
|
print("*** on_ -> ", event_name.lower())
|
||||||
|
method(data1, data2)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.account.pluggy.register(self)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args):
|
||||||
|
self.account.pluggy.unregister(self)
|
||||||
|
|
||||||
|
def get(self, timeout=60):
|
||||||
|
return self._imex_events.get(timeout=timeout)
|
||||||
|
|
||||||
|
def on_dc_event_imex_progress(self, data1, data2):
|
||||||
|
if data1 == 1000:
|
||||||
|
self._imex_events.put(True)
|
||||||
|
elif data1 == 0:
|
||||||
|
self._imex_events.put(False)
|
||||||
|
|
||||||
|
def on_dc_event_imex_file_written(self, data1, data2):
|
||||||
|
self._imex_events.put(data1)
|
||||||
|
|
||||||
|
|
||||||
class IOThreads:
|
class IOThreads:
|
||||||
def __init__(self, dc_context, log_event=lambda *args: None):
|
def __init__(self, dc_context, log_event=lambda *args: None):
|
||||||
self._dc_context = dc_context
|
self._dc_context = dc_context
|
||||||
|
|||||||
Reference in New Issue
Block a user