From 3743aaa16e86fd249df7e074327405fbcecaefe6 Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 3 Dec 2022 23:12:32 +0000 Subject: [PATCH] Refactor fetch_many_msgs and add more logging --- CHANGELOG.md | 1 + src/imap.rs | 184 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 113 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b110d64b4..e3b291385 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Changes - Refactor: Remove the remaining AsRef #3669 +- Add more logging to `fetch_many_msgs` and refactor it #3811 - Small speedup #3780 - Log the reason when the message cannot be sent to the chat #3810 diff --git a/src/imap.rs b/src/imap.rs index f3501539b..139853b86 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1374,22 +1374,27 @@ impl Imap { &mut self, context: &Context, folder: &str, - server_uids: Vec, + request_uids: Vec, uid_message_ids: &BTreeMap, fetch_partially: bool, fetching_existing_messages: bool, ) -> Result<(Option, Vec)> { + let mut last_uid = None; let mut received_msgs = Vec::new(); - if server_uids.is_empty() { - return Ok((None, Vec::new())); + + if request_uids.is_empty() { + return Ok((last_uid, received_msgs)); } let session = self.session.as_mut().context("no IMAP session")?; - let sets = build_sequence_sets(&server_uids)?; - let mut last_uid = None; - - for (server_uids, set) in sets.iter() { - let mut msgs = match session + for (request_uids, set) in build_sequence_sets(&request_uids)? { + info!( + context, + "Starting a {} FETCH of message set \"{}\".", + if fetch_partially { "partial" } else { "full" }, + set + ); + let mut fetch_responses = match session .uid_fetch( &set, if fetch_partially { @@ -1399,94 +1404,122 @@ impl Imap { }, ) .await + .with_context(|| format!("fetching messages {} from folder \"{}\"", &set, folder)) { - Ok(msgs) => msgs, + Ok(fetch_responses) => fetch_responses, Err(err) => { - // TODO: maybe differentiate between IO and input/parsing problems - // so we don't reconnect if we have a (rare) input/output parsing problem? + // We want to reconnect regardless of whether it's an I/O error or parsing + // error. If the protocol parser ends up in incorrect state because of some + // incompatiblity with a server, reset may help. self.should_reconnect = true; - bail!( - "Error on fetching messages #{} from folder \"{}\"; error={}.", - &set, - folder, - err - ); + return Err(err); } }; - let mut uid_msgs = server_uids - .iter() - .map(|&uid| (uid, None)) - .collect::>(); - let mut server_uids_it = server_uids.iter().peekable(); + // Map from UIDs to unprocessed FETCH results. We put unprocessed FETCH results here + // when we want to process other messages first. + let mut uid_msgs = HashMap::with_capacity(request_uids.len()); + let mut count = 0; - while let Some(&&server_uid) = server_uids_it.peek() { - let mut msg = uid_msgs.insert(server_uid, None).flatten(); - while msg.is_none() { - let msg_unwrapped = match msgs.next().await { - Some(Ok(msg)) => msg, - Some(Err(_)) => continue, - None => break, + for &request_uid in request_uids.iter() { + // Check if FETCH response is already in `uid_msgs`. + let mut fetch_response = uid_msgs.remove(&request_uid); + + // Try to find a requsted UID in returned FETCH responses. + while fetch_response.is_none() { + let next_fetch_response = + if let Some(next_fetch_response) = fetch_responses.next().await { + next_fetch_response + } else { + // No more FETCH responses received from the server. + break; + }; + + let next_fetch_response = match next_fetch_response { + Ok(next_fetch_response) => next_fetch_response, + Err(err) => { + warn!(context, "Failed to process IMAP FETCH result: {}.", err); + continue; + } }; - let msg_uid = msg_unwrapped.uid.unwrap_or_default(); - if !uid_msgs.contains_key(&msg_uid) { - // Unwanted UIDs are possible because of unsolicited responses, e.g. if - // another client changes \Seen flag on a message after we do a prefetch but - // before fetch. It's not an error if we receive such unsolicited response. - continue; - } - msg = Some(msg_unwrapped); - if msg_uid != server_uid && uid_msgs.insert(msg_uid, msg.take()).is_some() { - warn!(context, "Got duplicated UID {}", msg_uid); + + if let Some(next_uid) = next_fetch_response.uid { + if next_uid == request_uid { + fetch_response = Some(next_fetch_response); + } else if !request_uids.contains(&next_uid) { + // (size of `request_uids` is bounded by IMAP command length limit, + // search in this vector is always fast) + + // Unwanted UIDs are possible because of unsolicited responses, e.g. if + // another client changes \Seen flag on a message after we do a prefetch but + // before fetch. It's not an error if we receive such unsolicited response. + info!( + context, + "Skipping not requested FETCH response for UID {}.", next_uid + ); + } else if uid_msgs.insert(next_uid, next_fetch_response).is_some() { + warn!(context, "Got duplicated UID {}.", next_uid); + } + } else { + info!(context, "Skipping FETCH response without UID."); } } - let msg = match msg { - Some(msg) => msg, + + let fetch_response = match fetch_response { + Some(fetch) => fetch, None => { - warn!(context, "Missed UID {} in the server response", server_uid); - server_uids_it.next(); + warn!( + context, + "Missed UID {} in the server response.", request_uid + ); continue; } }; - server_uids_it.next(); count += 1; - let is_deleted = msg.flags().any(|flag| flag == Flag::Deleted); + let is_deleted = fetch_response.flags().any(|flag| flag == Flag::Deleted); let (body, partial) = if fetch_partially { - (msg.header(), msg.size) // `BODY.PEEK[HEADER]` goes to header() ... + (fetch_response.header(), fetch_response.size) // `BODY.PEEK[HEADER]` goes to header() ... } else { - (msg.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header() + (fetch_response.body(), None) // ... while `BODY.PEEK[]` goes to body() - and includes header() }; - if is_deleted || body.is_none() { - info!( - context, - "Not processing deleted or empty msg {}", server_uid - ); - last_uid = Some(server_uid); + if is_deleted { + info!(context, "Not processing deleted msg {}.", request_uid); + last_uid = Some(request_uid); continue; } - // XXX put flags into a set and pass them to receive_imf - let context = context.clone(); + let body = if let Some(body) = body { + body + } else { + info!( + context, + "Not processing message {} without a BODY.", request_uid + ); + last_uid = Some(request_uid); + continue; + }; - // safe, as we checked above that there is a body. - let body = body - .context("we checked that message has body right above, but it has vanished")?; - let is_seen = msg.flags().any(|flag| flag == Flag::Seen); + let is_seen = fetch_response.flags().any(|flag| flag == Flag::Seen); - let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&server_uid) { + let rfc724_mid = if let Some(rfc724_mid) = uid_message_ids.get(&request_uid) { rfc724_mid } else { - warn!( + error!( context, - "No Message-ID corresponding to UID {} passed in uid_messsage_ids", - server_uid + "No Message-ID corresponding to UID {} passed in uid_messsage_ids.", + request_uid ); continue; }; + + info!( + context, + "Passing message UID {} to receive_imf().", request_uid + ); match receive_imf_inner( - &context, + context, rfc724_mid, body, is_seen, @@ -1501,22 +1534,29 @@ impl Imap { } } Err(err) => { - warn!(context, "receive_imf error: {:#}", err); + warn!(context, "receive_imf error: {:#}.", err); } }; - last_uid = Some(server_uid) + last_uid = Some(request_uid) } + // If we don't process the whole response, IMAP client is left in a broken state where // it will try to process the rest of response as the next response. - while msgs.next().await.is_some() {} - if count != server_uids.len() { + while fetch_responses.next().await.is_some() {} + + if count != request_uids.len() { warn!( context, - "failed to fetch all uids: got {}, requested {}, we requested the UIDs {:?} using {:?}", + "Failed to fetch all UIDs: got {}, requested {}, we requested the UIDs {:?}.", count, - server_uids.len(), - server_uids, - sets, + request_uids.len(), + request_uids, + ); + } else { + info!( + context, + "Successfully received {} UIDs.", + request_uids.len() ); } }