Merge pull request #5296 from deltachat/link2xt/imap-session

refactor: move more methods from Imap into Session
This commit is contained in:
link2xt
2024-02-29 02:29:52 +00:00
committed by GitHub
10 changed files with 366 additions and 381 deletions

View File

@@ -4,7 +4,6 @@
//! to implement connect, fetch, delete functionality with standard IMAP servers.
use std::{
cmp,
cmp::max,
collections::{BTreeMap, BTreeSet, HashMap},
iter::Peekable,
@@ -23,7 +22,7 @@ use tokio::sync::RwLock;
use crate::chat::{self, ChatId, ChatIdBlocked};
use crate::config::Config;
use crate::constants::{self, Blocked, Chattype, ShowEmails, DC_FETCH_EXISTING_MSGS_COUNT};
use crate::constants::{self, Blocked, Chattype, ShowEmails};
use crate::contact::{normalize_name, Contact, ContactAddress, ContactId, Modifier, Origin};
use crate::context::Context;
use crate::events::EventType;
@@ -53,8 +52,6 @@ use client::Client;
use mailparse::SingleInfo;
use session::Session;
use self::select_folder::NewlySelected;
pub(crate) const GENERATED_PREFIX: &str = "GEN_";
#[derive(Debug, Display, Clone, Copy, PartialEq, Eq)]
@@ -64,21 +61,6 @@ pub enum ImapActionResult {
Success,
}
/// Prefetch:
/// - Message-ID to check if we already have the message.
/// - In-Reply-To and References to check if message is a reply to chat message.
/// - Chat-Version to check if a message is a chat message
/// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message,
/// not necessarily sent by Delta Chat.
const PREFETCH_FLAGS: &str = "(UID INTERNALDATE RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
MESSAGE-ID \
DATE \
X-MICROSOFT-ORIGINAL-MESSAGE-ID \
FROM \
IN-REPLY-TO REFERENCES \
CHAT-VERSION \
AUTOCRYPT-SETUP-MESSAGE\
)])";
const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
MESSAGE-ID \
X-MICROSOFT-ORIGINAL-MESSAGE-ID\
@@ -537,211 +519,6 @@ impl Imap {
Ok(())
}
/// Synchronizes UIDs in the database with UIDs on the server.
///
/// It is assumed that no operations are taking place on the same
/// folder at the moment. Make sure to run it in the same
/// thread/task as other network operations on this folder to
/// avoid race conditions.
pub(crate) async fn resync_folder_uids(
&mut self,
context: &Context,
folder: &str,
folder_meaning: FolderMeaning,
) -> Result<()> {
// Collect pairs of UID and Message-ID.
let mut msgs = BTreeMap::new();
let session = self
.session
.as_mut()
.context("IMAP No connection established")?;
session.select_folder(context, Some(folder)).await?;
let mut list = session
.uid_fetch("1:*", RFC724MID_UID)
.await
.with_context(|| format!("can't resync folder {folder}"))?;
while let Some(fetch) = list.try_next().await? {
let headers = match get_fetch_headers(&fetch) {
Ok(headers) => headers,
Err(err) => {
warn!(context, "Failed to parse FETCH headers: {}", err);
continue;
}
};
let message_id = prefetch_get_message_id(&headers);
if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
msgs.insert(
uid,
(
rfc724_mid,
target_folder(context, folder, folder_meaning, &headers).await?,
),
);
}
}
info!(
context,
"Resync: collected {} message IDs in folder {}",
msgs.len(),
folder,
);
let uid_validity = get_uidvalidity(context, folder).await?;
// Write collected UIDs to SQLite database.
context
.sql
.transaction(move |transaction| {
transaction.execute("DELETE FROM imap WHERE folder=?", (folder,))?;
for (uid, (rfc724_mid, target)) in &msgs {
// This may detect previously undetected moved
// messages, so we update server_folder too.
transaction.execute(
"INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(folder, uid, uidvalidity)
DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
target=excluded.target",
(rfc724_mid, folder, uid, uid_validity, target),
)?;
}
Ok(())
})
.await?;
Ok(())
}
/// Selects a folder and takes care of UIDVALIDITY changes.
///
/// When selecting a folder for the first time, sets the uid_next to the current
/// mailbox.uid_next so that no old emails are fetched.
///
/// Returns Result<new_emails> (i.e. whether new emails arrived),
/// if in doubt, returns new_emails=true so emails are fetched.
pub(crate) async fn select_with_uidvalidity(
&mut self,
context: &Context,
folder: &str,
) -> Result<bool> {
let session = self.session.as_mut().context("no session")?;
let newly_selected = session
.select_or_create_folder(context, folder)
.await
.with_context(|| format!("failed to select or create folder {folder}"))?;
let mailbox = session
.selected_mailbox
.as_mut()
.with_context(|| format!("No mailbox selected, folder: {folder}"))?;
let old_uid_validity = get_uidvalidity(context, folder)
.await
.with_context(|| format!("failed to get old UID validity for folder {folder}"))?;
let old_uid_next = get_uid_next(context, folder)
.await
.with_context(|| format!("failed to get old UID NEXT for folder {folder}"))?;
let new_uid_validity = mailbox
.uid_validity
.with_context(|| format!("No UIDVALIDITY for folder {folder}"))?;
let new_uid_next = if let Some(uid_next) = mailbox.uid_next {
Some(uid_next)
} else {
warn!(
context,
"SELECT response for IMAP folder {folder:?} has no UIDNEXT, fall back to STATUS command."
);
// RFC 3501 says STATUS command SHOULD NOT be used
// on the currently selected mailbox because the same
// information can be obtained by other means,
// such as reading SELECT response.
//
// However, it also says that UIDNEXT is REQUIRED
// in the SELECT response and if we are here,
// it is actually not returned.
//
// In particular, Winmail Pro Mail Server 5.1.0616
// never returns UIDNEXT in SELECT response,
// but responds to "STATUS INBOX (UIDNEXT)" command.
let status = session
.inner
.status(folder, "(UIDNEXT)")
.await
.with_context(|| format!("STATUS (UIDNEXT) error for {folder:?}"))?;
if status.uid_next.is_none() {
// This happens with mail.163.com as of 2023-11-26.
// It does not return UIDNEXT on SELECT and returns invalid
// `* STATUS "INBOX" ()` response on explicit request for UIDNEXT.
warn!(context, "STATUS {folder} (UIDNEXT) did not return UIDNEXT.");
}
status.uid_next
};
mailbox.uid_next = new_uid_next;
if new_uid_validity == old_uid_validity {
let new_emails = if newly_selected == NewlySelected::No {
// The folder was not newly selected i.e. no SELECT command was run. This means that mailbox.uid_next
// was not updated and may contain an incorrect value. So, just return true so that
// the caller tries to fetch new messages (we could of course run a SELECT command now, but trying to fetch
// new messages is only one command, just as a SELECT command)
true
} else if let Some(new_uid_next) = new_uid_next {
if new_uid_next < old_uid_next {
warn!(
context,
"The server illegally decreased the uid_next of folder {folder:?} from {old_uid_next} to {new_uid_next} without changing validity ({new_uid_validity}), resyncing UIDs...",
);
set_uid_next(context, folder, new_uid_next).await?;
context.schedule_resync().await?;
}
new_uid_next != old_uid_next // If UIDNEXT changed, there are new emails
} else {
// We have no UIDNEXT and if in doubt, return true.
true
};
return Ok(new_emails);
}
// UIDVALIDITY is modified, reset highest seen MODSEQ.
set_modseq(context, folder, 0).await?;
// ============== uid_validity has changed or is being set the first time. ==============
let new_uid_next = new_uid_next.unwrap_or_default();
set_uid_next(context, folder, new_uid_next).await?;
set_uidvalidity(context, folder, new_uid_validity).await?;
// Collect garbage entries in `imap` table.
context
.sql
.execute(
"DELETE FROM imap WHERE folder=? AND uidvalidity!=?",
(&folder, new_uid_validity),
)
.await?;
if old_uid_validity != 0 || old_uid_next != 0 {
context.schedule_resync().await?;
}
info!(
context,
"uid/validity change folder {}: new {}/{} previous {}/{}.",
folder,
new_uid_next,
new_uid_validity,
old_uid_next,
old_uid_validity,
);
Ok(false)
}
/// Fetches new messages.
///
/// Returns true if at least one message was fetched.
@@ -757,7 +534,8 @@ impl Imap {
return Ok(false);
}
let new_emails = self
let session = self.session.as_mut().context("No IMAP session")?;
let new_emails = session
.select_with_uidvalidity(context, folder)
.await
.with_context(|| format!("Failed to select folder {folder:?}"))?;
@@ -771,11 +549,12 @@ impl Imap {
let old_uid_next = get_uid_next(context, folder).await?;
let msgs = if fetch_existing_msgs {
self.prefetch_existing_msgs()
session
.prefetch_existing_msgs()
.await
.context("prefetch_existing_msgs")?
} else {
self.prefetch(old_uid_next).await.context("prefetch")?
session.prefetch(old_uid_next).await.context("prefetch")?
};
let read_cnt = msgs.len();
@@ -916,6 +695,9 @@ impl Imap {
for (uid, fp) in uids_fetch {
if fp != fetch_partially {
let (largest_uid_fetched_in_batch, received_msgs_in_batch) = self
.session
.as_mut()
.context("No IMAP session")?
.fetch_many_msgs(
context,
folder,
@@ -986,13 +768,14 @@ impl Imap {
}
self.prepare(context).await.context("could not connect")?;
add_all_recipients_as_contacts(context, self, Config::ConfiguredSentboxFolder)
let session = self.session.as_mut().context("No IMAP session")?;
add_all_recipients_as_contacts(context, session, Config::ConfiguredSentboxFolder)
.await
.context("failed to get recipients from the sentbox")?;
add_all_recipients_as_contacts(context, self, Config::ConfiguredMvboxFolder)
add_all_recipients_as_contacts(context, session, Config::ConfiguredMvboxFolder)
.await
.context("failed to get recipients from the movebox")?;
add_all_recipients_as_contacts(context, self, Config::ConfiguredInboxFolder)
add_all_recipients_as_contacts(context, session, Config::ConfiguredInboxFolder)
.await
.context("failed to get recipients from the inbox")?;
@@ -1021,11 +804,11 @@ impl Imap {
info!(context, "Done fetching existing messages.");
Ok(())
}
}
impl Session {
/// Synchronizes UIDs for all folders.
pub(crate) async fn resync_folders(&mut self, context: &Context) -> Result<()> {
self.prepare(context).await?;
let all_folders = self
.list_folders()
.await
@@ -1039,9 +822,81 @@ impl Imap {
}
Ok(())
}
}
impl Session {
/// Synchronizes UIDs in the database with UIDs on the server.
///
/// It is assumed that no operations are taking place on the same
/// folder at the moment. Make sure to run it in the same
/// thread/task as other network operations on this folder to
/// avoid race conditions.
pub(crate) async fn resync_folder_uids(
&mut self,
context: &Context,
folder: &str,
folder_meaning: FolderMeaning,
) -> Result<()> {
// Collect pairs of UID and Message-ID.
let mut msgs = BTreeMap::new();
self.select_folder(context, Some(folder)).await?;
let mut list = self
.uid_fetch("1:*", RFC724MID_UID)
.await
.with_context(|| format!("can't resync folder {folder}"))?;
while let Some(fetch) = list.try_next().await? {
let headers = match get_fetch_headers(&fetch) {
Ok(headers) => headers,
Err(err) => {
warn!(context, "Failed to parse FETCH headers: {}", err);
continue;
}
};
let message_id = prefetch_get_message_id(&headers);
if let (Some(uid), Some(rfc724_mid)) = (fetch.uid, message_id) {
msgs.insert(
uid,
(
rfc724_mid,
target_folder(context, folder, folder_meaning, &headers).await?,
),
);
}
}
info!(
context,
"Resync: collected {} message IDs in folder {}",
msgs.len(),
folder,
);
let uid_validity = get_uidvalidity(context, folder).await?;
// Write collected UIDs to SQLite database.
context
.sql
.transaction(move |transaction| {
transaction.execute("DELETE FROM imap WHERE folder=?", (folder,))?;
for (uid, (rfc724_mid, target)) in &msgs {
// This may detect previously undetected moved
// messages, so we update server_folder too.
transaction.execute(
"INSERT INTO imap (rfc724_mid, folder, uid, uidvalidity, target)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(folder, uid, uidvalidity)
DO UPDATE SET rfc724_mid=excluded.rfc724_mid,
target=excluded.target",
(rfc724_mid, folder, uid, uid_validity, target),
)?;
}
Ok(())
})
.await?;
Ok(())
}
/// Deletes batch of messages identified by their UID from the currently
/// selected folder.
async fn delete_message_batch(
@@ -1265,17 +1120,10 @@ impl Session {
Ok(())
}
}
impl Imap {
/// Synchronizes `\Seen` flags using `CONDSTORE` extension.
pub(crate) async fn sync_seen_flags(&mut self, context: &Context, folder: &str) -> Result<()> {
let session = self
.session
.as_mut()
.with_context(|| format!("No IMAP connection established, folder: {folder}"))?;
if !session.can_condstore() {
if !self.can_condstore() {
info!(
context,
"Server does not support CONDSTORE, skipping flag synchronization."
@@ -1283,12 +1131,11 @@ impl Imap {
return Ok(());
}
session
.select_folder(context, Some(folder))
self.select_folder(context, Some(folder))
.await
.context("failed to select folder")?;
let mailbox = session
let mailbox = self
.selected_mailbox
.as_ref()
.with_context(|| format!("No mailbox selected, folder: {folder}"))?;
@@ -1310,7 +1157,7 @@ impl Imap {
let mut highest_modseq = get_modseq(context, folder)
.await
.with_context(|| format!("failed to get MODSEQ for folder {folder}"))?;
let mut list = session
let mut list = self
.uid_fetch("1:*", format!("(FLAGS) (CHANGEDSINCE {highest_modseq})"))
.await
.context("failed to fetch flags")?;
@@ -1359,12 +1206,7 @@ impl Imap {
/// Gets the from, to and bcc addresses from all existing outgoing emails.
pub async fn get_all_recipients(&mut self, context: &Context) -> Result<Vec<SingleInfo>> {
let session = self
.session
.as_mut()
.context("IMAP No Connection established")?;
let mut uids: Vec<_> = session
let mut uids: Vec<_> = self
.uid_search(get_imap_self_sent_search_command(context).await?)
.await?
.into_iter()
@@ -1373,7 +1215,7 @@ impl Imap {
let mut result = Vec::new();
for (_, uid_set) in build_sequence_sets(&uids)? {
let mut list = session
let mut list = self
.uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])")
.await
.context("IMAP Could not fetch")?;
@@ -1397,68 +1239,6 @@ impl Imap {
Ok(result)
}
/// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results
/// in the order of ascending delivery time to the server (INTERNALDATE).
async fn prefetch(&mut self, uid_next: u32) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
let session = self
.session
.as_mut()
.context("no IMAP connection established")?;
// fetch messages with larger UID than the last one seen
let set = format!("{uid_next}:*");
let mut list = session
.uid_fetch(set, PREFETCH_FLAGS)
.await
.context("IMAP could not fetch")?;
let mut msgs = BTreeMap::new();
while let Some(msg) = list.try_next().await? {
if let Some(msg_uid) = msg.uid {
// If the mailbox is not empty, results always include
// at least one UID, even if last_seen_uid+1 is past
// the last UID in the mailbox. It happens because
// uid:* is interpreted the same way as *:uid.
// See <https://tools.ietf.org/html/rfc3501#page-61> for
// standard reference. Therefore, sometimes we receive
// already seen messages and have to filter them out.
if msg_uid >= uid_next {
msgs.insert((msg.internal_date(), msg_uid), msg);
}
}
}
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
}
/// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages)
async fn prefetch_existing_msgs(&mut self) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
let session = self.session.as_mut().context("no IMAP session")?;
let exists: i64 = {
let mailbox = session.selected_mailbox.as_ref().context("no mailbox")?;
mailbox.exists.into()
};
// Fetch last DC_FETCH_EXISTING_MSGS_COUNT (100) messages.
// Sequence numbers are sequential. If there are 1000 messages in the inbox,
// we can fetch the sequence numbers 900-1000 and get the last 100 messages.
let first = cmp::max(1, exists - DC_FETCH_EXISTING_MSGS_COUNT + 1);
let set = format!("{first}:{exists}");
let mut list = session
.fetch(&set, PREFETCH_FLAGS)
.await
.context("IMAP Could not fetch")?;
let mut msgs = BTreeMap::new();
while let Some(msg) = list.try_next().await? {
if let Some(msg_uid) = msg.uid {
msgs.insert((msg.internal_date(), msg_uid), msg);
}
}
Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
}
/// Fetches a list of messages by server UID.
///
/// Returns the last UID fetched successfully and the info about each downloaded message.
@@ -1482,7 +1262,6 @@ impl Imap {
return Ok((last_uid, received_msgs));
}
let session = self.session.as_mut().context("no IMAP session")?;
for (request_uids, set) in build_sequence_sets(&request_uids)? {
info!(
context,
@@ -1490,7 +1269,7 @@ impl Imap {
if fetch_partially { "partial" } else { "full" },
set
);
let mut fetch_responses = session
let mut fetch_responses = self
.uid_fetch(
&set,
if fetch_partially {
@@ -1656,8 +1435,7 @@ impl Imap {
/// and [`/shared/admin`](https://www.rfc-editor.org/rfc/rfc5464#section-6.2.2)
/// metadata.
pub(crate) async fn fetch_metadata(&mut self, context: &Context) -> Result<()> {
let session = self.session.as_mut().context("no session")?;
if !session.can_metadata() {
if !self.can_metadata() {
return Ok(());
}
@@ -1676,7 +1454,7 @@ impl Imap {
let mailbox = "";
let options = "";
let metadata = session
let metadata = self
.get_metadata(mailbox, options, "(/shared/comment /shared/admin)")
.await?;
for m in metadata {
@@ -1814,7 +1592,7 @@ impl Imap {
session.close().await?;
// Before moving emails to the mvbox we need to remember its UIDVALIDITY, otherwise
// emails moved before that wouldn't be fetched but considered "old" instead.
self.select_with_uidvalidity(context, folder).await?;
session.select_with_uidvalidity(context, folder).await?;
return Ok(Some(folder));
}
}
@@ -1825,7 +1603,7 @@ impl Imap {
let Some(folder) = folders.first() else {
return Ok(None);
};
match self.select_with_uidvalidity(context, folder).await {
match session.select_with_uidvalidity(context, folder).await {
Ok(_) => {
info!(context, "MVBOX-folder {} created.", folder);
return Ok(Some(folder));
@@ -2633,7 +2411,7 @@ impl std::fmt::Display for UidRange {
}
async fn add_all_recipients_as_contacts(
context: &Context,
imap: &mut Imap,
session: &mut Session,
folder: Config,
) -> Result<()> {
let mailbox = if let Some(m) = context.get_config(folder).await? {
@@ -2645,11 +2423,12 @@ async fn add_all_recipients_as_contacts(
);
return Ok(());
};
imap.select_with_uidvalidity(context, &mailbox)
session
.select_with_uidvalidity(context, &mailbox)
.await
.with_context(|| format!("could not select {mailbox}"))?;
let recipients = imap
let recipients = session
.get_all_recipients(context)
.await
.context("could not get recipients")?;