Use Imaplib instead of ImapClient

This commit is contained in:
Hocuri
2020-05-23 14:11:43 +02:00
parent 1290c191b3
commit c0003c6726
2 changed files with 53 additions and 351 deletions

View File

@@ -4,14 +4,12 @@ import click
import ssl import ssl
import atexit import atexit
import email import email
from imapclient import IMAPClient
from imapclient.exceptions import IMAPClientError
import contextlib import contextlib
import time import time
#from persistentdict import PersistentDict import imaplib
from subprocess import call
INBOX = "Inbox"
INBOX = "INBOX"
SENT = "Sent" SENT = "Sent"
MVBOX = "DeltaChat" MVBOX = "DeltaChat"
DC_CONSTANT_MSG_MOVESTATE_PENDING = 1 DC_CONSTANT_MSG_MOVESTATE_PENDING = 1
@@ -27,358 +25,63 @@ def db_folder_attr(name):
s.db_folder[name] = val s.db_folder[name] = val
return property(fget, fset, None, None) return property(fget, fset, None, None)
class ImapConn():
lock_log = threading.RLock() def __init__(self, foldername, conn_info):
started = time.time()
class ImapConn(object):
def __init__(self, db, foldername, conn_info):
self.db = db
self.foldername = foldername self.foldername = foldername
self._thread = None host, user, pw = conn_info
self.MHOST, self.MUSER, self.MPASSWORD = conn_info
self.event_initial_polling_complete = threading.Event()
self.pending_imap_jobs = False
if not db is None: self.connection = imaplib.IMAP4_SSL(host)
# persistent database state below self.connection.login(user, pw)
self.db_folder = self.db.setdefault(foldername, {}) messages = self._reselect_folder()
self.db_messages = self.db.setdefault(":message-full", {})
else:
self.db_messages = {}
last_sync_uid = db_folder_attr("last_sync_uid")
@contextlib.contextmanager
def wlog(self, msg):
t = time.time() - started
with lock_log:
print("%03.2f [%s] %s -->" % (t, self.foldername, msg))
t0 = time.time()
yield
t1 = time.time()
with lock_log:
print("%03.2f [%s] ... finish %s (%3.2f secs)" % (t1-started, self.foldername, msg, t1-t0))
def log(self, *msgs):
t = time.time() - started
bmsg = "%03.2f [%s]" % (t, self.foldername)
with lock_log:
print(bmsg, *msgs)
def connect(self):
with self.wlog("IMAP_CONNECT {}: {}".format(self.MUSER, self.MPASSWORD)):
ssl_context = ssl.create_default_context()
# don't check if certificate hostname doesn't match target hostname
ssl_context.check_hostname = False
# don't check if the certificate is trusted by a certificate authority
ssl_context.verify_mode = ssl.CERT_NONE
self.conn = IMAPClient(self.MHOST, ssl_context=ssl_context)
self.conn.login(self.MUSER, self.MPASSWORD)
self.log(self.conn.welcome)
try:
self.select_info = self.conn.select_folder(self.foldername)
except IMAPClientError:
self.ensure_folder_exists()
self.select_info = self.conn.select_folder(self.foldername)
self.log('folder has %d messages' % self.select_info[b'EXISTS'])
self.log('capabilities', self.conn.capabilities())
def ensure_folder_exists(self):
with self.wlog("ensure_folder_exists: {}".format(self.foldername)):
try:
resp = self.conn.create_folder(self.foldername)
except IMAPClientError as e:
if "ALREADYEXISTS" in str(e):
return
print("EXCEPTION:" + str(e))
else:
print("Server sent:", resp if resp else "nothing")
def move(self, messages):
self.log("IMAP_MOVE to {}: {}".format(MVBOX, messages))
try: try:
resp = self.conn.move(messages, MVBOX) self.original_msg_count = messages[0]
except IMAPClientError as e: except IndexError:
if "EXPUNGEISSUED" in str(e): self.original_msg_count = 0
self.log("IMAP_MOVE errored with EXPUNGEISSUED, probably another client moved it")
else:
self.log("IMAP_MOVE {} successfully completed.".format(messages))
def perform_imap_idle(self): def mark_all_read(self):
if self.pending_imap_jobs: self._reselect_folder()
self.log("perform_imap_idle skipped because jobs are pending") # result, data = self.connection.uid('search', None, "(UNSEEN)")
return result, data = self.connection.search(None, 'UnSeen')
with self.wlog("IMAP_IDLE()"): try:
res = self.conn.idle() mails_uid = data[0].split()
interrupted = False newest_mail = mails_uid[0]
while not interrupted: print("New mails")
# Wait for up to 30 seconds for an IDLE response
responses = self.conn.idle_check(timeout=30)
self.log("Server sent:", responses if responses else "nothing")
for resp in responses:
if resp[1] == b"EXISTS":
# we ignore what is returned and just let
# perform_imap_fetch look since lastseen
# id = resp[0]
interrupted = True
resp = self.conn.idle_done()
def perform_imap_fetch(self): # self.connection.store(data[0].replace(' ',','),'+FLAGS','\Seen')
range = "%s:*" % (self.last_sync_uid + 1,) for e_id in mails_uid:
with self.wlog("IMAP_PERFORM_FETCH %s" % (range,)): self.connection.store(e_id, '+FLAGS', '\Seen')
requested_fields = [ print("marked:",e_id)
b"RFC822.SIZE", b'FLAGS',
b"BODY.PEEK[HEADER.FIELDS (FROM TO CC DATE CHAT-VERSION MESSAGE-ID IN-REPLY-TO)]"
]
resp = self.conn.fetch(range, requested_fields)
timestamp_fetch = time.time()
for uid in sorted(resp): # get lower uids first
if uid < self.last_sync_uid:
self.log("IMAP-ODDITY: ignoring bogus uid %s, it is lower than min-requested %s" % (
uid, self.last_sync_uid))
continue
data = resp[uid]
headers = data[requested_fields[-1].replace(b'.PEEK', b'')]
msg_headers = email.message_from_bytes(headers)
message_id = normalized_messageid(msg_headers)
chat_version = msg_headers.get("Chat-Version")
in_reply_to = msg_headers.get("In-Reply-To", "").lower()
if not self.has_message(normalized_messageid(msg_headers)): return True
self.log('fetching body of ID %d: %d bytes, message-id=%s ' except IndexError:
'in-reply-to=%s chat-version=%s' % ( print("No unread")
uid, data[b'RFC822.SIZE'], message_id, in_reply_to, chat_version,)) return False
fetchbody_resp = self.conn.fetch(uid, [b'BODY.PEEK[]'])
msg = email.message_from_bytes(fetchbody_resp[uid][b'BODY[]'])
msg.fetch_retrieve_time = timestamp_fetch
msg.foldername = self.foldername
msg.uid = uid
msg.move_state = DC_CONSTANT_MSG_MOVESTATE_PENDING
self.store_message(message_id, msg)
else:
msg = self.get_message_from_db(message_id)
self.log('fetching-from-db: ID %s message-id=%s' % (uid, message_id))
if msg.foldername != self.foldername:
self.log("detected moved message", message_id)
msg.foldername = self.foldername
msg.move_state = DC_CONSTANT_MSG_MOVESTATE_STAY
if self.foldername in (INBOX, SENT): def get_unread_cnt(self):
if self.resolve_move_status(msg) != DC_CONSTANT_MSG_MOVESTATE_PENDING: self._reselect_folder()
# see if there are pending messages which have a in-reply-to # result, data = self.connection.uid('search', None, "(UNSEEN)")
# to our currnet msg result, data = self.connection.search(None, 'UnSeen')
# NOTE: should be one sql-statement to find the try:
# possibly multiple messages that waited on us mails_uid = data[0].split()
for dbmid, dbmsg in self.db_messages.items():
if dbmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING:
if dbmsg.get("In-Reply-To", "").lower() == message_id:
self.log("resolving pending message", dbmid)
# resolving the dependent message must work now
res = self.resolve_move_status(dbmsg)
assert res != DC_CONSTANT_MSG_MOVESTATE_PENDING, (dbmid, res)
if not self.has_message(message_id): return len(mails_uid)
self.store_message(message_id, msg) except IndexError:
return 0
self.last_sync_uid = max(uid, self.last_sync_uid) def get_new_email_cnt(self):
messages = self._reselect_folder()
try:
return messages[0] - self.original_msg_count
except IndexError:
return 0
self.log("last-sync-uid after fetch:", self.last_sync_uid) def _reselect_folder(self):
if not self.db is None: status, messages = self.connection.select(self.foldername)
self.db.sync() if status != "OK":
print("Incorrect mail box " + status + str(messages))
raise ConnectionError
print("(Re-)Selected mailbox: " + status + " " + str(messages))
return messages
def resolve_move_status(self, msg): def __del__(self):
""" Return move-state after this message's next move-state is determined (i.e. it is not PENDING)""" self.connection.shutdown()
message_id = normalized_messageid(msg)
if msg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING:
res = self.determine_next_move_state(msg)
if res == DC_CONSTANT_MSG_MOVESTATE_MOVING:
self.schedule_move(msg)
msg.move_state = DC_CONSTANT_MSG_MOVESTATE_MOVING
elif res == DC_CONSTANT_MSG_MOVESTATE_STAY:
self.log("STAY uid=%s message-id=%s" % (msg.uid, message_id))
msg.move_state = DC_CONSTANT_MSG_MOVESTATE_STAY
else:
self.log("PENDING uid=%s message-id=%s in-reply-to=%s" % (
msg.uid, message_id, msg["In-Reply-To"]))
return msg.move_state
def determine_next_move_state(self, msg):
""" Return the next move state for this message.
Only call this function if the message is pending.
This function works with the DB, does not perform any IMAP commands.
"""
self.log("shall_move %s " % (normalized_messageid(msg)))
assert self.foldername in (INBOX, SENT)
assert msg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING
if msg.foldername == MVBOX:
self.log("is already in mvbox, next state is STAY %s" % (normalized_messageid(msg)))
return DC_CONSTANT_MSG_MOVESTATE_STAY
last_dc_count = 0
while 1:
last_dc_count = (last_dc_count + 1) if is_dc_message(msg) else 0
in_reply_to = normalized_messageid(msg.get("In-Reply-To", ""))
if not in_reply_to:
type_msg = "DC" if last_dc_count else "CLEAR"
self.log("detected thread-start %s message" % type_msg, normalized_messageid(msg))
if last_dc_count > 0:
return DC_CONSTANT_MSG_MOVESTATE_MOVING
else:
return DC_CONSTANT_MSG_MOVESTATE_STAY
newmsg = self.get_message_from_db(in_reply_to)
if not newmsg:
self.log("failed to fetch from db:", in_reply_to)
# we don't have the parent message ... maybe because
# it hasn't arrived (yet), was deleted or we failed to
# scan/fetch it:
if last_dc_count >= 4:
self.log("no thread-start found, but last 4 messages were DC")
return DC_CONSTANT_MSG_MOVESTATE_MOVING
else:
self.log("pending: missing parent, last_dc_count=%x" % (last_dc_count, ))
return DC_CONSTANT_MSG_MOVESTATE_PENDING
elif newmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_MOVING:
self.log("parent was a moved message")
return DC_CONSTANT_MSG_MOVESTATE_MOVING
else:
msg = newmsg
assert 0, "should never arrive here"
def schedule_move(self, msg):
message_id = normalized_messageid(msg)
assert msg.foldername != MVBOX
self.log("scheduling move message-id=%s" % (message_id))
self.pending_imap_jobs = True
def has_message(self, message_id):
assert isinstance(message_id, str)
return message_id in self.db_messages
def get_message_from_db(self, message_id):
return self.db_messages.get(normalized_messageid(message_id))
def store_message(self, message_id, msg):
mid2 = normalized_messageid(msg)
message_id = normalized_messageid(message_id)
assert message_id == mid2
assert message_id not in self.db_messages, message_id
assert msg.foldername in (MVBOX, SENT, INBOX)
self.db_messages[message_id] = msg
self.log("stored new message message-id=%s" % (message_id,))
def forget_about_too_old_pending_messages(self):
# some housekeeping but not sure if neccessary
# because the involved sql-statements
# probably don't care if there are some foreever-pending messages
now = time.time()
for dbmid, dbmsg in self.db_messages.items():
if dbmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_PENDING:
delay = now - dbmsg.fetch_retrieve_time
if delay > self.pendingtimeout:
dbmsg.move_state = DC_CONSTANT_MSG_MOVESTATE_STAY
self.log("pendingtimeout: message now set to stay", dbmid)
def perform_imap_jobs(self):
with self.wlog("perform_imap_jobs()"):
if self.foldername in (INBOX, SENT):
to_move_uids = []
to_move_msgs = []
# determine all uids of messages that are to be moved
for dbmid, dbmsg in self.db_messages.items():
if dbmsg.move_state == DC_CONSTANT_MSG_MOVESTATE_MOVING:
if dbmsg.uid > 0: # else it's already moved?
to_move_uids.append(dbmsg.uid)
to_move_msgs.append(dbmsg)
if to_move_uids:
self.move(to_move_uids)
# now that we moved let's invalidate "uid" because it's
# not there anyore in thie folder
for dbmsg in to_move_msgs:
dbmsg.uid = 0
self.pending_imap_jobs = False
def _run_in_thread(self):
self.connect()
if self.foldername == INBOX:
# INBOX loop should wait until MVBOX polled once
mvbox.event_initial_polling_complete.wait()
now = time.time()
while True:
self.perform_imap_jobs()
self.perform_imap_fetch()
if self.foldername == MVBOX:
# signal that MVBOX has polled once
self.event_initial_polling_complete.set()
elif self.foldername == INBOX:
# it's not clear we need to do this housekeeping
# (depends on the SQL statements)
self.forget_about_too_old_pending_messages()
self.perform_imap_idle()
def start_thread_loop(self):
assert not self._thread
self._thread = t = threading.Thread(target=self._run_in_thread)
t.start()
def repr_msg(msg):
res = ["message-id: " + str(msg["message-id"]),
"foldername: " + msg.foldername,
"uid: " + str(msg.uid),
]
return "\n".join(res)
def is_dc_message(msg):
return msg and msg.get("Chat-Version")
def normalized_messageid(msg):
if isinstance(msg, str):
return msg.lower()
return msg["Message-ID"].lower()
@click.command(context_settings=dict(help_option_names=["-h", "--help"]))
@click.option("--pendingtimeout", type=int, default=3600,
help="(default 3600) seconds which a message is still considered for moving "
"even though it has no determined thread-start message")
@click.option("--basedir", type=click.Path(),
default=click.get_app_dir("imap_move_chats"),
help="directory where database files are stored")
@click.option("-n", "--name", type=str, default=None,
help="database name (by default derived from login-user)")
@click.argument("imaphost", type=str, required=True)
@click.argument("login-user", type=str, required=True)
@click.argument("login-password", type=str, required=True)
@click.pass_context
def main(context, basedir, name, imaphost, login_user, login_password, pendingtimeout):
global mvbox
if not os.path.exists(basedir):
os.makedirs(basedir)
if name is None:
name = login_user
# dbpath = os.path.join(basedir, name) + ".db"
# print("Using dbfile:", dbpath)
# db = PersistentDict(dbpath)
db = None
conn_info = (imaphost, login_user, login_password)
inbox = ImapConn(db, INBOX, conn_info=conn_info)
inbox.connect()
assert 0
sent = ImapConn(db, SENT, conn_info=conn_info)
inbox.pendingtimeout = pendingtimeout
mvbox = ImapConn(db, MVBOX, conn_info=conn_info)
mvbox.start_thread_loop()
inbox.start_thread_loop()
sent.start_thread_loop()
if __name__ == "__main__":
main()

View File

@@ -380,8 +380,7 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
def make_direct_imap(self, account, folder): def make_direct_imap(self, account, folder):
conn_info = (account.get_config("configured_mail_server"), conn_info = (account.get_config("configured_mail_server"),
account.get_config("addr"), account.get_config("mail_pw")) account.get_config("addr"), account.get_config("mail_pw"))
imap = ImapConn(None, folder, conn_info=conn_info) imap = ImapConn(folder, conn_info=conn_info)
imap.connect()
return imap return imap
am = AccountMaker() am = AccountMaker()