import io
import email
import ssl
import pathlib

from imapclient import IMAPClient
from imapclient.exceptions import IMAPClientError

# IMAP flag / response-key names as the bytes used in IMAPClient results.
SEEN = b'\\Seen'
FLAGS = b'FLAGS'
FETCH = b'FETCH'


class ImapConn:
    """Direct IMAP session for one account.

    Wraps an ``IMAPClient`` connection and tracks in ``self._idling``
    whether an IDLE command started through this object is in progress.
    Methods that issue regular IMAP commands assert that it is not.
    """

    def __init__(self, account):
        """Store ``account`` and connect right away.

        :param account: object providing ``get_config(name)``; some methods
            additionally use its ``get_info()``, ``log(msg)`` and ``logid``.
        """
        self.account = account
        self._idling = False
        self.connect()

    def connect(self):
        """Open a TLS connection, log in and select ``INBOX``.

        Host, user and password are read from the account config keys
        ``configured_mail_server``, ``addr`` and ``mail_pw``.  The new
        client is stored in ``self.conn``.
        """
        ssl_context = ssl.create_default_context()

        # don't check if certificate hostname doesn't match target hostname
        ssl_context.check_hostname = False

        # don't check if the certificate is trusted by a certificate authority
        ssl_context.verify_mode = ssl.CERT_NONE
        # NOTE(review): with both checks off the server is not authenticated;
        # presumably intended for test servers only -- confirm before reuse.

        host = self.account.get_config("configured_mail_server")
        user = self.account.get_config("addr")
        pw = self.account.get_config("mail_pw")
        self.conn = IMAPClient(host, ssl_context=ssl_context)
        self.conn.login(user, pw)

        self.select_folder("INBOX")

    def shutdown(self):
        """Best-effort teardown: leave IDLE if active, then log out.

        ``OSError`` and ``IMAPClientError`` are swallowed in both steps so
        that shutdown can be called on an already broken connection.
        """
        # DONE is only valid as the terminator of a running IDLE command,
        # so it is sent only when this object actually started one.
        if self._idling:
            try:
                self.conn.idle_done()
            except (OSError, IMAPClientError):
                pass
            finally:
                # Whatever happened, this object no longer considers
                # itself idling once shutdown has been requested.
                self._idling = False

        try:
            self.conn.logout()
        except (OSError, IMAPClientError):
            print("Could not logout direct_imap conn")

    def select_folder(self, foldername):
        """Select ``foldername`` and return IMAPClient's select result."""
        assert not self._idling
        return self.conn.select_folder(foldername)

    def select_config_folder(self, config_name):
        """Select the folder whose name is stored in the account config.

        A ``config_name`` without an underscore is treated as a short name
        and expanded to ``configured_<config_name>_folder``; any other
        value is used as the config key verbatim.
        """
        if "_" not in config_name:
            config_name = "configured_{}_folder".format(config_name)
        foldername = self.account.get_config(config_name)
        return self.select_folder(foldername)

    def list_folders(self):
        """Return the names of all folders reported by the server."""
        assert not self._idling
        return [
            foldername
            for _meta, _sep, foldername in self.conn.list_folders()
        ]

    def get_all_messages(self):
        """Fetch FLAGS for ``1:*`` in the selected folder.

        Returns IMAPClient's fetch result: a mapping from message id to a
        dict that contains the ``FLAGS`` key.
        """
        assert not self._idling
        return self.conn.fetch("1:*", [FLAGS])

    def get_unread_messages(self):
        """Return the ids of messages in the selected folder lacking \\Seen."""
        res = self.get_all_messages()
        return [uid for uid in res if SEEN not in res[uid][FLAGS]]

    def mark_all_read(self):
        """Add the \\Seen flag to every unread message in the selected folder."""
        messages = self.get_unread_messages()
        if messages:
            # add_flags (not set_flags) so that other flags already present
            # on the messages are kept instead of being replaced.
            res = self.conn.add_flags(messages, [SEEN])
            print("marked seen:", messages, res)

    def get_unread_cnt(self):
        """Return the number of unread messages in the selected folder."""
        return len(self.get_unread_messages())

    def dump_account_info(self, logfile):
        """Write ``NAME=value`` pairs from ``account.get_info()`` to ``logfile``.

        Entries are separated by spaces; a new line is started whenever
        appending the next entry would take the current line past 80
        characters.
        """
        cursor = 0  # number of characters already written on this line
        for name, val in self.account.get_info().items():
            entry = "{}={}".format(name.upper(), val)
            if cursor + len(entry) > 80:
                print("", file=logfile)
                cursor = 0
            print(entry, end=" ", file=logfile)
            cursor += len(entry) + 1
        print("", file=logfile)

    def dump_imap_structures(self, dir, logfile):
        """Save the headers of all messages to disk and log a summary.

        For every folder on the server the headers of each message are
        written to ``<dir>/IMAP/<logid>-<addr>/<folder>/<message id>`` and a
        line with its flags and ``Message-Id`` is logged.  Folders without
        messages are listed at the end.  The report is buffered and written
        to ``logfile`` in a single call once all folders are processed.

        The folder selected last stays selected when the method returns.

        :param dir: base directory for the dump (converted with ``str()``).
        :param logfile: writable text file object receiving the report.
        """
        assert not self._idling
        stream = io.StringIO()

        acinfo = self.account.logid + "-" + self.account.get_config("addr")
        base = pathlib.Path(str(dir))

        empty_folders = []
        for imapfolder in self.list_folders():
            self.select_folder(imapfolder)
            messages = list(self.get_all_messages())
            if not messages:
                empty_folders.append(imapfolder)
                continue

            print("---------", imapfolder, len(messages),
                  "messages ---------", file=stream)

            # The target directory depends only on the folder, so create it
            # once here instead of once per message.
            path = base.joinpath("IMAP", acinfo, imapfolder)
            path.mkdir(parents=True, exist_ok=True)

            # request message content without auto-marking it as seen
            requested = [b'BODY.PEEK[HEADER]', FLAGS]
            for uid, data in self.conn.fetch(messages, requested).items():
                body_bytes = data[b'BODY[HEADER]']
                flags = data[FLAGS]
                fn = path.joinpath(str(uid))
                fn.write_bytes(body_bytes)
                print("Message", uid, fn, file=stream)
                email_message = email.message_from_bytes(body_bytes)
                print("Message", uid, flags, "Message-Id:",
                      email_message.get("Message-Id"), file=stream)

        if empty_folders:
            print("--------- EMPTY FOLDERS:", empty_folders, file=stream)

        print(stream.getvalue(), file=logfile)

    def idle(self):
        """Start IDLE on the connection and return IMAPClient's result."""
        assert not self._idling
        res = self.conn.idle()
        self._idling = True
        return res

    def idle_check(self, terminate=False):
        """Return the result of ``conn.idle_check()`` while in IDLE.

        :param terminate: if true, leave IDLE after the check.
        """
        assert self._idling
        self.account.log("imap-direct: calling idle_check")
        res = self.conn.idle_check()
        if terminate:
            self.idle_done()
        return res

    def idle_done(self):
        """Leave IDLE if it is active.

        Returns IMAPClient's ``idle_done()`` result, or ``None`` when this
        object was not idling.
        """
        if not self._idling:
            return None
        res = self.conn.idle_done()
        self._idling = False
        return res