diff --git a/python/src/deltachat/direct_imap.py b/python/src/deltachat/direct_imap.py index 75f7971bf..3040d3861 100644 --- a/python/src/deltachat/direct_imap.py +++ b/python/src/deltachat/direct_imap.py @@ -4,14 +4,12 @@ import click import ssl import atexit import email -from imapclient import IMAPClient -from imapclient.exceptions import IMAPClientError import contextlib import time -#from persistentdict import PersistentDict +import imaplib +from subprocess import call - -INBOX = "INBOX" +INBOX = "Inbox" SENT = "Sent" MVBOX = "DeltaChat" DC_CONSTANT_MSG_MOVESTATE_PENDING = 1 @@ -27,358 +25,63 @@ def db_folder_attr(name): s.db_folder[name] = val return property(fget, fset, None, None) - -lock_log = threading.RLock() -started = time.time() - - -class ImapConn(object): - def __init__(self, db, foldername, conn_info): - self.db = db +class ImapConn(): + def __init__(self, foldername, conn_info): self.foldername = foldername - self._thread = None - self.MHOST, self.MUSER, self.MPASSWORD = conn_info - self.event_initial_polling_complete = threading.Event() - self.pending_imap_jobs = False + host, user, pw = conn_info - if not db is None: - # persistent database state below - self.db_folder = self.db.setdefault(foldername, {}) - self.db_messages = self.db.setdefault(":message-full", {}) - else: - self.db_messages = {} - - last_sync_uid = db_folder_attr("last_sync_uid") - - @contextlib.contextmanager - def wlog(self, msg): - t = time.time() - started - with lock_log: - print("%03.2f [%s] %s -->" % (t, self.foldername, msg)) - t0 = time.time() - yield - t1 = time.time() - with lock_log: - print("%03.2f [%s] ... finish %s (%3.2f secs)" % (t1-started, self.foldername, msg, t1-t0)) - - def log(self, *msgs): - t = time.time() - started - bmsg = "%03.2f [%s]" % (t, self.foldername) - with lock_log: - print(bmsg, *msgs) - - def connect(self): - with self.wlog("IMAP_CONNECT {}: {}".format(self.MUSER, self.MPASSWORD)): - ssl_context = ssl.create_default_context() - - # don't check if certificate hostname doesn't match target hostname - ssl_context.check_hostname = False - - # don't check if the certificate is trusted by a certificate authority - ssl_context.verify_mode = ssl.CERT_NONE - self.conn = IMAPClient(self.MHOST, ssl_context=ssl_context) - self.conn.login(self.MUSER, self.MPASSWORD) - self.log(self.conn.welcome) - try: - self.select_info = self.conn.select_folder(self.foldername) - except IMAPClientError: - self.ensure_folder_exists() - self.select_info = self.conn.select_folder(self.foldername) - - self.log('folder has %d messages' % self.select_info[b'EXISTS']) - self.log('capabilities', self.conn.capabilities()) - - def ensure_folder_exists(self): - with self.wlog("ensure_folder_exists: {}".format(self.foldername)): - try: - resp = self.conn.create_folder(self.foldername) - except IMAPClientError as e: - if "ALREADYEXISTS" in str(e): - return - print("EXCEPTION:" + str(e)) - else: - print("Server sent:", resp if resp else "nothing") - - def move(self, messages): - self.log("IMAP_MOVE to {}: {}".format(MVBOX, messages)) + self.connection = imaplib.IMAP4_SSL(host) + self.connection.login(user, pw) + messages = self._reselect_folder() try: - resp = self.conn.move(messages, MVBOX) - except IMAPClientError as e: - if "EXPUNGEISSUED" in str(e): - self.log("IMAP_MOVE errored with EXPUNGEISSUED, probably another client moved it") - else: - self.log("IMAP_MOVE {} successfully completed.".format(messages)) + self.original_msg_count = messages[0] + except IndexError: + self.original_msg_count = 0 - def perform_imap_idle(self): - if self.pending_imap_jobs: - self.log("perform_imap_idle skipped because jobs are pending") - return - with self.wlog("IMAP_IDLE()"): - res = self.conn.idle() - interrupted = False - while not interrupted: - # Wait for up to 30 seconds for an IDLE response - responses = self.conn.idle_check(timeout=30) - self.log("Server sent:", responses if responses else "nothing") - for resp in responses: - if resp[1] == b"EXISTS": - # we ignore what is returned and just let - # perform_imap_fetch look since lastseen - # id = resp[0] - interrupted = True - resp = self.conn.idle_done() + def mark_all_read(self): + self._reselect_folder() +# result, data = self.connection.uid('search', None, "(UNSEEN)") + result, data = self.connection.search(None, 'UnSeen') + try: + mails_uid = data[0].split() + newest_mail = mails_uid[0] + print("New mails") - def perform_imap_fetch(self): - range = "%s:*" % (self.last_sync_uid + 1,) - with self.wlog("IMAP_PERFORM_FETCH %s" % (range,)): - requested_fields = [ - b"RFC822.SIZE", b'FLAGS', - b"BODY.PEEK[HEADER.FIELDS (FROM TO CC DATE CHAT-VERSION MESSAGE-ID IN-REPLY-TO)]" - ] - resp = self.conn.fetch(range, requested_fields) - timestamp_fetch = time.time() - for uid in sorted(resp): # get lower uids first - if uid < self.last_sync_uid: - self.log("IMAP-ODDITY: ignoring bogus uid %s, it is lower than min-requested %s" % ( - uid, self.last_sync_uid)) - continue - data = resp[uid] - headers = data[requested_fields[-1].replace(b'.PEEK', b'')] - msg_headers = email.message_from_bytes(headers) - message_id = normalized_messageid(msg_headers) - chat_version = msg_headers.get("Chat-Version") - in_reply_to = msg_headers.get("In-Reply-To", "").lower() +# self.connection.store(data[0].replace(' ',','),'+FLAGS','\Seen') + for e_id in mails_uid: + self.connection.store(e_id, '+FLAGS', '\Seen') + print("marked:",e_id) - if not self.has_message(normalized_messageid(msg_headers)): - self.log('fetching body of ID %d: %d bytes, message-id=%s ' - 'in-reply-to=%s chat-version=%s' % ( - uid, data[b'RFC822.SIZE'], message_id, in_reply_to, chat_version,)) - fetchbody_resp = self.conn.fetch(uid, [b'BODY.PEEK[]']) - msg = email.message_from_bytes(fetchbody_resp[uid][b'BODY[]']) - msg.fetch_retrieve_time = timestamp_fetch - msg.foldername = self.foldername - msg.uid = uid - msg.move_state = DC_CONSTANT_MSG_MOVESTATE_PENDING - self.store_message(message_id, msg) - else: - msg = self.get_message_from_db(message_id) - self.log('fetching-from-db: ID %s message-id=%s' % (uid, message_id)) - if msg.foldername != self.foldername: - self.log("detected moved message", message_id) - msg.foldername = self.foldername - msg.move_state = DC_CONSTANT_MSG_MOVESTATE_STAY + return True + except IndexError: + print("No unread") + return False - if self.foldername in (INBOX, SENT): - if self.resolve_move_status(msg) != DC_CONSTANT_MSG_MOVESTATE_PENDING: - # see if there are pending messages which have a in-reply-to - # to our currnet msg - # NOTE: should be one sql-statement to find the - # possibly multiple messages that waited on us - for dbmid, dbmsg in self.db_messages.items(): - if dbmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING: - if dbmsg.get("In-Reply-To", "").lower() == message_id: - self.log("resolving pending message", dbmid) - # resolving the dependent message must work now - res = self.resolve_move_status(dbmsg) - assert res != DC_CONSTANT_MSG_MOVESTATE_PENDING, (dbmid, res) + def get_unread_cnt(self): + self._reselect_folder() +# result, data = self.connection.uid('search', None, "(UNSEEN)") + result, data = self.connection.search(None, 'UnSeen') + try: + mails_uid = data[0].split() - if not self.has_message(message_id): - self.store_message(message_id, msg) + return len(mails_uid) + except IndexError: + return 0 - self.last_sync_uid = max(uid, self.last_sync_uid) + def get_new_email_cnt(self): + messages = self._reselect_folder() + try: + return messages[0] - self.original_msg_count + except IndexError: + return 0 - self.log("last-sync-uid after fetch:", self.last_sync_uid) - if not self.db is None: - self.db.sync() + def _reselect_folder(self): + status, messages = self.connection.select(self.foldername) + if status != "OK": + print("Incorrect mail box " + status + str(messages)) + raise ConnectionError + print("(Re-)Selected mailbox: " + status + " " + str(messages)) + return messages - def resolve_move_status(self, msg): - """ Return move-state after this message's next move-state is determined (i.e. it is not PENDING)""" - message_id = normalized_messageid(msg) - if msg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING: - res = self.determine_next_move_state(msg) - if res == DC_CONSTANT_MSG_MOVESTATE_MOVING: - self.schedule_move(msg) - msg.move_state = DC_CONSTANT_MSG_MOVESTATE_MOVING - elif res == DC_CONSTANT_MSG_MOVESTATE_STAY: - self.log("STAY uid=%s message-id=%s" % (msg.uid, message_id)) - msg.move_state = DC_CONSTANT_MSG_MOVESTATE_STAY - else: - self.log("PENDING uid=%s message-id=%s in-reply-to=%s" % ( - msg.uid, message_id, msg["In-Reply-To"])) - return msg.move_state - - def determine_next_move_state(self, msg): - """ Return the next move state for this message. - Only call this function if the message is pending. - This function works with the DB, does not perform any IMAP commands. - """ - self.log("shall_move %s " % (normalized_messageid(msg))) - assert self.foldername in (INBOX, SENT) - assert msg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING - if msg.foldername == MVBOX: - self.log("is already in mvbox, next state is STAY %s" % (normalized_messageid(msg))) - return DC_CONSTANT_MSG_MOVESTATE_STAY - last_dc_count = 0 - while 1: - last_dc_count = (last_dc_count + 1) if is_dc_message(msg) else 0 - in_reply_to = normalized_messageid(msg.get("In-Reply-To", "")) - if not in_reply_to: - type_msg = "DC" if last_dc_count else "CLEAR" - self.log("detected thread-start %s message" % type_msg, normalized_messageid(msg)) - if last_dc_count > 0: - return DC_CONSTANT_MSG_MOVESTATE_MOVING - else: - return DC_CONSTANT_MSG_MOVESTATE_STAY - - newmsg = self.get_message_from_db(in_reply_to) - if not newmsg: - self.log("failed to fetch from db:", in_reply_to) - # we don't have the parent message ... maybe because - # it hasn't arrived (yet), was deleted or we failed to - # scan/fetch it: - if last_dc_count >= 4: - self.log("no thread-start found, but last 4 messages were DC") - return DC_CONSTANT_MSG_MOVESTATE_MOVING - else: - self.log("pending: missing parent, last_dc_count=%x" % (last_dc_count, )) - return DC_CONSTANT_MSG_MOVESTATE_PENDING - elif newmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_MOVING: - self.log("parent was a moved message") - return DC_CONSTANT_MSG_MOVESTATE_MOVING - else: - msg = newmsg - assert 0, "should never arrive here" - - def schedule_move(self, msg): - message_id = normalized_messageid(msg) - assert msg.foldername != MVBOX - self.log("scheduling move message-id=%s" % (message_id)) - self.pending_imap_jobs = True - - def has_message(self, message_id): - assert isinstance(message_id, str) - return message_id in self.db_messages - - def get_message_from_db(self, message_id): - return self.db_messages.get(normalized_messageid(message_id)) - - def store_message(self, message_id, msg): - mid2 = normalized_messageid(msg) - message_id = normalized_messageid(message_id) - assert message_id == mid2 - assert message_id not in self.db_messages, message_id - assert msg.foldername in (MVBOX, SENT, INBOX) - self.db_messages[message_id] = msg - self.log("stored new message message-id=%s" % (message_id,)) - - def forget_about_too_old_pending_messages(self): - # some housekeeping but not sure if neccessary - # because the involved sql-statements - # probably don't care if there are some foreever-pending messages - now = time.time() - for dbmid, dbmsg in self.db_messages.items(): - if dbmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING: - delay = now - dbmsg.fetch_retrieve_time - if delay > self.pendingtimeout: - dbmsg.move_state = DC_CONSTANT_MSG_MOVESTATE_STAY - self.log("pendingtimeout: message now set to stay", dbmid) - - def perform_imap_jobs(self): - with self.wlog("perform_imap_jobs()"): - if self.foldername in (INBOX, SENT): - to_move_uids = [] - to_move_msgs = [] - - # determine all uids of messages that are to be moved - for dbmid, dbmsg in self.db_messages.items(): - if dbmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_MOVING: - if dbmsg.uid > 0: # else it's already moved? - to_move_uids.append(dbmsg.uid) - to_move_msgs.append(dbmsg) - if to_move_uids: - self.move(to_move_uids) - # now that we moved let's invalidate "uid" because it's - # not there anyore in thie folder - for dbmsg in to_move_msgs: - dbmsg.uid = 0 - self.pending_imap_jobs = False - - def _run_in_thread(self): - self.connect() - if self.foldername == INBOX: - # INBOX loop should wait until MVBOX polled once - mvbox.event_initial_polling_complete.wait() - now = time.time() - while True: - self.perform_imap_jobs() - self.perform_imap_fetch() - if self.foldername == MVBOX: - # signal that MVBOX has polled once - self.event_initial_polling_complete.set() - elif self.foldername == INBOX: - # it's not clear we need to do this housekeeping - # (depends on the SQL statements) - self.forget_about_too_old_pending_messages() - self.perform_imap_idle() - - def start_thread_loop(self): - assert not self._thread - self._thread = t = threading.Thread(target=self._run_in_thread) - t.start() - - -def repr_msg(msg): - res = ["message-id: " + str(msg["message-id"]), - "foldername: " + msg.foldername, - "uid: " + str(msg.uid), - ] - return "\n".join(res) - - -def is_dc_message(msg): - return msg and msg.get("Chat-Version") - - -def normalized_messageid(msg): - if isinstance(msg, str): - return msg.lower() - return msg["Message-ID"].lower() - - -@click.command(context_settings=dict(help_option_names=["-h", "--help"])) -@click.option("--pendingtimeout", type=int, default=3600, - help="(default 3600) seconds which a message is still considered for moving " - "even though it has no determined thread-start message") -@click.option("--basedir", type=click.Path(), - default=click.get_app_dir("imap_move_chats"), - help="directory where database files are stored") -@click.option("-n", "--name", type=str, default=None, - help="database name (by default derived from login-user)") -@click.argument("imaphost", type=str, required=True) -@click.argument("login-user", type=str, required=True) -@click.argument("login-password", type=str, required=True) -@click.pass_context -def main(context, basedir, name, imaphost, login_user, login_password, pendingtimeout): - global mvbox - if not os.path.exists(basedir): - os.makedirs(basedir) - if name is None: - name = login_user -# dbpath = os.path.join(basedir, name) + ".db" -# print("Using dbfile:", dbpath) -# db = PersistentDict(dbpath) - db = None - conn_info = (imaphost, login_user, login_password) - inbox = ImapConn(db, INBOX, conn_info=conn_info) - inbox.connect() - assert 0 - sent = ImapConn(db, SENT, conn_info=conn_info) - inbox.pendingtimeout = pendingtimeout - mvbox = ImapConn(db, MVBOX, conn_info=conn_info) - mvbox.start_thread_loop() - inbox.start_thread_loop() - sent.start_thread_loop() - - -if __name__ == "__main__": - main() + def __del__(self): + self.connection.shutdown() diff --git a/python/src/deltachat/testplugin.py b/python/src/deltachat/testplugin.py index c0fb5ce55..41fb9bcfa 100644 --- a/python/src/deltachat/testplugin.py +++ b/python/src/deltachat/testplugin.py @@ -380,8 +380,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data): def make_direct_imap(self, account, folder): conn_info = (account.get_config("configured_mail_server"), account.get_config("addr"), account.get_config("mail_pw")) - imap = ImapConn(None, folder, conn_info=conn_info) - imap.connect() + imap = ImapConn(folder, conn_info=conn_info) return imap am = AccountMaker()