mirror of
https://github.com/chatmail/core.git
synced 2026-04-22 16:06:30 +03:00
Don't download ranges of messages (i.e. first:last) (#2061)
This commit is contained in:
277
src/imap/mod.rs
277
src/imap/mod.rs
@@ -684,7 +684,7 @@ impl Imap {
|
||||
|
||||
// check passed, go fetch the emails
|
||||
let (new_last_seen_uid_processed, error_cnt) = self
|
||||
.fetch_many_msgs(context, &folder, &uids, fetch_existing_msgs)
|
||||
.fetch_many_msgs(context, &folder, uids, fetch_existing_msgs)
|
||||
.await;
|
||||
read_errors += error_cnt;
|
||||
|
||||
@@ -722,16 +722,14 @@ impl Imap {
|
||||
.ok_or_else(|| format_err!("Not configured"))?;
|
||||
|
||||
let search_command = format!("FROM \"{}\"", self_addr);
|
||||
let uids = session.uid_search(search_command).await?;
|
||||
let uid_strings: Vec<String> = uids.into_iter().map(|s| s.to_string()).collect();
|
||||
let uids = session
|
||||
.uid_search(search_command)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let mut result = Vec::new();
|
||||
// We fetch the emails in chunks of 100 because according to https://tools.ietf.org/html/rfc2683#section-3.2.1.5
|
||||
// command lines should not be much more than 1000 chars and UIDs can get up to 9- or 10-digit
|
||||
// (servers should allow at least 8000 chars)
|
||||
for uid_chunk in uid_strings.chunks(100) {
|
||||
let uid_set = uid_chunk.join(",");
|
||||
|
||||
for uid_set in &build_sequence_sets(uids) {
|
||||
let mut list = session
|
||||
.uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])")
|
||||
.await
|
||||
@@ -848,20 +846,12 @@ impl Imap {
|
||||
&mut self,
|
||||
context: &Context,
|
||||
folder: S,
|
||||
server_uids: &[u32],
|
||||
server_uids: Vec<u32>,
|
||||
fetching_existing_messages: bool,
|
||||
) -> (Option<u32>, usize) {
|
||||
let set = match server_uids {
|
||||
[] => return (None, 0),
|
||||
[server_uid] => server_uid.to_string(),
|
||||
[first_uid, .., last_uid] => {
|
||||
// XXX: it is assumed that UIDs are sorted and
|
||||
// contiguous. If UIDs are not contiguous, more
|
||||
// messages than needed will be downloaded.
|
||||
debug_assert!(first_uid < last_uid, "uids must be sorted");
|
||||
format!("{}:{}", first_uid, last_uid)
|
||||
}
|
||||
};
|
||||
if server_uids.is_empty() {
|
||||
return (None, 0);
|
||||
}
|
||||
|
||||
if !self.is_connected() {
|
||||
warn!(context, "Not connected");
|
||||
@@ -877,76 +867,87 @@ impl Imap {
|
||||
|
||||
let session = self.session.as_mut().unwrap();
|
||||
|
||||
let mut msgs = match session.uid_fetch(&set, BODY_FLAGS).await {
|
||||
Ok(msgs) => msgs,
|
||||
Err(err) => {
|
||||
// TODO: maybe differentiate between IO and input/parsing problems
|
||||
// so we don't reconnect if we have a (rare) input/output parsing problem?
|
||||
self.should_reconnect = true;
|
||||
warn!(
|
||||
context,
|
||||
"Error on fetching messages #{} from folder \"{}\"; error={}.",
|
||||
&set,
|
||||
folder.as_ref(),
|
||||
err
|
||||
);
|
||||
return (None, server_uids.len());
|
||||
}
|
||||
};
|
||||
|
||||
let folder = folder.as_ref().to_string();
|
||||
|
||||
let sets = build_sequence_sets(server_uids.clone());
|
||||
let mut read_errors = 0;
|
||||
let mut last_uid = None;
|
||||
let mut count = 0;
|
||||
let mut last_uid = None;
|
||||
|
||||
while let Some(Ok(msg)) = msgs.next().await {
|
||||
let server_uid = msg.uid.unwrap_or_default();
|
||||
|
||||
if !server_uids.contains(&server_uid) {
|
||||
// skip if there are some in between we are not interested in
|
||||
continue;
|
||||
}
|
||||
count += 1;
|
||||
|
||||
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
|
||||
if is_deleted || msg.body().is_none() {
|
||||
// No need to process these.
|
||||
continue;
|
||||
}
|
||||
|
||||
// XXX put flags into a set and pass them to dc_receive_imf
|
||||
let context = context.clone();
|
||||
let folder = folder.clone();
|
||||
|
||||
// safe, as we checked above that there is a body.
|
||||
let body = msg.body().unwrap();
|
||||
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
|
||||
|
||||
match dc_receive_imf_inner(
|
||||
&context,
|
||||
&body,
|
||||
&folder,
|
||||
server_uid,
|
||||
is_seen,
|
||||
fetching_existing_messages,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => last_uid = Some(server_uid),
|
||||
for set in sets.iter() {
|
||||
let mut msgs = match session.uid_fetch(&set, BODY_FLAGS).await {
|
||||
Ok(msgs) => msgs,
|
||||
Err(err) => {
|
||||
warn!(context, "dc_receive_imf error: {}", err);
|
||||
read_errors += 1;
|
||||
// TODO: maybe differentiate between IO and input/parsing problems
|
||||
// so we don't reconnect if we have a (rare) input/output parsing problem?
|
||||
self.should_reconnect = true;
|
||||
warn!(
|
||||
context,
|
||||
"Error on fetching messages #{} from folder \"{}\"; error={}.",
|
||||
&set,
|
||||
folder.as_ref(),
|
||||
err
|
||||
);
|
||||
return (None, server_uids.len());
|
||||
}
|
||||
};
|
||||
|
||||
let folder = folder.as_ref().to_string();
|
||||
|
||||
while let Some(Ok(msg)) = msgs.next().await {
|
||||
let server_uid = msg.uid.unwrap_or_default();
|
||||
|
||||
if !server_uids.contains(&server_uid) {
|
||||
warn!(
|
||||
context,
|
||||
"Got unwanted uid {} not in {:?}, requested {:?}",
|
||||
&server_uid,
|
||||
server_uids,
|
||||
&sets
|
||||
);
|
||||
continue;
|
||||
}
|
||||
count += 1;
|
||||
|
||||
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
|
||||
if is_deleted || msg.body().is_none() {
|
||||
// No need to process these.
|
||||
continue;
|
||||
}
|
||||
|
||||
// XXX put flags into a set and pass them to dc_receive_imf
|
||||
let context = context.clone();
|
||||
let folder = folder.clone();
|
||||
|
||||
// safe, as we checked above that there is a body.
|
||||
let body = msg.body().unwrap();
|
||||
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
|
||||
|
||||
match dc_receive_imf_inner(
|
||||
&context,
|
||||
&body,
|
||||
&folder,
|
||||
server_uid,
|
||||
is_seen,
|
||||
fetching_existing_messages,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => last_uid = Some(server_uid),
|
||||
Err(err) => {
|
||||
warn!(context, "dc_receive_imf error: {}", err);
|
||||
read_errors += 1;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if count != server_uids.len() {
|
||||
warn!(
|
||||
context,
|
||||
"failed to fetch all uids: got {}, requested {}",
|
||||
"failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}",
|
||||
count,
|
||||
server_uids.len()
|
||||
server_uids.len(),
|
||||
server_uids,
|
||||
sets
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1703,6 +1704,64 @@ async fn get_config_last_seen_uid<S: AsRef<str>>(context: &Context, folder: S) -
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds a list of sequence/uid sets. The returned sets have each no more than around 1000
|
||||
/// characters because according to https://tools.ietf.org/html/rfc2683#section-3.2.1.5
|
||||
/// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars)
|
||||
fn build_sequence_sets(mut uids: Vec<u32>) -> Vec<String> {
|
||||
uids.sort_unstable();
|
||||
|
||||
// first, try to find consecutive ranges:
|
||||
let mut ranges: Vec<UidRange> = vec![];
|
||||
|
||||
for current in uids {
|
||||
if let Some(last) = ranges.last_mut() {
|
||||
if last.end + 1 == current {
|
||||
last.end = current;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ranges.push(UidRange {
|
||||
start: current,
|
||||
end: current,
|
||||
});
|
||||
}
|
||||
|
||||
// Second, sort the uids into uid sets that are each below ~1000 characters
|
||||
let mut result = vec![String::new()];
|
||||
for range in ranges {
|
||||
if let Some(last) = result.last_mut() {
|
||||
if !last.is_empty() {
|
||||
last.push(',');
|
||||
}
|
||||
last.push_str(&range.to_string());
|
||||
|
||||
if last.len() > 990 {
|
||||
result.push(String::new()); // Start a new uid set
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result.retain(|s| !s.is_empty());
|
||||
result
|
||||
}
|
||||
|
||||
struct UidRange {
|
||||
start: u32,
|
||||
end: u32,
|
||||
// If start == end, then this range represents a single number
|
||||
}
|
||||
|
||||
impl std::fmt::Display for UidRange {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
if self.start == self.end {
|
||||
write!(f, "{}", self.start)
|
||||
} else {
|
||||
write!(f, "{}:{}", self.start, self.end)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -1730,4 +1789,64 @@ mod tests {
|
||||
);
|
||||
assert_eq!(get_folder_meaning_by_name("xxx"), FolderMeaning::Unknown);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_build_sequence_sets() {
|
||||
let cases = vec![
|
||||
(vec![], vec![]),
|
||||
(vec![1], vec!["1"]),
|
||||
(vec![3291], vec!["3291"]),
|
||||
(vec![1, 3, 5, 7, 9, 11], vec!["1,3,5,7,9,11"]),
|
||||
(vec![1, 2, 3], vec!["1:3"]),
|
||||
(vec![1, 4, 5, 6], vec!["1,4:6"]),
|
||||
((1..=500).collect(), vec!["1:500"]),
|
||||
(vec![3, 4, 8, 9, 10, 11, 39, 50, 2], vec!["2:4,8:11,39,50"]),
|
||||
];
|
||||
for (input, output) in cases {
|
||||
assert_eq!(build_sequence_sets(input), output);
|
||||
}
|
||||
|
||||
let numbers: Vec<_> = (2..=500).step_by(2).collect();
|
||||
let result = build_sequence_sets(numbers.clone());
|
||||
for set in &result {
|
||||
assert!(set.len() < 1010);
|
||||
assert!(!set.ends_with(','));
|
||||
assert!(!set.starts_with(','));
|
||||
}
|
||||
assert!(result.len() == 1); // these UIDs fit in one set
|
||||
for number in &numbers {
|
||||
assert!(result
|
||||
.iter()
|
||||
.any(|set| set.split(',').any(|n| n.parse::<u32>().unwrap() == *number)));
|
||||
}
|
||||
|
||||
let numbers: Vec<_> = (1..=1000).step_by(3).collect();
|
||||
let result = build_sequence_sets(numbers.clone());
|
||||
for set in &result {
|
||||
assert!(set.len() < 1010);
|
||||
assert!(!set.ends_with(','));
|
||||
assert!(!set.starts_with(','));
|
||||
}
|
||||
assert!(result.last().unwrap().ends_with("997,1000"));
|
||||
assert!(result.len() == 2); // This time we need 2 sets
|
||||
for number in &numbers {
|
||||
assert!(result
|
||||
.iter()
|
||||
.any(|set| set.split(',').any(|n| n.parse::<u32>().unwrap() == *number)));
|
||||
}
|
||||
|
||||
let numbers: Vec<_> = (30000000..=30002500).step_by(4).collect();
|
||||
let result = build_sequence_sets(numbers.clone());
|
||||
for set in &result {
|
||||
assert!(set.len() < 1010);
|
||||
assert!(!set.ends_with(','));
|
||||
assert!(!set.starts_with(','));
|
||||
}
|
||||
assert_eq!(result.len(), 6);
|
||||
for number in &numbers {
|
||||
assert!(result
|
||||
.iter()
|
||||
.any(|set| set.split(',').any(|n| n.parse::<u32>().unwrap() == *number)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user