mirror of
https://github.com/chatmail/core.git
synced 2026-05-05 06:16:30 +03:00
refactor: improve structure of fetch_messages
also fixes updating last_seen_uid to the correct value
This commit is contained in:
341
src/imap/mod.rs
341
src/imap/mod.rs
@@ -3,7 +3,7 @@
|
|||||||
//! uses [async-email/async-imap](https://github.com/async-email/async-imap)
|
//! uses [async-email/async-imap](https://github.com/async-email/async-imap)
|
||||||
//! to implement connect, fetch, delete functionality with standard IMAP servers.
|
//! to implement connect, fetch, delete functionality with standard IMAP servers.
|
||||||
|
|
||||||
use num_traits::FromPrimitive;
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use async_imap::{
|
use async_imap::{
|
||||||
error::Result as ImapResult,
|
error::Result as ImapResult,
|
||||||
@@ -11,6 +11,7 @@ use async_imap::{
|
|||||||
};
|
};
|
||||||
use async_std::prelude::*;
|
use async_std::prelude::*;
|
||||||
use async_std::sync::Receiver;
|
use async_std::sync::Receiver;
|
||||||
|
use num_traits::FromPrimitive;
|
||||||
|
|
||||||
use crate::config::*;
|
use crate::config::*;
|
||||||
use crate::constants::*;
|
use crate::constants::*;
|
||||||
@@ -577,154 +578,120 @@ impl Imap {
|
|||||||
.select_with_uidvalidity(context, folder.as_ref())
|
.select_with_uidvalidity(context, folder.as_ref())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut read_cnt: usize = 0;
|
let msgs = self.fetch_after(context, last_seen_uid).await?;
|
||||||
|
let read_cnt = msgs.len();
|
||||||
if self.session.is_none() {
|
let folder: &str = folder.as_ref();
|
||||||
return Err(Error::NoConnection);
|
|
||||||
}
|
|
||||||
let session = self.session.as_mut().unwrap();
|
|
||||||
|
|
||||||
// fetch messages with larger UID than the last one seen
|
|
||||||
// `(UID FETCH lastseenuid+1:*)`, see RFC 4549
|
|
||||||
let set = format!("{}:*", last_seen_uid + 1);
|
|
||||||
let mut list = match session.uid_fetch(set, PREFETCH_FLAGS).await {
|
|
||||||
Ok(list) => list,
|
|
||||||
Err(err) => {
|
|
||||||
return Err(Error::FetchFailed(err));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut msgs = Vec::new();
|
|
||||||
while let Some(fetch) = list.next().await {
|
|
||||||
let fetch = fetch.map_err(|err| Error::Other(err.to_string()))?;
|
|
||||||
msgs.push(fetch);
|
|
||||||
}
|
|
||||||
drop(list);
|
|
||||||
|
|
||||||
msgs.sort_unstable_by_key(|msg| msg.uid.unwrap_or_default());
|
|
||||||
let msgs: Vec<_> = msgs
|
|
||||||
.into_iter()
|
|
||||||
.filter(|msg| {
|
|
||||||
let cur_uid = msg.uid.unwrap_or_default();
|
|
||||||
if cur_uid <= last_seen_uid {
|
|
||||||
// If the mailbox is not empty, results always include
|
|
||||||
// at least one UID, even if last_seen_uid+1 is past
|
|
||||||
// the last UID in the mailbox. It happens because
|
|
||||||
// uid+1:* is interpreted the same way as *:uid+1.
|
|
||||||
// See https://tools.ietf.org/html/rfc3501#page-61 for
|
|
||||||
// standard reference. Therefore, sometimes we receive
|
|
||||||
// already seen messages and have to filter them out.
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"fetch_new_messages: ignoring uid {}, last seen was {}",
|
|
||||||
cur_uid,
|
|
||||||
last_seen_uid
|
|
||||||
);
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
read_cnt += msgs.len();
|
|
||||||
|
|
||||||
let mut read_errors = 0;
|
let mut read_errors = 0;
|
||||||
|
|
||||||
let mut uids = Vec::with_capacity(msgs.len());
|
let mut uids = Vec::with_capacity(msgs.len());
|
||||||
let mut new_last_seen_uid = None;
|
let mut new_last_seen_uid = None;
|
||||||
|
|
||||||
for fetch in msgs.into_iter() {
|
for (current_uid, msg) in msgs.into_iter() {
|
||||||
let folder: &str = folder.as_ref();
|
let (headers, msg_id) = match get_fetch_headers(&msg) {
|
||||||
|
Ok(headers) => {
|
||||||
let cur_uid = fetch.uid.unwrap_or_default();
|
let msg_id = prefetch_get_message_id(&headers).unwrap_or_default();
|
||||||
let headers = match get_fetch_headers(&fetch) {
|
(headers, msg_id)
|
||||||
Ok(h) => h,
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(context, "get_fetch_headers error: {}", err);
|
warn!(context, "{}", err);
|
||||||
read_errors += 1;
|
read_errors += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let message_id = prefetch_get_message_id(&headers).unwrap_or_default();
|
if message_needs_processing(
|
||||||
let skip = match precheck_imf(context, &message_id, folder, cur_uid).await {
|
context,
|
||||||
Ok(skip) => skip,
|
current_uid,
|
||||||
Err(err) => {
|
&headers,
|
||||||
warn!(context, "precheck_imf error: {}", err);
|
&msg_id,
|
||||||
true
|
folder,
|
||||||
}
|
show_emails,
|
||||||
};
|
)
|
||||||
|
.await
|
||||||
if skip {
|
{
|
||||||
// we know the message-id already or don't want the message otherwise.
|
// Trigger download and processing for this message.
|
||||||
info!(
|
uids.push(current_uid);
|
||||||
context,
|
} else if read_errors == 0 {
|
||||||
"Skipping message {} from \"{}\" by precheck.", message_id, folder,
|
// No errors so far, but this was skipped, so mark as last_seen_uid
|
||||||
);
|
new_last_seen_uid = Some(current_uid);
|
||||||
if read_errors == 0 {
|
|
||||||
new_last_seen_uid = Some(cur_uid);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// we do not know the message-id
|
|
||||||
// or the message-id is missing (in this case, we create one in the further process)
|
|
||||||
// or some other error happened
|
|
||||||
let show = match prefetch_should_download(context, &headers, show_emails).await {
|
|
||||||
Ok(show) => show,
|
|
||||||
Err(err) => {
|
|
||||||
warn!(context, "prefetch_should_download error: {}", err);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if show {
|
|
||||||
uids.push(cur_uid);
|
|
||||||
} else {
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"Ignoring new message {} from \"{}\".", message_id, folder,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
if read_errors == 0 {
|
|
||||||
new_last_seen_uid = Some(cur_uid);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// check passed, go fetch the emails
|
// check passed, go fetch the emails
|
||||||
let (new_last_seen_uid_processed, error_cnt) =
|
let (new_last_seen_uid_processed, error_cnt) =
|
||||||
self.fetch_many_msgs(context, &folder, &uids).await;
|
self.fetch_many_msgs(context, &folder, &uids).await;
|
||||||
|
read_errors += error_cnt;
|
||||||
|
|
||||||
|
// determine which last_seen_uid to use to update to
|
||||||
let new_last_seen_uid_processed = new_last_seen_uid_processed.unwrap_or_default();
|
let new_last_seen_uid_processed = new_last_seen_uid_processed.unwrap_or_default();
|
||||||
let new_last_seen_uid = new_last_seen_uid.unwrap_or_default();
|
let new_last_seen_uid = new_last_seen_uid.unwrap_or_default();
|
||||||
let last_one = new_last_seen_uid.max(new_last_seen_uid_processed);
|
let last_one = new_last_seen_uid.max(new_last_seen_uid_processed);
|
||||||
|
|
||||||
if last_one > last_seen_uid {
|
if last_one > last_seen_uid {
|
||||||
self.set_config_last_seen_uid(context, &folder, uid_validity, new_last_seen_uid)
|
self.set_config_last_seen_uid(context, &folder, uid_validity, last_one)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
read_errors += error_cnt;
|
if read_errors == 0 {
|
||||||
|
info!(context, "{} mails read from \"{}\".", read_cnt, folder,);
|
||||||
if read_errors > 0 {
|
} else {
|
||||||
warn!(
|
warn!(
|
||||||
context,
|
context,
|
||||||
"{} mails read from \"{}\" with {} errors.",
|
"{} mails read from \"{}\" with {} errors.", read_cnt, folder, read_errors
|
||||||
read_cnt,
|
|
||||||
folder.as_ref(),
|
|
||||||
read_errors
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"{} mails read from \"{}\".",
|
|
||||||
read_cnt,
|
|
||||||
folder.as_ref()
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(read_cnt > 0)
|
Ok(read_cnt > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetch all uids larger than the passed in. Returns a sorted list of fetch results.
|
||||||
|
async fn fetch_after(
|
||||||
|
&mut self,
|
||||||
|
context: &Context,
|
||||||
|
uid: u32,
|
||||||
|
) -> Result<BTreeMap<u32, async_imap::types::Fetch>> {
|
||||||
|
if self.session.is_none() {
|
||||||
|
return Err(Error::NoConnection);
|
||||||
|
}
|
||||||
|
|
||||||
|
let session = self.session.as_mut().unwrap();
|
||||||
|
|
||||||
|
// fetch messages with larger UID than the last one seen
|
||||||
|
// `(UID FETCH lastseenuid+1:*)`, see RFC 4549
|
||||||
|
let set = format!("{}:*", uid + 1);
|
||||||
|
let mut list = session
|
||||||
|
.uid_fetch(set, PREFETCH_FLAGS)
|
||||||
|
.await
|
||||||
|
.map_err(Error::FetchFailed)?;
|
||||||
|
|
||||||
|
let mut msgs = BTreeMap::new();
|
||||||
|
while let Some(fetch) = list.next().await {
|
||||||
|
let msg = fetch.map_err(|err| Error::Other(err.to_string()))?;
|
||||||
|
if let Some(msg_uid) = msg.uid {
|
||||||
|
msgs.insert(msg_uid, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(list);
|
||||||
|
|
||||||
|
// If the mailbox is not empty, results always include
|
||||||
|
// at least one UID, even if last_seen_uid+1 is past
|
||||||
|
// the last UID in the mailbox. It happens because
|
||||||
|
// uid+1:* is interpreted the same way as *:uid+1.
|
||||||
|
// See https://tools.ietf.org/html/rfc3501#page-61 for
|
||||||
|
// standard reference. Therefore, sometimes we receive
|
||||||
|
// already seen messages and have to filter them out.
|
||||||
|
let new_msgs = msgs.split_off(&(uid + 1));
|
||||||
|
|
||||||
|
for current_uid in msgs.keys() {
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"fetch_new_messages: ignoring uid {}, last seen was {}", current_uid, uid
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(new_msgs)
|
||||||
|
}
|
||||||
|
|
||||||
async fn set_config_last_seen_uid<S: AsRef<str>>(
|
async fn set_config_last_seen_uid<S: AsRef<str>>(
|
||||||
&self,
|
&self,
|
||||||
context: &Context,
|
context: &Context,
|
||||||
@@ -756,6 +723,20 @@ impl Imap {
|
|||||||
return (None, 0);
|
return (None, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !self.is_connected() {
|
||||||
|
warn!(context, "Not connected");
|
||||||
|
return (None, server_uids.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.session.is_none() {
|
||||||
|
// we could not get a valid imap session, this should be retried
|
||||||
|
self.trigger_reconnect();
|
||||||
|
warn!(context, "Could not get IMAP session");
|
||||||
|
return (None, server_uids.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
let session = self.session.as_mut().unwrap();
|
||||||
|
|
||||||
let set = if server_uids.len() == 1 {
|
let set = if server_uids.len() == 1 {
|
||||||
server_uids[0].to_string()
|
server_uids[0].to_string()
|
||||||
} else {
|
} else {
|
||||||
@@ -765,74 +746,67 @@ impl Imap {
|
|||||||
format!("{}:{}", first_uid, last_uid)
|
format!("{}:{}", first_uid, last_uid)
|
||||||
};
|
};
|
||||||
|
|
||||||
if !self.is_connected() {
|
let mut msgs = match session.uid_fetch(&set, BODY_FLAGS).await {
|
||||||
warn!(context, "Not connected");
|
Ok(msgs) => msgs,
|
||||||
return (None, server_uids.len());
|
Err(err) => {
|
||||||
}
|
// TODO: maybe differentiate between IO and input/parsing problems
|
||||||
|
// so we don't reconnect if we have a (rare) input/output parsing problem?
|
||||||
let mut msgs = if let Some(ref mut session) = &mut self.session {
|
self.should_reconnect = true;
|
||||||
match session.uid_fetch(&set, BODY_FLAGS).await {
|
warn!(
|
||||||
Ok(msgs) => msgs,
|
context,
|
||||||
Err(err) => {
|
"Error on fetching messages #{} from folder \"{}\"; error={}.",
|
||||||
// TODO maybe differentiate between IO and input/parsing problems
|
&set,
|
||||||
// so we don't reconnect if we have a (rare) input/output parsing problem?
|
folder.as_ref(),
|
||||||
self.should_reconnect = true;
|
err
|
||||||
warn!(
|
);
|
||||||
context,
|
return (None, server_uids.len());
|
||||||
"Error on fetching messages #{} from folder \"{}\"; error={}.",
|
|
||||||
&set,
|
|
||||||
folder.as_ref(),
|
|
||||||
err
|
|
||||||
);
|
|
||||||
return (None, server_uids.len());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// we could not get a valid imap session, this should be retried
|
|
||||||
self.trigger_reconnect();
|
|
||||||
warn!(context, "Could not get IMAP session");
|
|
||||||
return (None, server_uids.len());
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let folder = folder.as_ref().to_string();
|
||||||
|
|
||||||
let mut read_errors = 0;
|
let mut read_errors = 0;
|
||||||
let mut last_uid = None;
|
let mut last_uid = None;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
|
|
||||||
let mut jobs = Vec::with_capacity(server_uids.len());
|
let mut tasks = Vec::with_capacity(server_uids.len());
|
||||||
|
|
||||||
while let Some(Ok(msg)) = msgs.next().await {
|
while let Some(Ok(msg)) = msgs.next().await {
|
||||||
let server_uid = msg.uid.unwrap_or_default();
|
let server_uid = msg.uid.unwrap_or_default();
|
||||||
|
|
||||||
if !server_uids.contains(&server_uid) {
|
if !server_uids.contains(&server_uid) {
|
||||||
// skip if there are some in between we are not interested in
|
// skip if there are some in between we are not interested in
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
count += 1;
|
count += 1;
|
||||||
|
|
||||||
// XXX put flags into a set and pass them to dc_receive_imf
|
|
||||||
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
|
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
|
||||||
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
|
if is_deleted || msg.body().is_none() {
|
||||||
|
// No need to process these.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if !is_deleted && msg.body().is_some() {
|
// XXX put flags into a set and pass them to dc_receive_imf
|
||||||
let folder = folder.as_ref().to_string();
|
let context = context.clone();
|
||||||
let context = context.clone();
|
let folder = folder.clone();
|
||||||
let task = async_std::task::spawn(async move {
|
|
||||||
let body = msg.body().unwrap_or_default();
|
|
||||||
|
|
||||||
if let Err(err) =
|
let task = async_std::task::spawn(async move {
|
||||||
dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await
|
// safe, as we checked above that there is a body.
|
||||||
{
|
let body = msg.body().unwrap();
|
||||||
|
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
|
||||||
|
|
||||||
|
match dc_receive_imf(&context, &body, &folder, server_uid, is_seen).await {
|
||||||
|
Ok(_) => Some(server_uid),
|
||||||
|
Err(err) => {
|
||||||
warn!(context, "dc_receive_imf error: {}", err);
|
warn!(context, "dc_receive_imf error: {}", err);
|
||||||
read_errors += 1;
|
read_errors += 1;
|
||||||
None
|
None
|
||||||
} else {
|
|
||||||
Some(server_uid)
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
jobs.push(task);
|
});
|
||||||
}
|
tasks.push(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
for task in futures::future::join_all(jobs).await {
|
for task in futures::future::join_all(tasks).await {
|
||||||
match task {
|
match task {
|
||||||
Some(uid) => {
|
Some(uid) => {
|
||||||
last_uid = Some(uid);
|
last_uid = Some(uid);
|
||||||
@@ -1493,3 +1467,50 @@ async fn prefetch_should_download(
|
|||||||
let show = show && !blocked_contact;
|
let show = show && !blocked_contact;
|
||||||
Ok(show)
|
Ok(show)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn message_needs_processing(
|
||||||
|
context: &Context,
|
||||||
|
current_uid: u32,
|
||||||
|
headers: &[mailparse::MailHeader<'_>],
|
||||||
|
msg_id: &str,
|
||||||
|
folder: &str,
|
||||||
|
show_emails: ShowEmails,
|
||||||
|
) -> bool {
|
||||||
|
let skip = match precheck_imf(context, &msg_id, folder, current_uid).await {
|
||||||
|
Ok(skip) => skip,
|
||||||
|
Err(err) => {
|
||||||
|
warn!(context, "precheck_imf error: {}", err);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if skip {
|
||||||
|
// we know the message-id already or don't want the message otherwise.
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"Skipping message {} from \"{}\" by precheck.", msg_id, folder,
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we do not know the message-id
|
||||||
|
// or the message-id is missing (in this case, we create one in the further process)
|
||||||
|
// or some other error happened
|
||||||
|
let show = match prefetch_should_download(context, &headers, show_emails).await {
|
||||||
|
Ok(show) => show,
|
||||||
|
Err(err) => {
|
||||||
|
warn!(context, "prefetch_should_download error: {}", err);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !show {
|
||||||
|
info!(
|
||||||
|
context,
|
||||||
|
"Ignoring new message {} from \"{}\".", msg_id, folder,
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user