From 72d4da00950ec76665c8acb453d8c82413c18a51 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 24 May 2020 00:06:39 +0200 Subject: [PATCH] feat(imap): process incoming messages in bulk --- src/imap/mod.rs | 233 +++++++++++++++++++++++++++++++----------------- 1 file changed, 151 insertions(+), 82 deletions(-) diff --git a/src/imap/mod.rs b/src/imap/mod.rs index d0e98cb36..f235a4f21 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -578,10 +578,6 @@ impl Imap { .await?; let mut read_cnt: usize = 0; - let mut read_errors = 0; - - // prefetch info from all unfetched mails - let mut new_last_seen_uid = last_seen_uid; if self.session.is_none() { return Err(Error::NoConnection); @@ -606,84 +602,109 @@ impl Imap { drop(list); msgs.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default()); + let msgs: Vec<_> = msgs + .into_iter() + .filter(|msg| { + let cur_uid = msg.uid.unwrap_or_default(); + if cur_uid <= last_seen_uid { + // If the mailbox is not empty, results always include + // at least one UID, even if last_seen_uid+1 is past + // the last UID in the mailbox. It happens because + // uid+1:* is interpreted the same way as *:uid+1. + // See https://tools.ietf.org/html/rfc3501#page-61 for + // standard reference. Therefore, sometimes we receive + // already seen messages and have to filter them out. + info!( + context, + "fetch_new_messages: ignoring uid {}, last seen was {}", + cur_uid, + last_seen_uid + ); + false + } else { + true + } + }) + .collect(); + + read_cnt += msgs.len(); + + let mut read_errors = 0; + + let mut uids = Vec::with_capacity(msgs.len()); + let mut new_last_seen_uid = None; for fetch in msgs.into_iter() { + let folder: &str = folder.as_ref(); + let cur_uid = fetch.uid.unwrap_or_default(); - if cur_uid <= last_seen_uid { - // If the mailbox is not empty, results always include - // at least one UID, even if last_seen_uid+1 is past - // the last UID in the mailbox. It happens because - // uid+1:* is interpreted the same way as *:uid+1. 
- // See https://tools.ietf.org/html/rfc3501#page-61 for - // standard reference. Therefore, sometimes we receive - // already seen messages and have to filter them out. - info!( - context, - "fetch_new_messages: ignoring uid {}, last seen was {}", cur_uid, last_seen_uid - ); - continue; - } - read_cnt += 1; - let headers = get_fetch_headers(&fetch)?; + let headers = match get_fetch_headers(&fetch) { + Ok(h) => h, + Err(err) => { + warn!(context, "get_fetch_headers error: {}", err); + read_errors += 1; + continue; + } + }; + let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); - if let Ok(true) = precheck_imf(context, &message_id, folder.as_ref(), cur_uid) - .await - .map_err(|err| { + let skip = match precheck_imf(context, &message_id, folder, cur_uid).await { + Ok(skip) => skip, + Err(err) => { warn!(context, "precheck_imf error: {}", err); - err - }) - { + true + } + }; + + if skip { // we know the message-id already or don't want the message otherwise. info!( context, - "Skipping message {} from \"{}\" by precheck.", - message_id, - folder.as_ref(), + "Skipping message {} from \"{}\" by precheck.", message_id, folder, ); + if read_errors == 0 { + new_last_seen_uid = Some(cur_uid); + } } else { // we do not know the message-id // or the message-id is missing (in this case, we create one in the further process) // or some other error happened - let show = prefetch_should_download(context, &headers, show_emails) - .await - .map_err(|err| { + let show = match prefetch_should_download(context, &headers, show_emails).await { + Ok(show) => show, + Err(err) => { warn!(context, "prefetch_should_download error: {}", err); - err - }) - .unwrap_or(true); + true + } + }; - if !show { + if show { + uids.push(cur_uid); + } else { info!( context, - "Ignoring new message {} from \"{}\".", - message_id, - folder.as_ref(), + "Ignoring new message {} from \"{}\".", message_id, folder, ); - } else { - // check passed, go fetch the rest - if let Err(err) = 
self.fetch_single_msg(context, &folder, cur_uid).await { - info!( - context, - "Read error for message {} from \"{}\", trying over later: {}.", - message_id, - folder.as_ref(), - err - ); - read_errors += 1; - } } - } - - if read_errors == 0 { - new_last_seen_uid = cur_uid; + if read_errors == 0 { + new_last_seen_uid = Some(cur_uid); + } } } - if new_last_seen_uid > last_seen_uid { + // check passed, go fetch the emails + let (new_last_seen_uid_processed, error_cnt) = + self.fetch_many_msgs(context, &folder, &uids).await; + + let new_last_seen_uid_processed = new_last_seen_uid_processed.unwrap_or_default(); + let new_last_seen_uid = new_last_seen_uid.unwrap_or_default(); + let last_one = new_last_seen_uid.max(new_last_seen_uid_processed); + if last_one > last_seen_uid { - self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid) + self.set_config_last_seen_uid(context, &folder, uid_validity, last_one) .await; } + read_errors += error_cnt; + if read_errors > 0 { warn!( context, @@ -721,25 +742,36 @@ impl Imap { .ok(); } - /// Fetches a single message by server UID. + /// Fetches a list of messages by server UID. + /// The passed in list of uids must be sorted. /// - /// If it succeeds, the message should be treated as received even - /// if no database entries are created. If the function returns an - /// error, the caller should try again later. - async fn fetch_single_msg>( + /// Returns the last uid fetched successfully and an error count. 
+ async fn fetch_many_msgs>( &mut self, context: &Context, folder: S, - server_uid: u32, - ) -> Result<()> { - if !self.is_connected() { - return Err(Error::Other("Not connected".to_string())); + server_uids: &[u32], + ) -> (Option, usize) { + if server_uids.is_empty() { + return (None, 0); } - let set = format!("{}", server_uid); + let set = if server_uids.len() == 1 { + server_uids[0].to_string() + } else { + let first_uid = server_uids[0]; + let last_uid = server_uids[server_uids.len() - 1]; + assert!(first_uid < last_uid, "uids must be sorted"); + format!("{}:{}", first_uid, last_uid) + }; + + if !self.is_connected() { + warn!(context, "Not connected"); + return (None, server_uids.len()); + } let mut msgs = if let Some(ref mut session) = &mut self.session { - match session.uid_fetch(set, BODY_FLAGS).await { + match session.uid_fetch(&set, BODY_FLAGS).await { Ok(msgs) => msgs, Err(err) => { // TODO maybe differentiate between IO and input/parsing problems @@ -747,43 +779,80 @@ impl Imap { self.should_reconnect = true; warn!( context, - "Error on fetching message #{} from folder \"{}\"; error={}.", - server_uid, + "Error on fetching messages #{} from folder \"{}\"; error={}.", + &set, folder.as_ref(), err ); - return Err(Error::FetchFailed(err)); + return (None, server_uids.len()); } } } else { // we could not get a valid imap session, this should be retried self.trigger_reconnect(); - return Err(Error::Other("Could not get IMAP session".to_string())); + warn!(context, "Could not get IMAP session"); + return (None, server_uids.len()); }; - if let Some(Ok(msg)) = msgs.next().await { + let mut read_errors = 0; + let mut last_uid = None; + let mut count = 0; + + let mut jobs = Vec::with_capacity(server_uids.len()); + + while let Some(Ok(msg)) = msgs.next().await { + let server_uid = msg.uid.unwrap_or_default(); + if !server_uids.contains(&server_uid) { + // skip if there are some in between we are not interested in + continue; + } + count += 1; + // XXX put flags 
into a set and pass them to dc_receive_imf let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); let is_seen = msg.flags().any(|flag| flag == Flag::Seen); if !is_deleted && msg.body().is_some() { - let body = msg.body().unwrap_or_default(); - if let Err(err) = - dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen).await - { - return Err(Error::Other(format!("dc_receive_imf error: {}", err))); + let folder = folder.as_ref().to_string(); + let context = context.clone(); + let task = async_std::task::spawn(async move { + let body = msg.body().unwrap_or_default(); + + if let Err(err) = + dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await + { + warn!(context, "dc_receive_imf error: {}", err); + read_errors += 1; + None + } else { + Some(server_uid) + } + }); + jobs.push(task); + } + } + + for task in futures::future::join_all(jobs).await { + match task { + Some(uid) => { + last_uid = Some(uid); + } + None => { + read_errors += 1; } } - } else { + } + + if count != server_uids.len() { warn!( context, - "Message #{} does not exist in folder \"{}\".", - server_uid, - folder.as_ref() + "failed to fetch all uids: got {}, requested {}", + count, + server_uids.len() ); } - Ok(()) + (last_uid, read_errors) } pub async fn can_move(&self) -> bool {