feat: pre-messages / next version of download on demand (#7371)

Closes <https://github.com/chatmail/core/issues/7367>

Co-authored-by: iequidoo <dgreshilov@gmail.com>
Co-authored-by: Hocuri <hocuri@gmx.de>
This commit is contained in:
Simon Laux
2026-01-08 22:14:32 +00:00
committed by GitHub
parent 46bbe5f077
commit 2631745a57
43 changed files with 2843 additions and 1393 deletions

View File

@@ -67,7 +67,6 @@ const RFC724MID_UID: &str = "(UID BODY.PEEK[HEADER.FIELDS (\
X-MICROSOFT-ORIGINAL-MESSAGE-ID\
)])";
const BODY_FULL: &str = "(FLAGS BODY.PEEK[])";
const BODY_PARTIAL: &str = "(FLAGS RFC822.SIZE BODY.PEEK[HEADER])";
#[derive(Debug)]
pub(crate) struct Imap {
@@ -615,12 +614,18 @@ impl Imap {
.context("prefetch")?;
let read_cnt = msgs.len();
let download_limit = context.download_limit().await?;
let mut uids_fetch = Vec::<(u32, bool /* partially? */)>::with_capacity(msgs.len() + 1);
let mut uids_fetch: Vec<u32> = Vec::new();
let mut available_post_msgs: Vec<String> = Vec::new();
let mut download_later: Vec<String> = Vec::new();
let mut uid_message_ids = BTreeMap::new();
let mut largest_uid_skipped = None;
let delete_target = context.get_delete_msgs_target().await?;
let download_limit: Option<u32> = context
.get_config_parsed(Config::DownloadLimit)
.await?
.filter(|&l| 0 < l);
// Store the info about IMAP messages in the database.
for (uid, ref fetch_response) in msgs {
let headers = match get_fetch_headers(fetch_response) {
@@ -632,6 +637,9 @@ impl Imap {
};
let message_id = prefetch_get_message_id(&headers);
let size = fetch_response
.size
.context("imap fetch response does not contain size")?;
// Determine the target folder where the message should be moved to.
//
@@ -706,14 +714,27 @@ impl Imap {
)
.await.context("prefetch_should_download")?
{
match download_limit {
Some(download_limit) => uids_fetch.push((
uid,
fetch_response.size.unwrap_or_default() > download_limit,
)),
None => uids_fetch.push((uid, false)),
}
uid_message_ids.insert(uid, message_id);
if headers
.get_header_value(HeaderDef::ChatIsPostMessage)
.is_some()
{
info!(context, "{message_id:?} is a post-message.");
available_post_msgs.push(message_id.clone());
if download_limit.is_none_or(|download_limit| size <= download_limit) {
download_later.push(message_id.clone());
}
largest_uid_skipped = Some(uid);
} else {
info!(context, "{message_id:?} is not a post-message.");
if download_limit.is_none_or(|download_limit| size <= download_limit) {
uids_fetch.push(uid);
uid_message_ids.insert(uid, message_id);
} else {
download_later.push(message_id.clone());
largest_uid_skipped = Some(uid);
}
};
} else {
largest_uid_skipped = Some(uid);
}
@@ -747,29 +768,10 @@ impl Imap {
};
let actually_download_messages_future = async {
let sender = sender;
let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1));
let mut fetch_partially = false;
uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1));
for (uid, fp) in uids_fetch {
if fp != fetch_partially {
session
.fetch_many_msgs(
context,
folder,
uids_fetch_in_batch.split_off(0),
&uid_message_ids,
fetch_partially,
sender.clone(),
)
.await
.context("fetch_many_msgs")?;
fetch_partially = fp;
}
uids_fetch_in_batch.push(uid);
}
anyhow::Ok(())
session
.fetch_many_msgs(context, folder, uids_fetch, &uid_message_ids, sender)
.await
.context("fetch_many_msgs")
};
let (largest_uid_fetched, fetch_res) =
@@ -804,6 +806,30 @@ impl Imap {
chat::mark_old_messages_as_noticed(context, received_msgs).await?;
if fetch_res.is_ok() {
info!(
context,
"available_post_msgs: {}, download_later: {}.",
available_post_msgs.len(),
download_later.len(),
);
let trans_fn = |t: &mut rusqlite::Transaction| {
let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?;
for rfc724_mid in available_post_msgs {
stmt.execute((rfc724_mid,))
.context("INSERT OR IGNORE INTO available_post_msgs")?;
}
let mut stmt =
t.prepare("INSERT OR IGNORE INTO download (rfc724_mid, msg_id) VALUES (?,0)")?;
for rfc724_mid in download_later {
stmt.execute((rfc724_mid,))
.context("INSERT OR IGNORE INTO download")?;
}
Ok(())
};
context.sql.transaction(trans_fn).await?;
}
// Now fail if fetching failed, so we will
// establish a new session if this one is broken.
fetch_res?;
@@ -1339,7 +1365,6 @@ impl Session {
folder: &str,
request_uids: Vec<u32>,
uid_message_ids: &BTreeMap<u32, String>,
fetch_partially: bool,
received_msgs_channel: Sender<(u32, Option<ReceivedMsg>)>,
) -> Result<()> {
if request_uids.is_empty() {
@@ -1347,25 +1372,10 @@ impl Session {
}
for (request_uids, set) in build_sequence_sets(&request_uids)? {
info!(
context,
"Starting a {} FETCH of message set \"{}\".",
if fetch_partially { "partial" } else { "full" },
set
);
let mut fetch_responses = self
.uid_fetch(
&set,
if fetch_partially {
BODY_PARTIAL
} else {
BODY_FULL
},
)
.await
.with_context(|| {
format!("fetching messages {} from folder \"{}\"", &set, folder)
})?;
info!(context, "Starting UID FETCH of message set \"{}\".", set);
let mut fetch_responses = self.uid_fetch(&set, BODY_FULL).await.with_context(|| {
format!("fetching messages {} from folder \"{}\"", &set, folder)
})?;
// Map from UIDs to unprocessed FETCH results. We put unprocessed FETCH results here
// when we want to process other messages first.
@@ -1422,11 +1432,7 @@ impl Session {
count += 1;
let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
let (body, partial) = if fetch_partially {
(fetch_response.header(), fetch_response.size) // `BODY.PEEK[HEADER]` goes to header() ...
} else {
(fetch_response.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header()
};
let body = fetch_response.body();
if is_deleted {
info!(context, "Not processing deleted msg {}.", request_uid);
@@ -1460,7 +1466,7 @@ impl Session {
context,
"Passing message UID {} to receive_imf().", request_uid
);
let res = receive_imf_inner(context, rfc724_mid, body, is_seen, partial).await;
let res = receive_imf_inner(context, rfc724_mid, body, is_seen).await;
let received_msg = match res {
Err(err) => {
warn!(context, "receive_imf error: {err:#}.");
@@ -2219,11 +2225,12 @@ pub(crate) async fn prefetch_should_download(
message_id: &str,
mut flags: impl Iterator<Item = Flag<'_>>,
) -> Result<bool> {
if message::rfc724_mid_exists(context, message_id)
.await?
.is_some()
{
markseen_on_imap_table(context, message_id).await?;
if message::rfc724_mid_download_tried(context, message_id).await? {
if let Some(from) = mimeparser::get_from(headers)
&& context.is_self_addr(&from.addr).await?
{
markseen_on_imap_table(context, message_id).await?;
}
return Ok(false);
}