Add imap table to keep track of message UIDs

`imap` table maps Message-IDs to UIDs on the server. `dc_receive_imf`
no longer gets the UID of the message as an argument and does not
insert the folder and UID of the message into the `msgs`
table. `server_folder` and `server_uid` columns in `msgs` table are
deprecated.

MoveMsg and DeleteMsgOnImap jobs are removed. Now messages are moved
and deleted only in the `fetch_move_delete` procedure that consults
the `target` column of the `imap` table to determine where the message
should go.

Where the message should go is determined after prefetching by the
`imap::target_folder()` procedure.  Messages are only downloaded once
they reach their target folder to avoid race conditions in multidevice
setting, such as:

1. One device trying to FETCH the message while the other tries to
MOVE it.

2. One device marking the message as \Seen in the Inbox while the
other has already copied unseen message to the Movebox and is going to
delete the \Seen message in the Inbox.

3. Device downloads the message from the Inbox while there are newer
messages in the Movebox placed there by the other device, thus
processing the messages out of order.
This commit is contained in:
link2xt
2021-12-19 00:00:00 +00:00
parent 0b810d7d65
commit 12823c2213
22 changed files with 1057 additions and 1329 deletions

View File

@@ -52,7 +52,7 @@
//! `MsgsChanged` event is emitted when a message deletion is due, to
//! make UI reload displayed messages and cause actual deletion.
//!
//! Server deletion happens by generating IMAP deletion jobs based on
//! Server deletion happens by updating the `imap` table based on
//! the database entries which are expired either according to their
//! ephemeral message timers or global `delete_server_after` setting.
@@ -73,7 +73,6 @@ use crate::context::Context;
use crate::dc_tools::time;
use crate::download::MIN_DELETE_SERVER_AFTER;
use crate::events::EventType;
use crate::job;
use crate::message::{Message, MessageState, MsgId};
use crate::mimeparser::SystemMessage;
use crate::stock_str;
@@ -263,7 +262,7 @@ pub(crate) async fn stock_ephemeral_timer_changed(
impl MsgId {
/// Returns ephemeral message timer value for the message.
pub(crate) async fn ephemeral_timer(self, context: &Context) -> anyhow::Result<Timer> {
pub(crate) async fn ephemeral_timer(self, context: &Context) -> Result<Timer> {
let res = match context
.sql
.query_get_value(
@@ -279,7 +278,7 @@ impl MsgId {
}
/// Starts ephemeral message timer for the message if it is not started yet.
pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> anyhow::Result<()> {
pub(crate) async fn start_ephemeral_timer(self, context: &Context) -> Result<()> {
if let Timer::Enabled { duration } = self.ephemeral_timer(context).await? {
let ephemeral_timestamp = time().saturating_add(duration.into());
@@ -434,11 +433,8 @@ pub async fn schedule_ephemeral_task(context: &Context) {
}
}
/// Returns ID of any expired message that should be deleted from the server.
///
/// It looks up the trash chat too, to find messages that are already
/// deleted locally, but not deleted on the server.
pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Result<Option<MsgId>> {
/// Schedules expired IMAP messages for deletion.
pub(crate) async fn delete_expired_imap_messages(context: &Context) -> Result<()> {
let now = time();
let (threshold_timestamp, threshold_timestamp_extended) =
@@ -452,27 +448,21 @@ pub(crate) async fn load_imap_deletion_msgid(context: &Context) -> anyhow::Resul
context
.sql
.query_row_optional(
"SELECT id FROM msgs \
WHERE ( \
((download_state = 0 AND timestamp < ?) OR (download_state != 0 AND timestamp < ?)) \
OR (ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?) \
) \
AND server_uid != 0 \
AND NOT id IN (SELECT foreign_id FROM jobs WHERE action = ?)
LIMIT 1",
paramsv![
threshold_timestamp,
threshold_timestamp_extended,
now,
job::Action::DeleteMsgOnImap
],
|row| {
let msg_id: MsgId = row.get(0)?;
Ok(msg_id)
},
.execute(
"UPDATE imap
SET target=''
WHERE EXISTS (
SELECT * FROM msgs
WHERE rfc724_mid=imap.rfc724_mid
AND ((download_state = 0 AND timestamp < ?) OR
(download_state != 0 AND timestamp < ?) OR
(ephemeral_timestamp != 0 AND ephemeral_timestamp <= ?))
)",
paramsv![threshold_timestamp, threshold_timestamp_extended, now],
)
.await
.await?;
Ok(())
}
/// Start ephemeral timers for seen messages if they are not started
@@ -507,9 +497,6 @@ pub(crate) async fn start_ephemeral_timers(context: &Context) -> Result<()> {
#[cfg(test)]
mod tests {
use crate::param::Params;
use async_std::task::sleep;
use super::*;
use crate::config::Config;
use crate::dc_receive_imf::dc_receive_imf;
@@ -725,7 +712,7 @@ mod tests {
/// Test that Alice replying to the chat without a timer at the same time as Bob enables the
/// timer does not result in disabling the timer on the Bob's side.
#[async_std::test]
async fn test_ephemeral_timer_rollback() -> anyhow::Result<()> {
async fn test_ephemeral_timer_rollback() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
@@ -799,14 +786,14 @@ mod tests {
}
#[async_std::test]
async fn test_ephemeral_delete_msgs() {
async fn test_ephemeral_delete_msgs() -> Result<()> {
let t = TestContext::new_alice().await;
let chat = t.get_self_chat().await;
t.send_text(chat.id, "Saved message, which we delete manually")
.await;
let msg = t.get_last_msg_in(chat.id).await;
msg.id.delete_from_db(&t).await.unwrap();
msg.id.delete_from_db(&t).await?;
check_msg_was_deleted(&t, &chat, msg.id).await;
chat.id
@@ -817,36 +804,12 @@ mod tests {
.send_text(chat.id, "Saved message, disappearing after 1s")
.await;
sleep(Duration::from_millis(1100)).await;
async_std::task::sleep(Duration::from_millis(1100)).await;
// Check checks that the msg was deleted locally
// Check that the msg was deleted locally.
check_msg_was_deleted(&t, &chat, msg.sender_msg_id).await;
// Check that the msg will be deleted on the server
// First of all, set a server_uid so that DC thinks that it's actually possible to delete
t.sql
.execute(
"UPDATE msgs SET server_uid=1 WHERE id=?",
paramsv![msg.sender_msg_id],
)
.await
.unwrap();
let job = job::load_imap_deletion_job(&t).await.unwrap();
assert_eq!(
job,
Some(job::Job::new(
job::Action::DeleteMsgOnImap,
msg.sender_msg_id.to_u32(),
Params::new(),
0,
))
);
// Let's assume that executing the job fails on first try and the job is saved to the db
job.unwrap().save(&t).await.unwrap();
// Make sure that we don't get yet another job when loading from db
let job2 = job::load_imap_deletion_job(&t).await.unwrap();
assert_eq!(job2, None);
Ok(())
}
async fn check_msg_was_deleted(t: &TestContext, chat: &Chat, msg_id: MsgId) {
@@ -874,7 +837,7 @@ mod tests {
}
#[async_std::test]
async fn test_load_imap_deletion_msgid() -> Result<()> {
async fn test_delete_expired_imap_messages() -> Result<()> {
let t = TestContext::new_alice().await;
const HOUR: i64 = 60 * 60;
let now = time();
@@ -887,42 +850,98 @@ mod tests {
(2000, now - 18 * HOUR, now - HOUR),
(2020, now - 17 * HOUR, now + HOUR),
] {
let message_id = id.to_string();
t.sql
.execute(
"INSERT INTO msgs (id, server_uid, timestamp, ephemeral_timestamp) VALUES (?,?,?,?);",
paramsv![id, id, timestamp, ephemeral_timestamp],
)
.await?;
.execute(
"INSERT INTO msgs (id, rfc724_mid, timestamp, ephemeral_timestamp) VALUES (?,?,?,?);",
paramsv![id, message_id, timestamp, ephemeral_timestamp],
)
.await?;
t.sql
.execute(
"INSERT INTO imap (rfc724_mid, folder, uid, target) VALUES (?,'INBOX',?, 'INBOX');",
paramsv![message_id, id],
)
.await?;
}
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(2000)));
async fn test_marked_for_deletion(context: &Context, id: u32) -> Result<()> {
assert_eq!(
context
.sql
.count(
"SELECT COUNT(*) FROM imap WHERE target='' AND rfc724_mid=?",
paramsv![id.to_string()],
)
.await?,
1
);
Ok(())
}
MsgId::new(2000).delete_from_db(&t).await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None);
async fn remove_uid(context: &Context, id: u32) -> Result<()> {
context
.sql
.execute(
"DELETE FROM imap WHERE rfc724_mid=?",
paramsv![id.to_string()],
)
.await?;
Ok(())
}
// This should mark message 2000 for deletion.
delete_expired_imap_messages(&t).await?;
test_marked_for_deletion(&t, 2000).await?;
remove_uid(&t, 2000).await?;
// No other messages are marked for deletion.
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM imap WHERE target=''", paramsv![],)
.await?,
0
);
t.set_config(Config::DeleteServerAfter, Some(&*(25 * HOUR).to_string()))
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000)));
delete_expired_imap_messages(&t).await?;
test_marked_for_deletion(&t, 1000).await?;
MsgId::new(1000)
.update_download_state(&t, DownloadState::Available)
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1000))); // delete downloadable anyway
MsgId::new(1000).delete_from_db(&t).await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None);
t.sql
.execute(
"UPDATE imap SET target=folder WHERE rfc724_mid='1000'",
paramsv![],
)
.await?;
delete_expired_imap_messages(&t).await?;
test_marked_for_deletion(&t, 1000).await?; // Delete downloadable anyway.
remove_uid(&t, 1000).await?;
t.set_config(Config::DeleteServerAfter, Some(&*(22 * HOUR).to_string()))
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, Some(MsgId::new(1010)));
delete_expired_imap_messages(&t).await?;
test_marked_for_deletion(&t, 1010).await?;
t.sql
.execute(
"UPDATE imap SET target=folder WHERE rfc724_mid='1010'",
paramsv![],
)
.await?;
MsgId::new(1010)
.update_download_state(&t, DownloadState::Available)
.await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None); // keep downloadable for now
MsgId::new(1010).delete_from_db(&t).await?;
assert_eq!(load_imap_deletion_msgid(&t).await?, None);
delete_expired_imap_messages(&t).await?;
// Keep downloadable for now.
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM imap WHERE target=''", paramsv![],)
.await?,
0
);
Ok(())
}
@@ -944,7 +963,6 @@ mod tests {
\n\
hello\n",
"INBOX",
1,
false,
)
.await?;
@@ -966,7 +984,6 @@ mod tests {
\n\
second message\n",
"INBOX",
2,
false,
)
.await?;
@@ -1004,7 +1021,6 @@ mod tests {
\n\
> hello\n",
"INBOX",
3,
false,
)
.await?;