Add print_imap_structure

This commit is contained in:
Hocuri
2020-05-28 17:15:27 +02:00
parent a8d53301e3
commit 79dac9513f
4 changed files with 49 additions and 17 deletions

View File

@@ -630,7 +630,7 @@ class Account(object):
hook = hookspec.Global._get_plugin_manager().hook
hook.dc_account_after_shutdown(account=self)
self.log("shutdown finished")
class ScannedQRCode:
def __init__(self, dc_lot):

View File

@@ -1,4 +1,6 @@
import imaplib
import pathlib
from . import Account
INBOX = "Inbox"
SENT = "Sent"
@@ -24,14 +26,14 @@ class ImapConn():
self.connection = imaplib.IMAP4_SSL(host)
self.connection.login(user, pw)
messages = self._reselect_folder()
messages = self.reselect_folder()
try:
self.original_msg_count = messages[0]
except IndexError:
self.original_msg_count = 0
def mark_all_read(self):
self._reselect_folder()
self.reselect_folder()
# result, data = self.connection.uid('search', None, "(UNSEEN)")
result, data = self.connection.search(None, 'UnSeen')
try:
@@ -49,7 +51,7 @@ class ImapConn():
return False
def get_unread_cnt(self):
self._reselect_folder()
self.reselect_folder()
# result, data = self.connection.uid('search', None, "(UNSEEN)")
result, data = self.connection.search(None, 'UnSeen')
try:
@@ -60,19 +62,55 @@ class ImapConn():
return 0
def get_new_email_cnt(self):
messages = self._reselect_folder()
messages = self.reselect_folder()
try:
return messages[0] - self.original_msg_count
except IndexError:
return 0
def _reselect_folder(self):
def reselect_folder(self):
status, messages = self.connection.select(self.foldername)
if status != "OK":
print("Incorrect mail box " + status + str(messages))
raise ConnectionError
print("(Re-)Selected mailbox: " + status + " " + str(messages))
# print("(Re-)Selected mailbox: " + status + " " + str(messages))
return messages
def __del__(self):
self.connection.shutdown()
# self.connection.close()
self.connection.logout()
def make_direct_imap(account, folder):
conn_info = (account.get_config("configured_mail_server"),
account.get_config("addr"), account.get_config("mail_pw"))
imap = ImapConn(folder, conn_info=conn_info)
return imap
def print_imap_structure(database):
ac = Account(database)
ac.disable_logging()
print("================= ACCOUNT", ac.get_config("addr"), "=================")
print("----------------- CONFIG: -----------------")
print(ac.get_info())
for folder in [INBOX, MVBOX, SENT]:
try:
print("-----------------", folder, "-----------------")
imap = make_direct_imap(ac, folder)
c = imap.connection
typ, data = c.search(None, 'ALL')
c._get_tagged_response
for num in data[0].split():
typ, data = c.fetch(num, '(RFC822)')
body = data[0][1]
typ, data = c.fetch(num, '(UID FLAGS)')
info = data[0]
path = pathlib.Path("./IMAP-MESSAGES-" + ac.get_config("addr") + "~/" + folder)
path.mkdir(parents=True, exist_ok=True)
file = path.joinpath(str(info).replace("b'", "").replace("'", "").replace("\\", ""))
file.write_bytes(body)
print("Message", info, "saved as", file)
except ConnectionError:
print("Seems like there is no", folder, "folder")

View File

@@ -377,12 +377,6 @@ def acfactory(pytestconfig, tmpdir, request, session_liveconfig, data):
self._finalizers.append(bot.kill)
return bot
def make_direct_imap(self, account, folder):
conn_info = (account.get_config("configured_mail_server"),
account.get_config("addr"), account.get_config("mail_pw"))
imap = ImapConn(folder, conn_info=conn_info)
return imap
am = AccountMaker()
request.addfinalizer(am.finalize)
return am

View File

@@ -8,6 +8,7 @@ from deltachat.message import Message
from deltachat.hookspec import account_hookimpl
from datetime import datetime, timedelta
from deltachat import direct_imap
from deltachat.direct_imap import make_direct_imap
@pytest.mark.parametrize("msgtext,res", [
@@ -638,15 +639,14 @@ class TestOnlineAccount:
def test_mark_read_on_server(self, acfactory, lp):
ac1 = acfactory.get_online_configuring_account()
ac2 = acfactory.get_online_configuring_account()
ac2.set_config("mvbox_move", "1")
ac2 = acfactory.get_online_configuring_account(mvbox=True, move=True)
ac1.wait_configure_finish()
ac1.start_io()
ac2.wait_configure_finish()
ac2.start_io()
imap2 = acfactory.make_direct_imap(ac2, direct_imap.MVBOX)
imap2 = make_direct_imap(ac2, direct_imap.MVBOX)
imap2.mark_all_read()
assert imap2.get_unread_cnt() == 0