From 8ffdd55f792f2b499798c46fc838642317bea993 Mon Sep 17 00:00:00 2001 From: bjoern Date: Wed, 26 Feb 2025 15:26:19 +0100 Subject: [PATCH] sync message deletion to other devices (#6573) this PR synchronises deletion of messages across devices and adds a test for it --------- Co-authored-by: Hocuri --- deltachat-ffi/deltachat.h | 2 +- src/message.rs | 117 ++++++++++++++++++++++------------- src/message/message_tests.rs | 37 ++++++++++- src/sync.rs | 25 ++++++++ 4 files changed, 133 insertions(+), 48 deletions(-) diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index ca33f0444..57f32658e 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -1963,7 +1963,7 @@ char* dc_get_mime_headers (dc_context_t* context, uint32_t ms /** - * Delete messages. The messages are deleted on the current device and + * Delete messages. The messages are deleted on all devices and * on the IMAP server. * * @memberof dc_context_t diff --git a/src/message.rs b/src/message.rs index 963128ad4..bfee5d772 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,6 +1,7 @@ //! # Messages and their identifiers. use std::collections::BTreeSet; +use std::collections::HashSet; use std::path::{Path, PathBuf}; use std::str; @@ -31,6 +32,7 @@ use crate::pgp::split_armored_data; use crate::reaction::get_msg_reactions; use crate::sql; use crate::summary::Summary; +use crate::sync::SyncData; use crate::tools::{ buf_compress, buf_decompress, get_filebytes, get_filemeta, gm2local_offset, read_file, sanitize_filename, time, timestamp_to_str, truncate, @@ -1651,35 +1653,80 @@ pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result Result<()> { + if msg.location_id > 0 { + delete_poi_location(context, msg.location_id).await?; + } + let on_server = true; + msg.id + .trash(context, on_server) + .await + .with_context(|| format!("Unable to trash message {}", msg.id))?; + + context.emit_event(EventType::MsgDeleted { + chat_id: msg.chat_id, + msg_id: msg.id, + }); + + if msg.viewtype == Viewtype::Webxdc { + context.emit_event(EventType::WebxdcInstanceDeleted { msg_id: msg.id }); + } + + let logging_xdc_id = context + .debug_logging + .read() + .expect("RwLock is poisoned") + .as_ref() + .map(|dl| dl.msg_id); + if let Some(id) = logging_xdc_id { + if id == msg.id { + set_debug_logging_xdc(context, None).await?; + } + } + + Ok(()) +} + +/// Do final events and jobs after batch deletion using calls to delete_msg_locally(). +/// To avoid additional database queries, collecting data is up to the caller. +pub(crate) async fn delete_msgs_locally_done( + context: &Context, + msg_ids: &[MsgId], + modified_chat_ids: HashSet, +) -> Result<()> { + for modified_chat_id in modified_chat_ids { + context.emit_msgs_changed_without_msg_id(modified_chat_id); + chatlist_events::emit_chatlist_item_changed(context, modified_chat_id); + } + if !msg_ids.is_empty() { + context.emit_msgs_changed_without_ids(); + chatlist_events::emit_chatlist_changed(context); + // Run housekeeping to delete unused blobs. + context + .set_config_internal(Config::LastHousekeeping, None) + .await?; + } + Ok(()) +} + /// Deletes requested messages /// by moving them to the trash chat /// and scheduling for deletion on IMAP. pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { - let mut modified_chat_ids = BTreeSet::new(); + let mut modified_chat_ids = HashSet::new(); + let mut deleted_rfc724_mid = Vec::new(); let mut res = Ok(()); for &msg_id in msg_ids { let msg = Message::load_from_db(context, msg_id).await?; - if msg.location_id > 0 { - delete_poi_location(context, msg.location_id).await?; - } - let on_server = true; - msg_id - .trash(context, on_server) - .await - .with_context(|| format!("Unable to trash message {msg_id}"))?; - - context.emit_event(EventType::MsgDeleted { - chat_id: msg.chat_id, - msg_id, - }); - - if msg.viewtype == Viewtype::Webxdc { - context.emit_event(EventType::WebxdcInstanceDeleted { msg_id }); - } + delete_msg_locally(context, &msg).await?; modified_chat_ids.insert(msg.chat_id); + deleted_rfc724_mid.push(msg.rfc724_mid.clone()); + let target = context.get_delete_msgs_target().await?; let update_db = |trans: &mut rusqlite::Transaction| { trans.execute( @@ -1694,38 +1741,20 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { res = Err(e); continue; } - - let logging_xdc_id = context - .debug_logging - .read() - .expect("RwLock is poisoned") - .as_ref() - .map(|dl| dl.msg_id); - - if let Some(id) = logging_xdc_id { - if id == msg_id { - set_debug_logging_xdc(context, None).await?; - } - } } res?; - for modified_chat_id in modified_chat_ids { - context.emit_msgs_changed_without_msg_id(modified_chat_id); - chatlist_events::emit_chatlist_item_changed(context, modified_chat_id); - } + delete_msgs_locally_done(context, msg_ids, modified_chat_ids).await?; - if !msg_ids.is_empty() { - context.emit_msgs_changed_without_ids(); - chatlist_events::emit_chatlist_changed(context); - // Run housekeeping to delete unused blobs. - context - .set_config_internal(Config::LastHousekeeping, None) - .await?; - } + context + .add_sync_item(SyncData::DeleteMessages { + msgs: deleted_rfc724_mid, + }) + .await?; - // Interrupt Inbox loop to start message deletion and run housekeeping. + // Interrupt Inbox loop to start message deletion, run housekeeping and call send_sync_msg(). context.scheduler.interrupt_inbox().await; + Ok(()) } diff --git a/src/message/message_tests.rs b/src/message/message_tests.rs index 27a2f6953..708cac527 100644 --- a/src/message/message_tests.rs +++ b/src/message/message_tests.rs @@ -9,7 +9,7 @@ use crate::chatlist::Chatlist; use crate::config::Config; use crate::reaction::send_reaction; use crate::receive_imf::receive_imf; -use crate::test_utils as test; +use crate::test_utils; use crate::test_utils::{TestContext, TestContextManager}; #[test] @@ -106,7 +106,7 @@ async fn test_create_webrtc_instance_noroom() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_width_height() { - let t = test::TestContext::new().await; + let t = TestContext::new().await; // test that get_width() and get_height() are returning some dimensions for images; // (as the device-chat contains a welcome-images, we check that) @@ -136,7 +136,7 @@ async fn test_get_width_height() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_quote() { - let d = test::TestContext::new().await; + let d = TestContext::new().await; let ctx = &d.ctx; ctx.set_config(Config::ConfiguredAddr, Some("self@example.com")) @@ -756,6 +756,37 @@ async fn test_delete_msgs_offline() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_delete_msgs_sync() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let alice2 = &tcm.alice().await; + let bob = &tcm.bob().await; + let alice_chat_id = alice.create_chat(bob).await.id; + + alice.set_config_bool(Config::SyncMsgs, true).await?; + alice2.set_config_bool(Config::SyncMsgs, true).await?; + bob.set_config_bool(Config::SyncMsgs, true).await?; + + // Alice sends a messsage and receives it on the other device + let sent1 = alice.send_text(alice_chat_id, "foo").await; + assert_eq!(alice_chat_id.get_msg_cnt(alice).await?, 1); + + let msg = alice2.recv_msg(&sent1).await; + let alice2_chat_id = msg.chat_id; + assert_eq!(alice2.get_last_msg_in(alice2_chat_id).await.id, msg.id); + assert_eq!(alice2_chat_id.get_msg_cnt(alice2).await?, 1); + + // Alice deletes the message; this should happen on both devices as well + delete_msgs(alice, &[sent1.sender_msg_id]).await?; + assert_eq!(alice_chat_id.get_msg_cnt(alice).await?, 0); + + test_utils::sync(alice, alice2).await; + assert_eq!(alice2_chat_id.get_msg_cnt(alice2).await?, 0); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_sanitize_filename_message() -> Result<()> { let t = &TestContext::new().await; diff --git a/src/sync.rs b/src/sync.rs index ba8139b77..68ab50283 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,6 +17,7 @@ use crate::sync::SyncData::{AddQrToken, AlterChat, DeleteQrToken}; use crate::token::Namespace; use crate::tools::time; use crate::{message, stock_str, token}; +use std::collections::HashSet; /// Whether to send device sync messages. Aimed for usage in the internal API. #[derive(Debug, PartialEq)] @@ -66,6 +67,9 @@ pub(crate) enum SyncData { src: String, // RFC724 id (i.e. "Message-Id" header) dest: String, // RFC724 id (i.e. "Message-Id" header) }, + DeleteMessages { + msgs: Vec, // RFC724 id (i.e. "Message-Id" header) + }, } #[derive(Debug, Serialize, Deserialize)] @@ -258,6 +262,7 @@ impl Context { AlterChat { id, action } => self.sync_alter_chat(id, action).await, SyncData::Config { key, val } => self.sync_config(key, val).await, SyncData::SaveMessage { src, dest } => self.save_message(src, dest).await, + SyncData::DeleteMessages { msgs } => self.sync_message_deletion(msgs).await, }, SyncDataOrUnknown::Unknown(data) => { warn!(self, "Ignored unknown sync item: {data}."); @@ -297,6 +302,26 @@ impl Context { } Ok(()) } + + async fn sync_message_deletion(&self, msgs: &Vec) -> Result<()> { + let mut modified_chat_ids = HashSet::new(); + let mut msg_ids = Vec::new(); + for rfc724_mid in msgs { + if let Some((msg_id, _)) = message::rfc724_mid_exists(self, rfc724_mid).await? { + if let Some(msg) = Message::load_from_db_optional(self, msg_id).await? { + message::delete_msg_locally(self, &msg).await?; + msg_ids.push(msg.id); + modified_chat_ids.insert(msg.chat_id); + } else { + warn!(self, "Sync message delete: Database entry does not exist."); + } + } else { + warn!(self, "Sync message delete: {rfc724_mid:?} not found."); + } + } + message::delete_msgs_locally_done(self, &msg_ids, modified_chat_ids).await?; + Ok(()) + } } #[cfg(test)]