diff --git a/src/download.rs b/src/download.rs index 1f67a460f..a95ec104b 100644 --- a/src/download.rs +++ b/src/download.rs @@ -213,17 +213,18 @@ impl Session { let mut uid_message_ids: BTreeMap = BTreeMap::new(); uid_message_ids.insert(uid, rfc724_mid); - let (last_uid, _received) = self - .fetch_many_msgs( - context, - folder, - uidvalidity, - vec![uid], - &uid_message_ids, - false, - ) - .await?; - if last_uid.is_none() { + let (sender, receiver) = async_channel::unbounded(); + self.fetch_many_msgs( + context, + folder, + uidvalidity, + vec![uid], + &uid_message_ids, + false, + sender, + ) + .await?; + if receiver.recv().await.is_err() { bail!("Failed to fetch UID {uid}"); } Ok(()) diff --git a/src/imap.rs b/src/imap.rs index 641d88a98..7b8de333e 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -14,7 +14,7 @@ use std::{ }; use anyhow::{Context as _, Result, bail, ensure, format_err}; -use async_channel::Receiver; +use async_channel::{self, Receiver, Sender}; use async_imap::types::{Fetch, Flag, Name, NameAttribute, UnsolicitedResponse}; use deltachat_contact_tools::ContactAddress; use futures::{FutureExt as _, StreamExt, TryStreamExt}; @@ -562,7 +562,7 @@ impl Imap { let read_cnt = msgs.len(); let download_limit = context.download_limit().await?; - let mut uids_fetch = Vec::<(_, bool /* partially? */)>::with_capacity(msgs.len() + 1); + let mut uids_fetch = Vec::<(u32, bool /* partially? */)>::with_capacity(msgs.len() + 1); let mut uid_message_ids = BTreeMap::new(); let mut largest_uid_skipped = None; let delete_target = context.get_delete_msgs_target().await?; @@ -695,51 +695,72 @@ impl Imap { self.connectivity.set_working(context).await; } - // Actually download messages. - let mut largest_uid_fetched: u32 = 0; - let mut received_msgs = Vec::with_capacity(uids_fetch.len()); - let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1)); - let mut fetch_partially = false; - uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1)); - for (uid, fp) in uids_fetch { - if fp != fetch_partially { - let (largest_uid_fetched_in_batch, received_msgs_in_batch) = session - .fetch_many_msgs( - context, - folder, - uid_validity, - uids_fetch_in_batch.split_off(0), - &uid_message_ids, - fetch_partially, - ) - .await - .context("fetch_many_msgs")?; - received_msgs.extend(received_msgs_in_batch); - largest_uid_fetched = max( - largest_uid_fetched, - largest_uid_fetched_in_batch.unwrap_or(0), - ); - fetch_partially = fp; - } - uids_fetch_in_batch.push(uid); - } + let (sender, receiver) = async_channel::unbounded(); - // Advance uid_next to the maximum of the largest known UID plus 1 - // and mailbox UIDNEXT. - // Largest known UID is normally less than UIDNEXT, - // but a message may have arrived between determining UIDNEXT - // and executing the FETCH command. + let mut received_msgs = Vec::with_capacity(uids_fetch.len()); let mailbox_uid_next = session .selected_mailbox .as_ref() .with_context(|| format!("Expected {folder:?} to be selected"))? .uid_next .unwrap_or_default(); - let new_uid_next = max( - max(largest_uid_fetched, largest_uid_skipped.unwrap_or(0)) + 1, - mailbox_uid_next, - ); + let update_uids_future = async { + let mut largest_uid_fetched: u32 = 0; + + while let Ok((uid, received_msg_opt)) = receiver.recv().await { + largest_uid_fetched = max(largest_uid_fetched, uid); + if let Some(received_msg) = received_msg_opt { + received_msgs.push(received_msg) + } + } + + largest_uid_fetched + }; + + let actually_download_messages_future = async move { + let mut uids_fetch_in_batch = Vec::with_capacity(max(uids_fetch.len(), 1)); + let mut fetch_partially = false; + uids_fetch.push((0, !uids_fetch.last().unwrap_or(&(0, false)).1)); + for (uid, fp) in uids_fetch { + if fp != fetch_partially { + session + .fetch_many_msgs( + context, + folder, + uid_validity, + uids_fetch_in_batch.split_off(0), + &uid_message_ids, + fetch_partially, + sender.clone(), + ) + .await + .context("fetch_many_msgs")?; + fetch_partially = fp; + } + uids_fetch_in_batch.push(uid); + } + + anyhow::Ok(()) + }; + + let (largest_uid_fetched, fetch_res) = + tokio::join!(update_uids_future, actually_download_messages_future); + + // Advance uid_next to the largest fetched UID plus 1. + // + // This may be larger than `mailbox_uid_next` + // if the message has arrived after selecting mailbox + // and determining its UIDNEXT and before prefetch. + let mut new_uid_next = largest_uid_fetched + 1; + if fetch_res.is_ok() { + // If we have successfully fetched all messages we planned during prefetch, + // then we have covered at least the range between old UIDNEXT + // and UIDNEXT of the mailbox at the time of selecting it. + new_uid_next = max(new_uid_next, mailbox_uid_next); + + new_uid_next = max(new_uid_next, largest_uid_skipped.unwrap_or(0) + 1); + } if new_uid_next > old_uid_next { set_uid_next(context, folder, new_uid_next).await?; } @@ -752,6 +773,10 @@ impl Imap { chat::mark_old_messages_as_noticed(context, received_msgs).await?; + // Now fail if fetching failed, so we will + // establish a new session if this one is broken. + fetch_res?; + Ok(read_cnt > 0) } @@ -1300,9 +1325,19 @@ impl Session { /// Fetches a list of messages by server UID. /// - /// Returns the last UID fetched successfully and the info about each downloaded message. + /// Sends pairs of UID and info about each downloaded message to the provided channel. + /// Received message info is optional because UID may be ignored + /// if the message has a `\Deleted` flag. + /// + /// The channel is used to return the results because the function may fail + /// due to network errors before it finishes fetching all the messages. + /// In this case caller still may want to process all the results + /// received over the channel and persist last seen UID in the database + /// before bubbling up the failure. + /// /// If the message is incorrect or there is a failure to write a message to the database, /// it is skipped and the error is logged. + #[expect(clippy::too_many_arguments)] pub(crate) async fn fetch_many_msgs( &mut self, context: &Context, @@ -1311,12 +1346,10 @@ impl Session { request_uids: Vec, uid_message_ids: &BTreeMap, fetch_partially: bool, - ) -> Result<(Option, Vec)> { - let mut last_uid = None; - let mut received_msgs = Vec::new(); - + received_msgs_channel: Sender<(u32, Option)>, + ) -> Result<()> { if request_uids.is_empty() { - return Ok((last_uid, received_msgs)); + return Ok(()); } for (request_uids, set) in build_sequence_sets(&request_uids)? { @@ -1402,7 +1435,7 @@ impl Session { if is_deleted { info!(context, "Not processing deleted msg {}.", request_uid); - last_uid = Some(request_uid); + received_msgs_channel.send((request_uid, None)).await?; continue; } @@ -1413,7 +1446,7 @@ impl Session { context, "Not processing message {} without a BODY.", request_uid ); - last_uid = Some(request_uid); + received_msgs_channel.send((request_uid, None)).await?; continue; }; @@ -1445,15 +1478,15 @@ impl Session { .await { Ok(received_msg) => { - if let Some(m) = received_msg { - received_msgs.push(m); - } + received_msgs_channel + .send((request_uid, received_msg)) + .await?; } Err(err) => { warn!(context, "receive_imf error: {:#}.", err); + received_msgs_channel.send((request_uid, None)).await?; } }; - last_uid = Some(request_uid) } // If we don't process the whole response, IMAP client is left in a broken state where @@ -1477,7 +1510,7 @@ impl Session { } } - Ok((last_uid, received_msgs)) + Ok(()) } /// Retrieves server metadata if it is supported.