snap using imapclient

This commit is contained in:
holger krekel
2020-06-08 00:02:59 +02:00
parent 2ad014faf4
commit 612a9d012c
4 changed files with 202 additions and 218 deletions

View File

@@ -1,51 +1,47 @@
import imaplib
import io
import email
import ssl
import pathlib
def db_folder_attr(name):
def fget(s):
return s.db_folder.get(name, 1)
def fset(s, val):
s.db_folder[name] = val
return property(fget, fset, None, None)
from imapclient import IMAPClient
from imapclient.exceptions import IMAPClientError
class ImapConn:
def __init__(self, account):
self.account = account
host = account.get_config("configured_mail_server")
user = account.get_config("addr")
pw = account.get_config("mail_pw")
self.connection = imaplib.IMAP4_SSL(host)
self.connection.login(user, pw)
self.connect()
def connect(self):
ssl_context = ssl.create_default_context()
# don't check if certificate hostname doesn't match target hostname
ssl_context.check_hostname = False
# don't check if the certificate is trusted by a certificate authority
ssl_context.verify_mode = ssl.CERT_NONE
host = self.account.get_config("configured_mail_server")
user = self.account.get_config("addr")
pw = self.account.get_config("mail_pw")
self.conn = IMAPClient(host, ssl_context=ssl_context)
self.conn.login(user, pw)
self._original_msg_count = {}
self.select_folder("INBOX")
def shutdown(self):
try:
self.connection.close()
except Exception:
pass
try:
self.connection.logout()
except Exception:
self.conn.logout()
except (OSError, IMAPClientError):
print("Could not logout direct_imap conn")
def select_folder(self, foldername):
status, messages = self.connection.select(foldername)
if status != "OK":
raise ConnectionError("Could not select {}: status={} message={}".format(
foldername, status, messages))
res = self.conn.select_folder(foldername)
self.foldername = foldername
try:
msg_count = int(messages[0])
except IndexError:
msg_count = 0
msg_count = res[b'UIDNEXT'] - 1
# memorize initial message count on first select
self._original_msg_count.setdefault(foldername, msg_count)
return messages
return res
def select_config_folder(self, config_name):
if "_" not in config_name:
@@ -54,66 +50,33 @@ class ImapConn:
return self.select_folder(foldername)
def list_folders(self):
res = self.connection.list()
# XXX this parsing is hairy, maybe use imapclient library
# instead of imaplib?
if res[0] != "OK":
raise ConnectionError(str(res))
folders = []
for entry in res[1]:
entry = entry.decode()
i = entry.find('"')
assert entry[i + 2] == '"'
folder_name = entry[i + 3:].strip()
folders.append(folder_name)
for meta, sep, foldername in self.conn.list_folders():
folders.append(foldername)
return folders
def get_unread_messages(self):
return self.conn.search("UNSEEN")
def mark_all_read(self):
# result, data = self.connection.uid('search', None, "(UNSEEN)")
result, data = self.connection.search(None, 'UnSeen')
try:
mails_uid = data[0].split()
print("New mails")
# self.connection.store(data[0].replace(' ',','),'+FLAGS','\Seen')
for e_id in mails_uid:
self.connection.store(e_id, '+FLAGS', '\\Seen')
print("marked:", e_id)
return True
except IndexError:
print("No unread")
return False
messages = self.get_unread_messages()
if messages:
res = self.conn.set_flags(messages, ['\\SEEN'])
print("marked seen:", messages, res)
def get_unread_cnt(self):
# result, data = self.connection.uid('search', None, "(UNSEEN)")
result, data = self.connection.search(None, 'UnSeen')
try:
mails_uid = data[0].split()
return len(mails_uid)
except IndexError:
return 0
return len(self.get_unread_messages())
def get_new_email_cnt(self):
messages = self.select_folder(self.foldername)
try:
return int(messages[0]) - self._original_msg_count[self.foldername]
except IndexError:
return int(messages[0])
def dump_imap_structures(self, dir, file):
ac = self.account
acinfo = ac.logid + "-" + ac.get_config("addr")
return self.get_unread_cnt() - self._original_msg_count[self.foldername]
def dump_account_info(self, logfile):
def log(*args, **kwargs):
kwargs["file"] = file
kwargs["file"] = logfile
print(*args, **kwargs)
log("================= ACCOUNT", acinfo, "=================")
cursor = 0
for name, val in ac.get_info().items():
for name, val in self.account.get_info().items():
entry = "{}={}".format(name.upper(), val)
if cursor + len(entry) > 80:
log("")
@@ -122,22 +85,36 @@ class ImapConn:
cursor += len(entry) + 1
log("")
def dump_imap_structures(self, dir, logfile):
stream = io.StringIO()
def log(*args, **kwargs):
kwargs["file"] = stream
print(*args, **kwargs)
acinfo = self.account.logid + "-" + self.account.get_config("addr")
empty_folders = []
for imapfolder in self.list_folders():
self.select_folder(imapfolder)
c = self.connection
typ, data = c.search(None, 'ALL')
c._get_tagged_response
log("-----------------", imapfolder, "-----------------")
for num in data[0].split():
typ, data = c.fetch(num, '(RFC822)')
body = data[0][1]
messages = self.conn.search('ALL')
if not messages:
empty_folders.append(imapfolder)
continue
typ, data = c.fetch(num, '(UID FLAGS)')
info = data[0].decode()
path = pathlib.Path(dir.strpath).joinpath("IMAP-MESSAGES", acinfo, imapfolder)
log("---------", imapfolder, len(messages), "messages ---------")
for uid, data in self.conn.fetch(messages, [b'RFC822', b'FLAGS']).items():
body_bytes = data[b'RFC822']
flags = data[b'FLAGS']
path = pathlib.Path(str(dir)).joinpath("IMAP", acinfo, imapfolder)
path.mkdir(parents=True, exist_ok=True)
num = info.split()[0]
fn = path.joinpath(num)
fn.write_bytes(body)
log("Message", info, "saved as", fn)
fn = path.joinpath(str(uid))
fn.write_bytes(body_bytes)
log("Message", uid, "saved as", fn)
email_message = email.message_from_bytes(body_bytes)
log("Message", uid, flags, "Message-Id:", email_message.get("Message-Id"))
if empty_folders:
log("--------- EMPTY FOLDERS:", empty_folders)
print(stream.getvalue(), file=logfile)

View File

@@ -381,19 +381,31 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
self._finalizers.append(bot.kill)
return bot
def dump_imap_structures(self, file):
def dump_imap_summary(self, logfile):
for ac in self._accounts:
conn = self.new_imap_conn(ac)
conn.dump_imap_structures(tmpdir, file=file)
imap = self.new_imap_conn(ac)
imap.dump_account_info(logfile=logfile)
imap.dump_imap_structures(tmpdir, logfile=logfile)
imap.shutdown()
def get_chat(self, ac1, ac2):
chat12, chat21 = self.get_chats(ac1, ac2)
return chat12
def get_chats(self, ac1, ac2):
chat12 = ac1.create_chat_by_contact(
ac1.create_contact(email=ac2.get_config("addr")))
chat21 = ac2.create_chat_by_contact(
ac2.create_contact(email=ac1.get_config("addr")))
return chat12, chat21
am = AccountMaker()
request.addfinalizer(am.finalize)
yield am
if hasattr(request.node, "rep_call") and request.node.rep_call.failed:
file = io.StringIO()
am.dump_imap_structures(file=file)
s = file.getvalue()
print(s)
logfile = io.StringIO()
am.dump_imap_summary(logfile=logfile)
print(logfile.getvalue())
# request.node.add_report_section("call", "imap-server-state", s)