Fetch messages in the order of their INTERNALDATE (#3756)

When a batch of messages is moved from Inbox to DeltaChat folder with a single MOVE command, their
UIDs may be reordered (e.g. Gmail is known for that) which leads to that messages are processed by
receive_imf in the wrong order. But the INTERNALDATE attribute is preserved during a MOVE according
to RFC3501. So, use it for sorting fetched messages.
This commit is contained in:
iequidoo
2022-11-29 13:36:54 -03:00
committed by iequidoo
parent 4d81fa6df5
commit 9b881cdd19
2 changed files with 125 additions and 91 deletions

View File

@@ -15,6 +15,7 @@
- strip leading/trailing whitespace from "Chat-Group-Name{,-Changed}:" headers content #3650 - strip leading/trailing whitespace from "Chat-Group-Name{,-Changed}:" headers content #3650
- Assume all Thunderbird users prefer encryption #3774 - Assume all Thunderbird users prefer encryption #3774
- refactor peerstate handling to ensure no duplicate peerstates #3776 - refactor peerstate handling to ensure no duplicate peerstates #3776
- Fetch messages in order of their INTERNALDATE (fixes reactions for Gmail f.e.) #3789
## 1.102.0 ## 1.102.0

View File

@@ -6,8 +6,9 @@
use std::{ use std::{
cmp, cmp,
cmp::max, cmp::max,
collections::{BTreeMap, BTreeSet}, collections::{BTreeMap, BTreeSet, HashMap},
iter::Peekable, iter::Peekable,
mem::take,
}; };
use anyhow::{bail, format_err, Context as _, Result}; use anyhow::{bail, format_err, Context as _, Result};
@@ -71,7 +72,7 @@ pub enum ImapActionResult {
/// - Chat-Version to check if a message is a chat message /// - Chat-Version to check if a message is a chat message
/// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message, /// - Autocrypt-Setup-Message to check if a message is an autocrypt setup message,
/// not necessarily sent by Delta Chat. /// not necessarily sent by Delta Chat.
const PREFETCH_FLAGS: &str = "(UID RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\ const PREFETCH_FLAGS: &str = "(UID INTERNALDATE RFC822.SIZE BODY.PEEK[HEADER.FIELDS (\
MESSAGE-ID \ MESSAGE-ID \
X-MICROSOFT-ORIGINAL-MESSAGE-ID \ X-MICROSOFT-ORIGINAL-MESSAGE-ID \
FROM \ FROM \
@@ -1263,14 +1264,15 @@ impl Imap {
.as_mut() .as_mut()
.context("IMAP No Connection established")?; .context("IMAP No Connection established")?;
let uids = session let mut uids: Vec<_> = session
.uid_search(get_imap_self_sent_search_command(context).await?) .uid_search(get_imap_self_sent_search_command(context).await?)
.await? .await?
.into_iter() .into_iter()
.collect(); .collect();
uids.sort_unstable();
let mut result = Vec::new(); let mut result = Vec::new();
for uid_set in &build_sequence_sets(uids) { for (_, uid_set) in build_sequence_sets(&uids)? {
let mut list = session let mut list = session
.uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])") .uid_fetch(uid_set, "(UID BODY.PEEK[HEADER.FIELDS (FROM TO CC BCC)])")
.await .await
@@ -1296,8 +1298,9 @@ impl Imap {
Ok(result) Ok(result)
} }
/// Prefetch all messages greater than or equal to `uid_next`. Return a list of fetch results. /// Prefetch all messages greater than or equal to `uid_next`. Returns a list of fetch results
async fn prefetch(&mut self, uid_next: u32) -> Result<BTreeMap<u32, async_imap::types::Fetch>> { /// in the order of ascending delivery time to the server (INTERNALDATE).
async fn prefetch(&mut self, uid_next: u32) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
let session = self.session.as_mut(); let session = self.session.as_mut();
let session = session.context("fetch_after(): IMAP No Connection established")?; let session = session.context("fetch_after(): IMAP No Connection established")?;
@@ -1312,25 +1315,25 @@ impl Imap {
while let Some(fetch) = list.next().await { while let Some(fetch) = list.next().await {
let msg = fetch?; let msg = fetch?;
if let Some(msg_uid) = msg.uid { if let Some(msg_uid) = msg.uid {
msgs.insert(msg_uid, msg); // If the mailbox is not empty, results always include
// at least one UID, even if last_seen_uid+1 is past
// the last UID in the mailbox. It happens because
// uid:* is interpreted the same way as *:uid.
// See <https://tools.ietf.org/html/rfc3501#page-61> for
// standard reference. Therefore, sometimes we receive
// already seen messages and have to filter them out.
if msg_uid >= uid_next {
msgs.insert((msg.internal_date(), msg_uid), msg);
}
} }
} }
drop(list); drop(list);
// If the mailbox is not empty, results always include Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
// at least one UID, even if last_seen_uid+1 is past
// the last UID in the mailbox. It happens because
// uid:* is interpreted the same way as *:uid.
// See <https://tools.ietf.org/html/rfc3501#page-61> for
// standard reference. Therefore, sometimes we receive
// already seen messages and have to filter them out.
let new_msgs = msgs.split_off(&uid_next);
Ok(new_msgs)
} }
/// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages) /// Like fetch_after(), but not for new messages but existing ones (the DC_FETCH_EXISTING_MSGS_COUNT newest messages)
async fn prefetch_existing_msgs(&mut self) -> Result<BTreeMap<u32, async_imap::types::Fetch>> { async fn prefetch_existing_msgs(&mut self) -> Result<Vec<(u32, async_imap::types::Fetch)>> {
let exists: i64 = { let exists: i64 = {
let mailbox = self let mailbox = self
.config .config
@@ -1355,11 +1358,11 @@ impl Imap {
while let Some(fetch) = list.next().await { while let Some(fetch) = list.next().await {
let msg = fetch?; let msg = fetch?;
if let Some(msg_uid) = msg.uid { if let Some(msg_uid) = msg.uid {
msgs.insert(msg_uid, msg); msgs.insert((msg.internal_date(), msg_uid), msg);
} }
} }
Ok(msgs) Ok(msgs.into_iter().map(|((_, uid), msg)| (uid, msg)).collect())
} }
/// Fetches a list of messages by server UID. /// Fetches a list of messages by server UID.
@@ -1382,12 +1385,10 @@ impl Imap {
} }
let session = self.session.as_mut().context("no IMAP session")?; let session = self.session.as_mut().context("no IMAP session")?;
let sets = build_sequence_sets(&server_uids)?;
let sets = build_sequence_sets(server_uids.clone());
let mut count = 0;
let mut last_uid = None; let mut last_uid = None;
for set in sets.iter() { for (server_uids, set) in sets.iter() {
let mut msgs = match session let mut msgs = match session
.uid_fetch( .uid_fetch(
&set, &set,
@@ -1413,19 +1414,41 @@ impl Imap {
} }
}; };
while let Some(Ok(msg)) = msgs.next().await { let mut uid_msgs = server_uids
let server_uid = msg.uid.unwrap_or_default(); .iter()
.map(|&uid| (uid, None))
if !server_uids.contains(&server_uid) { .collect::<HashMap<_, _>>();
warn!( let mut server_uids_it = server_uids.iter().peekable();
context, let mut count = 0;
"Got unwanted uid {} not in {:?}, requested {:?}", while let Some(&&server_uid) = server_uids_it.peek() {
&server_uid, let mut msg = uid_msgs.insert(server_uid, None).flatten();
server_uids, while msg.is_none() {
&sets let msg_unwrapped = match msgs.next().await {
); Some(Ok(msg)) => msg,
continue; Some(Err(_)) => continue,
None => break,
};
let msg_uid = msg_unwrapped.uid.unwrap_or_default();
if !uid_msgs.contains_key(&msg_uid) {
// Unwanted UIDs are possible because of unsolicited responses, e.g. if
// another client changes \Seen flag on a message after we do a prefetch but
// before fetch. It's not an error if we receive such unsolicited response.
continue;
}
msg = Some(msg_unwrapped);
if msg_uid != server_uid && uid_msgs.insert(msg_uid, msg.take()).is_some() {
warn!(context, "Got duplicated UID {}", msg_uid);
}
} }
let msg = match msg {
Some(msg) => msg,
None => {
warn!(context, "Missed UID {} in the server response", server_uid);
server_uids_it.next();
continue;
}
};
server_uids_it.next();
count += 1; count += 1;
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
@@ -1483,17 +1506,19 @@ impl Imap {
}; };
last_uid = Some(server_uid) last_uid = Some(server_uid)
} }
} // If we don't process the whole response, IMAP client is left in a broken state where
// it will try to process the rest of response as the next response.
if count != server_uids.len() { while msgs.next().await.is_some() {}
warn!( if count != server_uids.len() {
context, warn!(
"failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}", context,
count, "failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}",
server_uids.len(), count,
server_uids, server_uids.len(),
sets server_uids,
); sets,
);
}
} }
Ok((last_uid, received_msgs)) Ok((last_uid, received_msgs))
@@ -2300,13 +2325,11 @@ async fn should_ignore_folder(
/// Builds a list of sequence/uid sets. The returned sets have each no more than around 1000 /// Builds a list of sequence/uid sets. The returned sets have each no more than around 1000
/// characters because according to <https://tools.ietf.org/html/rfc2683#section-3.2.1.5> /// characters because according to <https://tools.ietf.org/html/rfc2683#section-3.2.1.5>
/// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars) /// command lines should not be much more than 1000 chars (servers should allow at least 8000 chars)
fn build_sequence_sets(mut uids: Vec<u32>) -> Vec<String> { fn build_sequence_sets(uids: &[u32]) -> Result<Vec<(Vec<u32>, String)>> {
uids.sort_unstable();
// first, try to find consecutive ranges: // first, try to find consecutive ranges:
let mut ranges: Vec<UidRange> = vec![]; let mut ranges: Vec<UidRange> = vec![];
for current in uids { for &current in uids {
if let Some(last) = ranges.last_mut() { if let Some(last) = ranges.last_mut() {
if last.end + 1 == current { if last.end + 1 == current {
last.end = current; last.end = current;
@@ -2321,22 +2344,24 @@ fn build_sequence_sets(mut uids: Vec<u32>) -> Vec<String> {
} }
// Second, sort the uids into uid sets that are each below ~1000 characters // Second, sort the uids into uid sets that are each below ~1000 characters
let mut result = vec![String::new()]; let mut result = vec![];
let (mut last_uids, mut last_str) = (Vec::new(), String::new());
for range in ranges { for range in ranges {
if let Some(last) = result.last_mut() { last_uids.reserve((range.end - range.start + 1).try_into()?);
if !last.is_empty() { (range.start..=range.end).for_each(|u| last_uids.push(u));
last.push(','); if !last_str.is_empty() {
} last_str.push(',');
last.push_str(&range.to_string()); }
last_str.push_str(&range.to_string());
if last.len() > 990 { if last_str.len() > 990 {
result.push(String::new()); // Start a new uid set result.push((take(&mut last_uids), take(&mut last_str)));
}
} }
} }
result.push((last_uids, last_str));
result.retain(|s| !s.is_empty()); result.retain(|(_, s)| !s.is_empty());
result Ok(result)
} }
struct UidRange { struct UidRange {
@@ -2442,61 +2467,69 @@ mod tests {
#[test] #[test]
fn test_build_sequence_sets() { fn test_build_sequence_sets() {
assert_eq!(build_sequence_sets(&[]).unwrap(), vec![]);
let cases = vec![ let cases = vec![
(vec![], vec![]), (vec![1], "1"),
(vec![1], vec!["1"]), (vec![3291], "3291"),
(vec![3291], vec!["3291"]), (vec![1, 3, 5, 7, 9, 11], "1,3,5,7,9,11"),
(vec![1, 3, 5, 7, 9, 11], vec!["1,3,5,7,9,11"]), (vec![1, 2, 3], "1:3"),
(vec![1, 2, 3], vec!["1:3"]), (vec![1, 4, 5, 6], "1,4:6"),
(vec![1, 4, 5, 6], vec!["1,4:6"]), ((1..=500).collect(), "1:500"),
((1..=500).collect(), vec!["1:500"]), (vec![3, 4, 8, 9, 10, 11, 39, 50, 2], "3:4,8:11,39,50,2"),
(vec![3, 4, 8, 9, 10, 11, 39, 50, 2], vec!["2:4,8:11,39,50"]),
]; ];
for (input, output) in cases { for (input, s) in cases {
assert_eq!(build_sequence_sets(input), output); assert_eq!(
build_sequence_sets(&input).unwrap(),
vec![(input, s.into())]
);
} }
let has_number = |(uids, s): &(Vec<u32>, String), number| {
uids.iter().any(|&n| n == number)
&& s.split(',').any(|n| n.parse::<u32>().unwrap() == number)
};
let numbers: Vec<_> = (2..=500).step_by(2).collect(); let numbers: Vec<_> = (2..=500).step_by(2).collect();
let result = build_sequence_sets(numbers.clone()); let result = build_sequence_sets(&numbers).unwrap();
for set in &result { for (_, set) in &result {
assert!(set.len() < 1010); assert!(set.len() < 1010);
assert!(!set.ends_with(',')); assert!(!set.ends_with(','));
assert!(!set.starts_with(',')); assert!(!set.starts_with(','));
} }
assert!(result.len() == 1); // these UIDs fit in one set assert!(result.len() == 1); // these UIDs fit in one set
for number in &numbers { for &number in &numbers {
assert!(result assert!(result.iter().any(|r| has_number(r, number)));
.iter()
.any(|set| set.split(',').any(|n| n.parse::<u32>().unwrap() == *number)));
} }
let numbers: Vec<_> = (1..=1000).step_by(3).collect(); let numbers: Vec<_> = (1..=1000).step_by(3).collect();
let result = build_sequence_sets(numbers.clone()); let result = build_sequence_sets(&numbers).unwrap();
for set in &result { for (_, set) in &result {
assert!(set.len() < 1010); assert!(set.len() < 1010);
assert!(!set.ends_with(',')); assert!(!set.ends_with(','));
assert!(!set.starts_with(',')); assert!(!set.starts_with(','));
} }
assert!(result.last().unwrap().ends_with("997,1000")); let (last_uids, last_str) = result.last().unwrap();
assert_eq!(
last_uids.get((last_uids.len() - 2)..).unwrap(),
&[997, 1000]
);
assert!(last_str.ends_with("997,1000"));
assert!(result.len() == 2); // This time we need 2 sets assert!(result.len() == 2); // This time we need 2 sets
for number in &numbers { for &number in &numbers {
assert!(result assert!(result.iter().any(|r| has_number(r, number)));
.iter()
.any(|set| set.split(',').any(|n| n.parse::<u32>().unwrap() == *number)));
} }
let numbers: Vec<_> = (30000000..=30002500).step_by(4).collect(); let numbers: Vec<_> = (30000000..=30002500).step_by(4).collect();
let result = build_sequence_sets(numbers.clone()); let result = build_sequence_sets(&numbers).unwrap();
for set in &result { for (_, set) in &result {
assert!(set.len() < 1010); assert!(set.len() < 1010);
assert!(!set.ends_with(',')); assert!(!set.ends_with(','));
assert!(!set.starts_with(',')); assert!(!set.starts_with(','));
} }
assert_eq!(result.len(), 6); assert_eq!(result.len(), 6);
for number in &numbers { for &number in &numbers {
assert!(result assert!(result.iter().any(|r| has_number(r, number)));
.iter()
.any(|set| set.split(',').any(|n| n.parse::<u32>().unwrap() == *number)));
} }
} }