fix: Prefetch messages in limited batches (#6915)

I have logs from a user where prefetching messages takes many minutes. While that is not a problem
on its own, we can't rely on the connection staying alive for that long, so make
`fetch_new_messages()` prefetch (and then actually download) messages in batches of 500.
This commit is contained in:
iequidoo
2025-07-22 11:41:52 -03:00
committed by iequidoo
parent b9068b95b8
commit 481f5cae22
2 changed files with 44 additions and 19 deletions

View File

@@ -566,10 +566,38 @@ impl Imap {
} }
session.new_mail = false; session.new_mail = false;
let mut read_cnt = 0;
loop {
let (n, fetch_more) = self
.fetch_new_msg_batch(context, session, folder, folder_meaning)
.await?;
read_cnt += n;
if !fetch_more {
return Ok(read_cnt > 0);
}
}
}
/// Returns number of messages processed and whether the function should be called again.
async fn fetch_new_msg_batch(
&mut self,
context: &Context,
session: &mut Session,
folder: &str,
folder_meaning: FolderMeaning,
) -> Result<(usize, bool)> {
let uid_validity = get_uidvalidity(context, folder).await?; let uid_validity = get_uidvalidity(context, folder).await?;
let old_uid_next = get_uid_next(context, folder).await?; let old_uid_next = get_uid_next(context, folder).await?;
info!(
context,
"fetch_new_msg_batch({folder}): UIDVALIDITY={uid_validity}, UIDNEXT={old_uid_next}."
);
let msgs = session.prefetch(old_uid_next).await.context("prefetch")?; let uids_to_prefetch = 500;
let msgs = session
.prefetch(old_uid_next, uids_to_prefetch)
.await
.context("prefetch")?;
let read_cnt = msgs.len(); let read_cnt = msgs.len();
let download_limit = context.download_limit().await?; let download_limit = context.download_limit().await?;
@@ -729,7 +757,8 @@ impl Imap {
largest_uid_fetched largest_uid_fetched
}; };
let actually_download_messages_future = async move { let actually_download_messages_future = async {
let sender = sender;
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1)); let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
let mut fetch_partially = false; let mut fetch_partially = false;
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1)); uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
@@ -764,14 +793,17 @@ impl Imap {
// if the message has arrived after selecting mailbox // if the message has arrived after selecting mailbox
// and determining its UIDNEXT and before prefetch. // and determining its UIDNEXT and before prefetch.
let mut new_uid_next = largest_uid_fetched + 1; let mut new_uid_next = largest_uid_fetched + 1;
if fetch_res.is_ok() { let fetch_more = fetch_res.is_ok() && {
let prefetch_uid_next = old_uid_next + uids_to_prefetch;
// If we have successfully fetched all messages we planned during prefetch, // If we have successfully fetched all messages we planned during prefetch,
// then we have covered at least the range between old UIDNEXT // then we have covered at least the range between old UIDNEXT
// and UIDNEXT of the mailbox at the time of selecting it. // and UIDNEXT of the mailbox at the time of selecting it.
new_uid_next = max(new_uid_next, mailbox_uid_next); new_uid_next = max(new_uid_next, min(prefetch_uid_next, mailbox_uid_next));
new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1); new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1);
}
prefetch_uid_next < mailbox_uid_next
};
if new_uid_next > old_uid_next { if new_uid_next > old_uid_next {
set_uid_next(context, folder, new_uid_next).await?; set_uid_next(context, folder, new_uid_next).await?;
} }
@@ -788,7 +820,7 @@ impl Imap {
// establish a new session if this one is broken. // establish a new session if this one is broken.
fetch_res?; fetch_res?;
Ok(read_cnt > 0) Ok((read_cnt, fetch_more))
} }
/// Read the recipients from old emails sent by the user and add them as contacts. /// Read the recipients from old emails sent by the user and add them as contacts.

View File

@@ -110,14 +110,16 @@ impl Session {
Ok(list) Ok(list)
} }
/// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results /// Prefetch `n_uids` messages starting from `uid_next`. Returns a list of fetch results in the
/// in the order of ascending delivery time to the server (INTERNALDATE). /// order of ascending delivery time to the server (INTERNALDATE).
pub(crate) async fn prefetch( pub(crate) async fn prefetch(
&mut self, &mut self,
uid_next: u32, uid_next: u32,
n_uids: u32,
) -> Result<Vec<(u32, async_imap::types::Fetch)>> { ) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
let uid_last = uid_next.saturating_add(n_uids - 1);
// fetch messages with larger UID than the last one seen // fetch messages with larger UID than the last one seen
let set = format!("{uid_next}:*"); let set = format!("{uid_next}:{uid_last}");
let mut list = self let mut list = self
.uid_fetch(set, PREFETCH_FLAGS) .uid_fetch(set, PREFETCH_FLAGS)
.await .await
@@ -126,16 +128,7 @@ impl Session {
let mut msgs = BTreeMap::new(); let mut msgs = BTreeMap::new();
while let Some(msg) = list.try_next().await? { while let Some(msg) = list.try_next().await? {
if let Some(msg_uid) = msg.uid { if let Some(msg_uid) = msg.uid {
// If the mailbox is not empty, results always include msgs.insert((msg.internal_date(), msg_uid), msg);
// at least one UID, even if last_seen_uid+1 is past
// the last UID in the mailbox. It happens because
// uid:* is interpreted the same way as *:uid.
// See <https://tools.ietf.org/html/rfc3501#page-61> for
// standard reference. Therefore, sometimes we receive
// already seen messages and have to filter them out.
if msg_uid >= uid_next {
msgs.insert((msg.internal_date(), msg_uid), msg);
}
} }
} }