diff --git a/src/imap/mod.rs b/src/imap/mod.rs index f235a4f21..6032fb873 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -3,7 +3,7 @@ //! uses [async-email/async-imap](https://github.com/async-email/async-imap) //! to implement connect, fetch, delete functionality with standard IMAP servers. -use num_traits::FromPrimitive; +use std::collections::BTreeMap; use async_imap::{ error::Result as ImapResult, @@ -11,6 +11,7 @@ use async_imap::{ }; use async_std::prelude::*; use async_std::sync::Receiver; +use num_traits::FromPrimitive; use crate::config::*; use crate::constants::*; @@ -577,154 +578,120 @@ impl Imap { .select_with_uidvalidity(context, folder.as_ref()) .await?; - let mut read_cnt: usize = 0; - - if self.session.is_none() { - return Err(Error::NoConnection); - } - let session = self.session.as_mut().unwrap(); - - // fetch messages with larger UID than the last one seen - // `(UID FETCH lastseenuid+1:*)`, see RFC 4549 - let set = format!("{}:*", last_seen_uid + 1); - let mut list = match session.uid_fetch(set, PREFETCH_FLAGS).await { - Ok(list) => list, - Err(err) => { - return Err(Error::FetchFailed(err)); - } - }; - - let mut msgs = Vec::new(); - while let Some(fetch) = list.next().await { - let fetch = fetch.map_err(|err| Error::Other(err.to_string()))?; - msgs.push(fetch); - } - drop(list); - - msgs.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default()); - let msgs: Vec<_> = msgs - .into_iter() - .filter(|msg| { - let cur_uid = msg.uid.unwrap_or_default(); - if cur_uid <= last_seen_uid { - // If the mailbox is not empty, results always include - // at least one UID, even if last_seen_uid+1 is past - // the last UID in the mailbox. It happens because - // uid+1:* is interpreted the same way as *:uid+1. - // See https://tools.ietf.org/html/rfc3501#page-61 for - // standard reference. Therefore, sometimes we receive - // already seen messages and have to filter them out. - info!( - context, - "fetch_new_messages: ignoring uid {}, last seen was {}", - cur_uid, - last_seen_uid - ); - false - } else { - true - } - }) - .collect(); - - read_cnt += msgs.len(); + let msgs = self.fetch_after(context, last_seen_uid).await?; + let read_cnt = msgs.len(); + let folder: &str = folder.as_ref(); let mut read_errors = 0; - let mut uids = Vec::with_capacity(msgs.len()); let mut new_last_seen_uid = None; - for fetch in msgs.into_iter() { - let folder: &str = folder.as_ref(); - - let cur_uid = fetch.uid.unwrap_or_default(); - let headers = match get_fetch_headers(&fetch) { - Ok(h) => h, + for (current_uid, msg) in msgs.into_iter() { + let (headers, msg_id) = match get_fetch_headers(&msg) { + Ok(headers) => { + let msg_id = prefetch_get_message_id(&headers).unwrap_or_default(); + (headers, msg_id) + } Err(err) => { - warn!(context, "get_fetch_headers error: {}", err); + warn!(context, "{}", err); read_errors += 1; continue; } }; - let message_id = prefetch_get_message_id(&headers).unwrap_or_default(); - let skip = match precheck_imf(context, &message_id, folder, cur_uid).await { - Ok(skip) => skip, - Err(err) => { - warn!(context, "precheck_imf error: {}", err); - true - } - }; - - if skip { - // we know the message-id already or don't want the message otherwise. - info!( - context, - "Skipping message {} from \"{}\" by precheck.", message_id, folder, - ); - if read_errors == 0 { - new_last_seen_uid = Some(cur_uid); - } - } else { - // we do not know the message-id - // or the message-id is missing (in this case, we create one in the further process) - // or some other error happened - let show = match prefetch_should_download(context, &headers, show_emails).await { - Ok(show) => show, - Err(err) => { - warn!(context, "prefetch_should_download error: {}", err); - true - } - }; - - if show { - uids.push(cur_uid); - } else { - info!( - context, - "Ignoring new message {} from \"{}\".", message_id, folder, - ); - } - if read_errors == 0 { - new_last_seen_uid = Some(cur_uid); - } + if message_needs_processing( + context, + current_uid, + &headers, + &msg_id, + folder, + show_emails, + ) + .await + { + // Trigger download and processing for this message. + uids.push(current_uid); + } else if read_errors == 0 { + // No errors so far, but this was skipped, so mark as last_seen_uid + new_last_seen_uid = Some(current_uid); } } // check passed, go fetch the emails let (new_last_seen_uid_processed, error_cnt) = self.fetch_many_msgs(context, &folder, &uids).await; + read_errors += error_cnt; + // determine which last_seen_uid to use to update to let new_last_seen_uid_processed = new_last_seen_uid_processed.unwrap_or_default(); let new_last_seen_uid = new_last_seen_uid.unwrap_or_default(); let last_one = new_last_seen_uid.max(new_last_seen_uid_processed); + if last_one > last_seen_uid { - self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid) + self.set_config_last_seen_uid(context, &folder, uid_validity, last_one) .await; } - read_errors += error_cnt; - - if read_errors > 0 { + if read_errors == 0 { + info!(context, "{} mails read from \"{}\".", read_cnt, folder,); + } else { warn!( context, - "{} mails read from \"{}\" with {} errors.", - read_cnt, - folder.as_ref(), - read_errors - ); - } else { - info!( - context, - "{} mails read from \"{}\".", - read_cnt, - folder.as_ref() + "{} mails read from \"{}\" with {} errors.", read_cnt, folder, read_errors ); } Ok(read_cnt > 0) } + /// Fetch all uids larger than the passed in. Returns a sorted list of fetch results. + async fn fetch_after( + &mut self, + context: &Context, + uid: u32, + ) -> Result> { + if self.session.is_none() { + return Err(Error::NoConnection); + } + + let session = self.session.as_mut().unwrap(); + + // fetch messages with larger UID than the last one seen + // `(UID FETCH lastseenuid+1:*)`, see RFC 4549 + let set = format!("{}:*", uid + 1); + let mut list = session + .uid_fetch(set, PREFETCH_FLAGS) + .await + .map_err(Error::FetchFailed)?; + + let mut msgs = BTreeMap::new(); + while let Some(fetch) = list.next().await { + let msg = fetch.map_err(|err| Error::Other(err.to_string()))?; + if let Some(msg_uid) = msg.uid { + msgs.insert(msg_uid, msg); + } + } + drop(list); + + // If the mailbox is not empty, results always include + // at least one UID, even if last_seen_uid+1 is past + // the last UID in the mailbox. It happens because + // uid+1:* is interpreted the same way as *:uid+1. + // See https://tools.ietf.org/html/rfc3501#page-61 for + // standard reference. Therefore, sometimes we receive + // already seen messages and have to filter them out. + let new_msgs = msgs.split_off(&(uid + 1)); + + for current_uid in msgs.keys() { + info!( + context, + "fetch_new_messages: ignoring uid {}, last seen was {}", current_uid, uid + ); + } + + Ok(new_msgs) + } + async fn set_config_last_seen_uid>( &self, context: &Context, @@ -756,6 +723,20 @@ impl Imap { return (None, 0); } + if !self.is_connected() { + warn!(context, "Not connected"); + return (None, server_uids.len()); + } + + if self.session.is_none() { + // we could not get a valid imap session, this should be retried + self.trigger_reconnect(); + warn!(context, "Could not get IMAP session"); + return (None, server_uids.len()); + } + + let session = self.session.as_mut().unwrap(); + let set = if server_uids.len() == 1 { server_uids[0].to_string() } else { @@ -765,74 +746,67 @@ impl Imap { format!("{}:{}", first_uid, last_uid) }; - if !self.is_connected() { - warn!(context, "Not connected"); - return (None, server_uids.len()); - } - - let mut msgs = if let Some(ref mut session) = &mut self.session { - match session.uid_fetch(&set, BODY_FLAGS).await { - Ok(msgs) => msgs, - Err(err) => { - // TODO maybe differentiate between IO and input/parsing problems - // so we don't reconnect if we have a (rare) input/output parsing problem? - self.should_reconnect = true; - warn!( - context, - "Error on fetching messages #{} from folder \"{}\"; error={}.", - &set, - folder.as_ref(), - err - ); - return (None, server_uids.len()); - } + let mut msgs = match session.uid_fetch(&set, BODY_FLAGS).await { + Ok(msgs) => msgs, + Err(err) => { + // TODO: maybe differentiate between IO and input/parsing problems + // so we don't reconnect if we have a (rare) input/output parsing problem? + self.should_reconnect = true; + warn!( + context, + "Error on fetching messages #{} from folder \"{}\"; error={}.", + &set, + folder.as_ref(), + err + ); + return (None, server_uids.len()); } - } else { - // we could not get a valid imap session, this should be retried - self.trigger_reconnect(); - warn!(context, "Could not get IMAP session"); - return (None, server_uids.len()); }; + let folder = folder.as_ref().to_string(); + let mut read_errors = 0; let mut last_uid = None; let mut count = 0; - let mut jobs = Vec::with_capacity(server_uids.len()); - + let mut tasks = Vec::with_capacity(server_uids.len()); while let Some(Ok(msg)) = msgs.next().await { let server_uid = msg.uid.unwrap_or_default(); + if !server_uids.contains(&server_uid) { // skip if there are some in between we are not interested in continue; } count += 1; - // XXX put flags into a set and pass them to dc_receive_imf let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); - let is_seen = msg.flags().any(|flag| flag == Flag::Seen); + if is_deleted || msg.body().is_none() { + // No need to process these. + continue; + } - if !is_deleted && msg.body().is_some() { - let folder = folder.as_ref().to_string(); - let context = context.clone(); - let task = async_std::task::spawn(async move { - let body = msg.body().unwrap_or_default(); + // XXX put flags into a set and pass them to dc_receive_imf + let context = context.clone(); + let folder = folder.clone(); - if let Err(err) = - dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await - { + let task = async_std::task::spawn(async move { + // safe, as we checked above that there is a body. + let body = msg.body().unwrap(); + let is_seen = msg.flags().any(|flag| flag == Flag::Seen); + + match dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await { + Ok(_) => Some(server_uid), + Err(err) => { warn!(context, "dc_receive_imf error: {}", err); read_errors += 1; None - } else { - Some(server_uid) } - }); - jobs.push(task); - } + } + }); + tasks.push(task); } - for task in futures::future::join_all(jobs).await { + for task in futures::future::join_all(tasks).await { match task { Some(uid) => { last_uid = Some(uid); @@ -1493,3 +1467,50 @@ async fn prefetch_should_download( let show = show && !blocked_contact; Ok(show) } + +async fn message_needs_processing( + context: &Context, + current_uid: u32, + headers: &[mailparse::MailHeader<'_>], + msg_id: &str, + folder: &str, + show_emails: ShowEmails, +) -> bool { + let skip = match precheck_imf(context, &msg_id, folder, current_uid).await { + Ok(skip) => skip, + Err(err) => { + warn!(context, "precheck_imf error: {}", err); + true + } + }; + + if skip { + // we know the message-id already or don't want the message otherwise. + info!( + context, + "Skipping message {} from \"{}\" by precheck.", msg_id, folder, + ); + return false; + } + + // we do not know the message-id + // or the message-id is missing (in this case, we create one in the further process) + // or some other error happened + let show = match prefetch_should_download(context, &headers, show_emails).await { + Ok(show) => show, + Err(err) => { + warn!(context, "prefetch_should_download error: {}", err); + true + } + }; + + if !show { + info!( + context, + "Ignoring new message {} from \"{}\".", msg_id, folder, + ); + return false; + } + + true +}