Refactor fetch_many_msgs and add more logging

This commit is contained in:
link2xt
2022-12-03 23:12:32 +00:00
parent 22b4640b31
commit 3743aaa16e
2 changed files with 113 additions and 72 deletions

View File

@@ -1374,22 +1374,27 @@ impl Imap {
&mut self,
context: &Context,
folder: &str,
server_uids: Vec<u32>,
request_uids: Vec<u32>,
uid_message_ids: &BTreeMap<u32, String>,
fetch_partially: bool,
fetching_existing_messages: bool,
) -> Result<(Option<u32>, Vec<ReceivedMsg>)> {
let mut last_uid = None;
let mut received_msgs = Vec::new();
if server_uids.is_empty() {
return Ok((None, Vec::new()));
if request_uids.is_empty() {
return Ok((last_uid, received_msgs));
}
let session = self.session.as_mut().context("no IMAP session")?;
let sets = build_sequence_sets(&server_uids)?;
let mut last_uid = None;
for (server_uids, set) in sets.iter() {
let mut msgs = match session
for (request_uids, set) in build_sequence_sets(&request_uids)? {
info!(
context,
"Starting a {} FETCH of message set \"{}\".",
if fetch_partially { "partial" } else { "full" },
set
);
let mut fetch_responses = match session
.uid_fetch(
&set,
if fetch_partially {
@@ -1399,94 +1404,122 @@ impl Imap {
},
)
.await
.with_context(|| format!("fetching messages {} from folder \"{}\"", &set, folder))
{
Ok(msgs) => msgs,
Ok(fetch_responses) => fetch_responses,
Err(err) => {
// TODO: maybe differentiate between IO and input/parsing problems
// so we don't reconnect if we have a (rare) input/output parsing problem?
// We want to reconnect regardless of whether it's an I/O error or parsing
// error. If the protocol parser ends up in incorrect state because of some
// incompatiblity with a server, reset may help.
self.should_reconnect = true;
bail!(
"Error on fetching messages #{} from folder \"{}\"; error={}.",
&set,
folder,
err
);
return Err(err);
}
};
let mut uid_msgs = server_uids
.iter()
.map(|&uid| (uid, None))
.collect::<HashMap<_, _>>();
let mut server_uids_it = server_uids.iter().peekable();
// Map from UIDs to unprocessed FETCH results. We put unprocessed FETCH results here
// when we want to process other messages first.
let mut uid_msgs = HashMap::with_capacity(request_uids.len());
let mut count = 0;
while let Some(&&server_uid) = server_uids_it.peek() {
let mut msg = uid_msgs.insert(server_uid, None).flatten();
while msg.is_none() {
let msg_unwrapped = match msgs.next().await {
Some(Ok(msg)) => msg,
Some(Err(_)) => continue,
None => break,
for &request_uid in request_uids.iter() {
// Check if FETCH response is already in `uid_msgs`.
let mut fetch_response = uid_msgs.remove(&request_uid);
// Try to find a requsted UID in returned FETCH responses.
while fetch_response.is_none() {
let next_fetch_response =
if let Some(next_fetch_response) = fetch_responses.next().await {
next_fetch_response
} else {
// No more FETCH responses received from the server.
break;
};
let next_fetch_response = match next_fetch_response {
Ok(next_fetch_response) => next_fetch_response,
Err(err) => {
warn!(context, "Failed to process IMAP FETCH result: {}.", err);
continue;
}
};
let msg_uid = msg_unwrapped.uid.unwrap_or_default();
if !uid_msgs.contains_key(&msg_uid) {
// Unwanted UIDs are possible because of unsolicited responses, e.g. if
// another client changes \Seen flag on a message after we do a prefetch but
// before fetch. It's not an error if we receive such unsolicited response.
continue;
}
msg = Some(msg_unwrapped);
if msg_uid != server_uid && uid_msgs.insert(msg_uid, msg.take()).is_some() {
warn!(context, "Got duplicated UID {}", msg_uid);
if let Some(next_uid) = next_fetch_response.uid {
if next_uid == request_uid {
fetch_response = Some(next_fetch_response);
} else if !request_uids.contains(&next_uid) {
// (size of `request_uids` is bounded by IMAP command length limit,
// search in this vector is always fast)
// Unwanted UIDs are possible because of unsolicited responses, e.g. if
// another client changes \Seen flag on a message after we do a prefetch but
// before fetch. It's not an error if we receive such unsolicited response.
info!(
context,
"Skipping not requested FETCH response for UID {}.", next_uid
);
} else if uid_msgs.insert(next_uid, next_fetch_response).is_some() {
warn!(context, "Got duplicated UID {}.", next_uid);
}
} else {
info!(context, "Skipping FETCH response without UID.");
}
}
let msg = match msg {
Some(msg) => msg,
let fetch_response = match fetch_response {
Some(fetch) => fetch,
None => {
warn!(context, "Missed UID {} in the server response", server_uid);
server_uids_it.next();
warn!(
context,
"Missed UID {} in the server response.", request_uid
);
continue;
}
};
server_uids_it.next();
count += 1;
let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted);
let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted);
let (body, partial) = if fetch_partially {
(msg.header(), msg.size) // `BODY.PEEK[HEADER]` goes to header() ...
(fetch_response.header(), fetch_response.size) // `BODY.PEEK[HEADER]` goes to header() ...
} else {
(msg.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header()
(fetch_response.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header()
};
if is_deleted || body.is_none() {
info!(
context,
"Not processing deleted or empty msg {}", server_uid
);
last_uid = Some(server_uid);
if is_deleted {
info!(context, "Not processing deleted msg {}.", request_uid);
last_uid = Some(request_uid);
continue;
}
// XXX put flags into a set and pass them to receive_imf
let context = context.clone();
let body = if let Some(body) = body {
body
} else {
info!(
context,
"Not processing message {} without a BODY.", request_uid
);
last_uid = Some(request_uid);
continue;
};
// safe, as we checked above that there is a body.
let body = body
.context("we checked that message has body right above, but it has vanished")?;
let is_seen = msg.flags().any(|flag| flag == Flag::Seen);
let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen);
let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&server_uid) {
let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) {
rfc724_mid
} else {
warn!(
error!(
context,
"No Message-ID corresponding to UID {} passed in uid_messsage_ids",
server_uid
"No Message-ID corresponding to UID {} passed in uid_messsage_ids.",
request_uid
);
continue;
};
info!(
context,
"Passing message UID {} to receive_imf().", request_uid
);
match receive_imf_inner(
&context,
context,
rfc724_mid,
body,
is_seen,
@@ -1501,22 +1534,29 @@ impl Imap {
}
}
Err(err) => {
warn!(context, "receive_imf error: {:#}", err);
warn!(context, "receive_imf error: {:#}.", err);
}
};
last_uid = Some(server_uid)
last_uid = Some(request_uid)
}
// If we don't process the whole response, IMAP client is left in a broken state where
// it will try to process the rest of response as the next response.
while msgs.next().await.is_some() {}
if count != server_uids.len() {
while fetch_responses.next().await.is_some() {}
if count != request_uids.len() {
warn!(
context,
"failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}",
"Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.",
count,
server_uids.len(),
server_uids,
sets,
request_uids.len(),
request_uids,
);
} else {
info!(
context,
"Successfully received {} UIDs.",
request_uids.len()
);
}
}