mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
feat(imap): process incoming messages in bulk
This commit is contained in:
233
src/imap/mod.rs
233
src/imap/mod.rs
@@ -578,10 +578,6 @@ impl Imap {
|
||||
.await?;
|
||||
|
||||
let mut read_cnt: usize = 0;
|
||||
let mut read_errors = 0;
|
||||
|
||||
// prefetch info from all unfetched mails
|
||||
let mut new_last_seen_uid = last_seen_uid;
|
||||
|
||||
if self.session.is_none() {
|
||||
return Err(Error::NoConnection);
|
||||
@@ -606,84 +602,109 @@ impl Imap {
|
||||
drop(list);
|
||||
|
||||
msgs.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default());
|
||||
let msgs: Vec<_> = msgs
|
||||
.into_iter()
|
||||
.filter(|msg| {
|
||||
let cur_uid = msg.uid.unwrap_or_default();
|
||||
if cur_uid <= last_seen_uid {
|
||||
// If the mailbox is not empty, results always include
|
||||
// at least one UID, even if last_seen_uid+1 is past
|
||||
// the last UID in the mailbox. It happens because
|
||||
// uid+1:* is interpreted the same way as *:uid+1.
|
||||
// See https://tools.ietf.org/html/rfc3501#page-61 for
|
||||
// standard reference. Therefore, sometimes we receive
|
||||
// already seen messages and have to filter them out.
|
||||
info!(
|
||||
context,
|
||||
"fetch_new_messages: ignoring uid {}, last seen was {}",
|
||||
cur_uid,
|
||||
last_seen_uid
|
||||
);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
read_cnt += msgs.len();
|
||||
|
||||
let mut read_errors = 0;
|
||||
|
||||
let mut uids = Vec::with_capacity(msgs.len());
|
||||
let mut new_last_seen_uid = None;
|
||||
|
||||
for fetch in msgs.into_iter() {
|
||||
let folder: &str = folder.as_ref();
|
||||
|
||||
let cur_uid = fetch.uid.unwrap_or_default();
|
||||
if cur_uid <= last_seen_uid {
|
||||
// If the mailbox is not empty, results always include
|
||||
// at least one UID, even if last_seen_uid+1 is past
|
||||
// the last UID in the mailbox. It happens because
|
||||
// uid+1:* is interpreted the same way as *:uid+1.
|
||||
// See https://tools.ietf.org/html/rfc3501#page-61 for
|
||||
// standard reference. Therefore, sometimes we receive
|
||||
// already seen messages and have to filter them out.
|
||||
info!(
|
||||
context,
|
||||
"fetch_new_messages: ignoring uid {}, last seen was {}", cur_uid, last_seen_uid
|
||||
);
|
||||
continue;
|
||||
}
|
||||
read_cnt += 1;
|
||||
let headers = get_fetch_headers(&fetch)?;
|
||||
let headers = match get_fetch_headers(&fetch) {
|
||||
Ok(h) => h,
|
||||
Err(err) => {
|
||||
warn!(context, "get_fetch_headers error: {}", err);
|
||||
read_errors += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let message_id = prefetch_get_message_id(&headers).unwrap_or_default();
|
||||
if let Ok(true) = precheck_imf(context, &message_id, folder.as_ref(), cur_uid)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
let skip = match precheck_imf(context, &message_id, folder, cur_uid).await {
|
||||
Ok(skip) => skip,
|
||||
Err(err) => {
|
||||
warn!(context, "precheck_imf error: {}", err);
|
||||
err
|
||||
})
|
||||
{
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if skip {
|
||||
// we know the message-id already or don't want the message otherwise.
|
||||
info!(
|
||||
context,
|
||||
"Skipping message {} from \"{}\" by precheck.",
|
||||
message_id,
|
||||
folder.as_ref(),
|
||||
"Skipping message {} from \"{}\" by precheck.", message_id, folder,
|
||||
);
|
||||
if read_errors == 0 {
|
||||
new_last_seen_uid = Some(cur_uid);
|
||||
}
|
||||
} else {
|
||||
// we do not know the message-id
|
||||
// or the message-id is missing (in this case, we create one in the further process)
|
||||
// or some other error happened
|
||||
let show = prefetch_should_download(context, &headers, show_emails)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
let show = match prefetch_should_download(context, &headers, show_emails).await {
|
||||
Ok(show) => show,
|
||||
Err(err) => {
|
||||
warn!(context, "prefetch_should_download error: {}", err);
|
||||
err
|
||||
})
|
||||
.unwrap_or(true);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if !show {
|
||||
if show {
|
||||
uids.push(cur_uid);
|
||||
} else {
|
||||
info!(
|
||||
context,
|
||||
"Ignoring new message {} from \"{}\".",
|
||||
message_id,
|
||||
folder.as_ref(),
|
||||
"Ignoring new message {} from \"{}\".", message_id, folder,
|
||||
);
|
||||
} else {
|
||||
// check passed, go fetch the rest
|
||||
if let Err(err) = self.fetch_single_msg(context, &folder, cur_uid).await {
|
||||
info!(
|
||||
context,
|
||||
"Read error for message {} from \"{}\", trying over later: {}.",
|
||||
message_id,
|
||||
folder.as_ref(),
|
||||
err
|
||||
);
|
||||
read_errors += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if read_errors == 0 {
|
||||
new_last_seen_uid = cur_uid;
|
||||
if read_errors == 0 {
|
||||
new_last_seen_uid = Some(cur_uid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if new_last_seen_uid > last_seen_uid {
|
||||
// check passed, go fetch the emails
|
||||
let (new_last_seen_uid_processed, error_cnt) =
|
||||
self.fetch_many_msgs(context, &folder, &uids).await;
|
||||
|
||||
let new_last_seen_uid_processed = new_last_seen_uid_processed.unwrap_or_default();
|
||||
let new_last_seen_uid = new_last_seen_uid.unwrap_or_default();
|
||||
let last_one = new_last_seen_uid.max(new_last_seen_uid_processed);
|
||||
if last_one > last_seen_uid {
|
||||
self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid)
|
||||
.await;
|
||||
}
|
||||
|
||||
read_errors += error_cnt;
|
||||
|
||||
if read_errors > 0 {
|
||||
warn!(
|
||||
context,
|
||||
@@ -721,25 +742,36 @@ impl Imap {
|
||||
.ok();
|
||||
}
|
||||
|
||||
/// Fetches a single message by server UID.
|
||||
/// Fetches a list of messages by server UID.
|
||||
/// The passed in list of uids must be sorted.
|
||||
///
|
||||
/// If it succeeds, the message should be treated as received even
|
||||
/// if no database entries are created. If the function returns an
|
||||
/// error, the caller should try again later.
|
||||
async fn fetch_single_msg<S: AsRef<str>>(
|
||||
/// Returns the last uid fetch successfully and an error count.
|
||||
async fn fetch_many_msgs<S: AsRef<str>>(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: S,
|
||||
server_uid: u32,
|
||||
) -> Result<()> {
|
||||
if !self.is_connected() {
|
||||
return Err(Error::Other("Not connected".to_string()));
|
||||
server_uids: &[u32],
|
||||
) -> (Option<u32>, usize) {
|
||||
if server_uids.is_empty() {
|
||||
return (None, 0);
|
||||
}
|
||||
|
||||
let set = format!("{}", server_uid);
|
||||
let set = if server_uids.len() == 1 {
|
||||
server_uids[0].to_string()
|
||||
} else {
|
||||
let first_uid = server_uids[0];
|
||||
let last_uid = server_uids[server_uids.len() - 1];
|
||||
assert!(first_uid < last_uid, "uids must be sorted");
|
||||
format!("{}:{}", first_uid, last_uid)
|
||||
};
|
||||
|
||||
if !self.is_connected() {
|
||||
warn!(context, "Not connected");
|
||||
return (None, server_uids.len());
|
||||
}
|
||||
|
||||
let mut msgs = if let Some(ref mut session) = &mut self.session {
|
||||
match session.uid_fetch(set, BODY_FLAGS).await {
|
||||
match session.uid_fetch(&set, BODY_FLAGS).await {
|
||||
Ok(msgs) => msgs,
|
||||
Err(err) => {
|
||||
// TODO maybe differentiate between IO and input/parsing problems
|
||||
@@ -747,43 +779,80 @@ impl Imap {
|
||||
self.should_reconnect = true;
|
||||
warn!(
|
||||
context,
|
||||
"Error on fetching message #{} from folder \"{}\"; error={}.",
|
||||
server_uid,
|
||||
"Error on fetching messages #{} from folder \"{}\"; error={}.",
|
||||
&set,
|
||||
folder.as_ref(),
|
||||
err
|
||||
);
|
||||
return Err(Error::FetchFailed(err));
|
||||
return (None, server_uids.len());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// we could not get a valid imap session, this should be retried
|
||||
self.trigger_reconnect();
|
||||
return Err(Error::Other("Could not get IMAP session".to_string()));
|
||||
warn!(context, "Could not get IMAP session");
|
||||
return (None, server_uids.len());
|
||||
};
|
||||
|
||||
if let Some(Ok(msg)) = msgs.next().await {
|
||||
let mut read_errors = 0;
|
||||
let mut last_uid = None;
|
||||
let mut count = 0;
|
||||
|
||||
let mut jobs = Vec::with_capacity(server_uids.len());
|
||||
|
||||
while let Some(Ok(msg)) = msgs.next().await {
|
||||
let server_uid = msg.uid.unwrap_or_default();
|
||||
if !server_uids.contains(&server_uid) {
|
||||
// skip if there are some in between we are not interested in
|
||||
continue;
|
||||
}
|
||||
count += 1;
|
||||
|
||||
// XXX put flags into a set and pass them to dc_receive_imf
|
||||
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
|
||||
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
|
||||
|
||||
if !is_deleted && msg.body().is_some() {
|
||||
let body = msg.body().unwrap_or_default();
|
||||
if let Err(err) =
|
||||
dc_receive_imf(context, &body, folder.as_ref(), server_uid, is_seen).await
|
||||
{
|
||||
return Err(Error::Other(format!("dc_receive_imf error: {}", err)));
|
||||
let folder = folder.as_ref().to_string();
|
||||
let context = context.clone();
|
||||
let task = async_std::task::spawn(async move {
|
||||
let body = msg.body().unwrap_or_default();
|
||||
|
||||
if let Err(err) =
|
||||
dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await
|
||||
{
|
||||
warn!(context, "dc_receive_imf error: {}", err);
|
||||
read_errors += 1;
|
||||
None
|
||||
} else {
|
||||
Some(server_uid)
|
||||
}
|
||||
});
|
||||
jobs.push(task);
|
||||
}
|
||||
}
|
||||
|
||||
for task in futures::future::join_all(jobs).await {
|
||||
match task {
|
||||
Some(uid) => {
|
||||
last_uid = Some(uid);
|
||||
}
|
||||
None => {
|
||||
read_errors += 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
if count != server_uids.len() {
|
||||
warn!(
|
||||
context,
|
||||
"Message #{} does not exist in folder \"{}\".",
|
||||
server_uid,
|
||||
folder.as_ref()
|
||||
"failed to fetch all uids: got {}, requested {}",
|
||||
count,
|
||||
server_uids.len()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
(last_uid, read_errors)
|
||||
}
|
||||
|
||||
pub async fn can_move(&self) -> bool {
|
||||
|
||||
Reference in New Issue
Block a user