imap: move messages in batches

Also change how NO response is treated. NO response means there is an
error moving/copying the messages. When there are no matching
messages, the response is "OK No matching messages, so nothing copied"
according to some RFC 9051 examples.
This commit is contained in:
link2xt
2022-02-05 00:26:49 +00:00
parent fb19b58147
commit 276daf631e
3 changed files with 127 additions and 93 deletions

View File

@@ -4,9 +4,11 @@
### Changes
- refactorings #3026
- move messages in batches #3058
### Fixes
- avoid archived, fresh chats #3053
- treat "NO" IMAP response to MOVE and COPY commands as an error #3058
## 1.75.0

View File

@@ -892,11 +892,11 @@ class TestOnlineAccount:
chat = acfactory.get_accepted_chat(ac1, ac2)
chat.send_text("message1")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
chat.send_text("message2")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
chat.send_text("message3")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED")
def test_forward_messages(self, acfactory, lp):
ac1, ac2 = acfactory.get_two_online_accounts()

View File

@@ -827,18 +827,98 @@ impl Imap {
Ok(read_cnt > 0)
}
/// Moves batch of messages identified by their UID from the currently
/// selected folder to the target folder.
async fn move_message_batch(
&mut self,
context: &Context,
set: &str,
row_ids: Vec<i64>,
target: &str,
) -> Result<()> {
if self.config.can_move {
let session = self
.session
.as_mut()
.context("no session while attempting to MOVE messages")?;
match session.uid_mv(set, &target).await {
Ok(()) => {
// Messages are moved or don't exist, IMAP returns OK response in both cases.
context
.sql
.execute(
format!(
"DELETE FROM imap WHERE id IN ({})",
row_ids.iter().map(|_| "?").collect::<Vec<&str>>().join(",")
),
rusqlite::params_from_iter(row_ids),
)
.await
.context("cannot delete moved messages from imap table")?;
context.emit_event(EventType::ImapMessageMoved(format!(
"IMAP messages {} moved to {}",
set, target
)));
return Ok(());
}
Err(err) => {
warn!(
context,
"Cannot move message, fallback to COPY/DELETE {} to {}: {}",
set,
target,
err
);
}
}
} else {
info!(
context,
"Server does not support MOVE, fallback to COPY/DELETE {} to {}", set, target
);
}
// Server does not support MOVE or MOVE failed.
// Copy the message to the destination folder and mark the record for deletion.
let session = self
.session
.as_mut()
.context("no session while attempting to COPY messages")?;
match session.uid_copy(&set, &target).await {
Ok(()) => {
context
.sql
.execute(
format!(
"UPDATE imap SET target='' WHERE id IN ({})",
row_ids.iter().map(|_| "?").collect::<Vec<&str>>().join(",")
),
rusqlite::params_from_iter(row_ids),
)
.await
.context("cannot plan deletion of copied messages")?;
context.emit_event(EventType::ImapMessageMoved(format!(
"IMAP messages {} copied to {}",
set, target
)));
Ok(())
}
Err(err) => Err(err.into()),
}
}
/// Moves messages.
///
/// This is the only place where messages are moved on the IMAP server.
async fn move_messages(&mut self, context: &Context, folder: &str) -> Result<()> {
let rows = context
let mut rows = context
.sql
.query_map(
"SELECT id, uid, target FROM imap
WHERE folder = ?
AND target != folder
AND target != '' -- Not planned for deletion.
ORDER BY id",
ORDER BY target, uid",
paramsv![folder],
|row| {
let rowid: i64 = row.get(0)?;
@@ -848,105 +928,57 @@ impl Imap {
},
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
)
.await?;
.await?
.into_iter()
.peekable();
self.prepare(context).await?;
self.select_folder(context, Some(folder)).await?;
for (rowid, uid, target) in rows {
// TODO: batch moves of messages with the same destination.
let set = uid.to_string();
while let Some((_, _, target)) = rows.peek().cloned() {
// Construct next request for the target folder.
let mut uid_set = String::new();
let mut rowid_set = Vec::new();
if self.config.can_move {
if let Some(session) = &mut self.session {
match session.uid_mv(&set, &target).await {
Ok(_) => {
context.emit_event(EventType::ImapMessageMoved(format!(
"IMAP message {}/{} moved to {}",
folder, uid, target
)));
context
.sql
.execute("DELETE FROM imap WHERE id=?", paramsv![rowid])
.await?;
continue;
}
Err(async_imap::error::Error::No(text)) => {
// "NO" response, probably the message is moved already.
info!(
context,
"IMAP message {}/{} cannot be moved: {}", folder, uid, text
);
context
.sql
.execute("DELETE FROM imap WHERE id=?", paramsv![rowid])
.await?;
continue;
}
Err(err) => {
warn!(
context,
"Cannot move message, fallback to COPY/DELETE {}/{} to {}: {}",
folder,
uid,
target,
err
);
}
while uid_set.len() < 1000 {
// Construct a new range.
if let Some((start_rowid, start_uid, _)) =
rows.next_if(|(_, _, start_target)| start_target == &target)
{
rowid_set.push(start_rowid);
let mut end_uid = start_uid;
while let Some((next_rowid, next_uid, _)) =
rows.next_if(|(_, next_uid, next_target)| {
next_target == &target && *next_uid == end_uid + 1
})
{
end_uid = next_uid;
rowid_set.push(next_rowid);
}
let uid_range = UidRange {
start: start_uid,
end: end_uid,
};
if !uid_set.is_empty() {
uid_set.push(',');
}
uid_set.push_str(&uid_range.to_string());
} else {
bail!("No session while attempting to move the message");
break;
}
} else {
info!(
context,
"Server does not support MOVE, fallback to COPY/DELETE {}/{} to {}",
folder,
uid,
target
);
}
// Server does not support MOVE or MOVE failed.
// Copy the message to the destination folder and mark the record for deletion.
if let Some(session) = &mut self.session {
match session.uid_copy(&set, &target).await {
Ok(_) => {
context.emit_event(EventType::ImapMessageMoved(format!(
"IMAP message {}/{} copied to {}",
folder, uid, target
)));
// Plan deletion of the original message.
context
.sql
.execute("UPDATE imap SET target='' WHERE id=?", paramsv![rowid])
.await?;
}
Err(async_imap::error::Error::No(text)) => {
// "NO" response, probably the message is moved already.
info!(
context,
"IMAP message {}/{} cannot be copied: {}", folder, uid, text
);
context
.sql
.execute("DELETE FROM imap WHERE id=?", paramsv![rowid])
.await?;
continue;
}
Err(err) => {
warn!(
context,
"Could not copy message {}/{}: {}", folder, uid, err
);
// Break the loop to avoid moving messages out of order.
// We can't proceed until this message is moved or copied.
break;
}
}
} else {
bail!("No session while attempting to copy the message");
}
// Execute request.
self.move_message_batch(context, &uid_set, rowid_set, &target)
.await
.with_context(|| {
format!(
"cannot move batch of messages {:?} to folder {:?}",
&uid_set, target
)
})?;
}
Ok(())