From 276daf631e52b0f1302bb4f2c73dc4d1f385918f Mon Sep 17 00:00:00 2001 From: link2xt Date: Sat, 5 Feb 2022 00:26:49 +0000 Subject: [PATCH] imap: move messages in batches Also change how NO response is treated. NO response means there is an error moving/copying the messages. When there are no matching messages, the response is "OK No matching messages, so nothing copied" according to some RFC 9051 examples. --- CHANGELOG.md | 2 + python/tests/test_account.py | 4 +- src/imap.rs | 214 ++++++++++++++++++++--------------- 3 files changed, 127 insertions(+), 93 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 454d24945..5634c47f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,11 @@ ### Changes - refactorings #3026 +- move messages in batches #3058 ### Fixes - avoid archived, fresh chats #3053 +- treat "NO" IMAP response to MOVE and COPY commands as an error #3058 ## 1.75.0 diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 248b71761..3e145021d 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -892,11 +892,11 @@ class TestOnlineAccount: chat = acfactory.get_accepted_chat(ac1, ac2) chat.send_text("message1") + ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") chat.send_text("message2") + ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") chat.send_text("message3") ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") - ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") - ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") def test_forward_messages(self, acfactory, lp): ac1, ac2 = acfactory.get_two_online_accounts() diff --git a/src/imap.rs b/src/imap.rs index 90cb90c68..f26b21c14 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -827,18 +827,98 @@ impl Imap { Ok(read_cnt > 0) } + /// Moves batch of messages identified by their UID from the currently + /// selected folder to the target folder. + async fn move_message_batch( + &mut self, + context: &Context, + set: &str, + row_ids: Vec, + target: &str, + ) -> Result<()> { + if self.config.can_move { + let session = self + .session + .as_mut() + .context("no session while attempting to MOVE messages")?; + match session.uid_mv(set, &target).await { + Ok(()) => { + // Messages are moved or don't exist, IMAP returns OK response in both cases. + context + .sql + .execute( + format!( + "DELETE FROM imap WHERE id IN ({})", + row_ids.iter().map(|_| "?").collect::>().join(",") + ), + rusqlite::params_from_iter(row_ids), + ) + .await + .context("cannot delete moved messages from imap table")?; + context.emit_event(EventType::ImapMessageMoved(format!( + "IMAP messages {} moved to {}", + set, target + ))); + return Ok(()); + } + Err(err) => { + warn!( + context, + "Cannot move message, fallback to COPY/DELETE {} to {}: {}", + set, + target, + err + ); + } + } + } else { + info!( + context, + "Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target + ); + } + + // Server does not support MOVE or MOVE failed. + // Copy the message to the destination folder and mark the record for deletion. + let session = self + .session + .as_mut() + .context("no session while attempting to COPY messages")?; + match session.uid_copy(&set, &target).await { + Ok(()) => { + context + .sql + .execute( + format!( + "UPDATE imap SET target='' WHERE id IN ({})", + row_ids.iter().map(|_| "?").collect::>().join(",") + ), + rusqlite::params_from_iter(row_ids), + ) + .await + .context("cannot plan deletion of copied messages")?; + context.emit_event(EventType::ImapMessageMoved(format!( + "IMAP messages {} copied to {}", + set, target + ))); + Ok(()) + } + Err(err) => Err(err.into()), + } + } + /// Moves messages. /// /// This is the only place where messages are moved on the IMAP server. async fn move_messages(&mut self, context: &Context, folder: &str) -> Result<()> { - let rows = context + let mut rows = context .sql .query_map( "SELECT id, uid, target FROM imap WHERE folder = ? AND target != folder AND target != '' -- Not planned for deletion. - ORDER BY id", + ORDER BY target, uid", paramsv![folder], |row| { let rowid: i64 = row.get(0)?; @@ -848,105 +928,57 @@ impl Imap { }, |rows| rows.collect::, _>>().map_err(Into::into), ) - .await?; + .await? + .into_iter() + .peekable(); self.prepare(context).await?; self.select_folder(context, Some(folder)).await?; - for (rowid, uid, target) in rows { - // TODO: batch moves of messages with the same destination. - let set = uid.to_string(); + while let Some((_, _, target)) = rows.peek().cloned() { + // Construct next request for the target folder. + let mut uid_set = String::new(); + let mut rowid_set = Vec::new(); - if self.config.can_move { - if let Some(session) = &mut self.session { - match session.uid_mv(&set, &target).await { - Ok(_) => { - context.emit_event(EventType::ImapMessageMoved(format!( - "IMAP message {}/{} moved to {}", - folder, uid, target - ))); - context - .sql - .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) - .await?; - continue; - } - Err(async_imap::error::Error::No(text)) => { - // "NO" response, probably the message is moved already. - info!( - context, - "IMAP message {}/{} cannot be moved: {}", folder, uid, text - ); - context - .sql - .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) - .await?; - continue; - } - Err(err) => { - warn!( - context, - "Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}", - folder, - uid, - target, - err - ); - } + while uid_set.len() < 1000 { + // Construct a new range. + if let Some((start_rowid, start_uid, _)) = + rows.next_if(|(_, _, start_target)| start_target == &target) + { + rowid_set.push(start_rowid); + let mut end_uid = start_uid; + + while let Some((next_rowid, next_uid, _)) = + rows.next_if(|(_, next_uid, next_target)| { + next_target == &target && *next_uid == end_uid + 1 + }) + { + end_uid = next_uid; + rowid_set.push(next_rowid); } + + let uid_range = UidRange { + start: start_uid, + end: end_uid, + }; + if !uid_set.is_empty() { + uid_set.push(','); + } + uid_set.push_str(&uid_range.to_string()); } else { - bail!("No session while attempting to move the message"); + break; } - } else { - info!( - context, - "Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}", - folder, - uid, - target - ); } - // Server does not support MOVE or MOVE failed. - // Copy the message to the destination folder and mark the record for deletion. - if let Some(session) = &mut self.session { - match session.uid_copy(&set, &target).await { - Ok(_) => { - context.emit_event(EventType::ImapMessageMoved(format!( - "IMAP message {}/{} copied to {}", - folder, uid, target - ))); - // Plan deletion of the original message. - context - .sql - .execute("UPDATE imap SET target='' WHERE id=?", paramsv![rowid]) - .await?; - } - Err(async_imap::error::Error::No(text)) => { - // "NO" response, probably the message is moved already. - info!( - context, - "IMAP message {}/{} cannot be copied: {}", folder, uid, text - ); - context - .sql - .execute("DELETE FROM imap WHERE id=?", paramsv![rowid]) - .await?; - continue; - } - Err(err) => { - warn!( - context, - "Could not copy message {}/{}: {}", folder, uid, err - ); - // Break the loop to avoid moving messages out of order. - // We can't proceed until this message is moved or copied. - break; - } - } - } else { - bail!("No session while attempting to copy the message"); - } + // Execute request. + self.move_message_batch(context, &uid_set, rowid_set, &target) + .await + .with_context(|| { + format!( + "cannot move batch of messages {:?} to folder {:?}", + &uid_set, target + ) + })?; } Ok(())