mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 17:36:29 +03:00
sync message deletion to other devices (#6573)
this PR synchronises deletion of messages across devices and adds a test for it --------- Co-authored-by: Hocuri <hocuri@gmx.de>
This commit is contained in:
@@ -1963,7 +1963,7 @@ char* dc_get_mime_headers (dc_context_t* context, uint32_t ms
|
|||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete messages. The messages are deleted on the current device and
|
* Delete messages. The messages are deleted on all devices and
|
||||||
* on the IMAP server.
|
* on the IMAP server.
|
||||||
*
|
*
|
||||||
* @memberof dc_context_t
|
* @memberof dc_context_t
|
||||||
|
|||||||
117
src/message.rs
117
src/message.rs
@@ -1,6 +1,7 @@
|
|||||||
//! # Messages and their identifiers.
|
//! # Messages and their identifiers.
|
||||||
|
|
||||||
use std::collections::BTreeSet;
|
use std::collections::BTreeSet;
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::str;
|
use std::str;
|
||||||
|
|
||||||
@@ -31,6 +32,7 @@ use crate::pgp::split_armored_data;
|
|||||||
use crate::reaction::get_msg_reactions;
|
use crate::reaction::get_msg_reactions;
|
||||||
use crate::sql;
|
use crate::sql;
|
||||||
use crate::summary::Summary;
|
use crate::summary::Summary;
|
||||||
|
use crate::sync::SyncData;
|
||||||
use crate::tools::{
|
use crate::tools::{
|
||||||
buf_compress, buf_decompress, get_filebytes, get_filemeta, gm2local_offset, read_file,
|
buf_compress, buf_decompress, get_filebytes, get_filemeta, gm2local_offset, read_file,
|
||||||
sanitize_filename, time, timestamp_to_str, truncate,
|
sanitize_filename, time, timestamp_to_str, truncate,
|
||||||
@@ -1651,35 +1653,80 @@ pub async fn get_mime_headers(context: &Context, msg_id: MsgId) -> Result<Vec<u8
|
|||||||
Ok(headers)
|
Ok(headers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete a single message from the database, including references in other tables.
|
||||||
|
/// This may be called in batches; the final events are emitted in delete_msgs_locally_done() then.
|
||||||
|
pub(crate) async fn delete_msg_locally(context: &Context, msg: &Message) -> Result<()> {
|
||||||
|
if msg.location_id > 0 {
|
||||||
|
delete_poi_location(context, msg.location_id).await?;
|
||||||
|
}
|
||||||
|
let on_server = true;
|
||||||
|
msg.id
|
||||||
|
.trash(context, on_server)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("Unable to trash message {}", msg.id))?;
|
||||||
|
|
||||||
|
context.emit_event(EventType::MsgDeleted {
|
||||||
|
chat_id: msg.chat_id,
|
||||||
|
msg_id: msg.id,
|
||||||
|
});
|
||||||
|
|
||||||
|
if msg.viewtype == Viewtype::Webxdc {
|
||||||
|
context.emit_event(EventType::WebxdcInstanceDeleted { msg_id: msg.id });
|
||||||
|
}
|
||||||
|
|
||||||
|
let logging_xdc_id = context
|
||||||
|
.debug_logging
|
||||||
|
.read()
|
||||||
|
.expect("RwLock is poisoned")
|
||||||
|
.as_ref()
|
||||||
|
.map(|dl| dl.msg_id);
|
||||||
|
if let Some(id) = logging_xdc_id {
|
||||||
|
if id == msg.id {
|
||||||
|
set_debug_logging_xdc(context, None).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Do final events and jobs after batch deletion using calls to delete_msg_locally().
|
||||||
|
/// To avoid additional database queries, collecting data is up to the caller.
|
||||||
|
pub(crate) async fn delete_msgs_locally_done(
|
||||||
|
context: &Context,
|
||||||
|
msg_ids: &[MsgId],
|
||||||
|
modified_chat_ids: HashSet<ChatId>,
|
||||||
|
) -> Result<()> {
|
||||||
|
for modified_chat_id in modified_chat_ids {
|
||||||
|
context.emit_msgs_changed_without_msg_id(modified_chat_id);
|
||||||
|
chatlist_events::emit_chatlist_item_changed(context, modified_chat_id);
|
||||||
|
}
|
||||||
|
if !msg_ids.is_empty() {
|
||||||
|
context.emit_msgs_changed_without_ids();
|
||||||
|
chatlist_events::emit_chatlist_changed(context);
|
||||||
|
// Run housekeeping to delete unused blobs.
|
||||||
|
context
|
||||||
|
.set_config_internal(Config::LastHousekeeping, None)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Deletes requested messages
|
/// Deletes requested messages
|
||||||
/// by moving them to the trash chat
|
/// by moving them to the trash chat
|
||||||
/// and scheduling for deletion on IMAP.
|
/// and scheduling for deletion on IMAP.
|
||||||
pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||||
let mut modified_chat_ids = BTreeSet::new();
|
let mut modified_chat_ids = HashSet::new();
|
||||||
|
let mut deleted_rfc724_mid = Vec::new();
|
||||||
let mut res = Ok(());
|
let mut res = Ok(());
|
||||||
|
|
||||||
for &msg_id in msg_ids {
|
for &msg_id in msg_ids {
|
||||||
let msg = Message::load_from_db(context, msg_id).await?;
|
let msg = Message::load_from_db(context, msg_id).await?;
|
||||||
if msg.location_id > 0 {
|
delete_msg_locally(context, &msg).await?;
|
||||||
delete_poi_location(context, msg.location_id).await?;
|
|
||||||
}
|
|
||||||
let on_server = true;
|
|
||||||
msg_id
|
|
||||||
.trash(context, on_server)
|
|
||||||
.await
|
|
||||||
.with_context(|| format!("Unable to trash message {msg_id}"))?;
|
|
||||||
|
|
||||||
context.emit_event(EventType::MsgDeleted {
|
|
||||||
chat_id: msg.chat_id,
|
|
||||||
msg_id,
|
|
||||||
});
|
|
||||||
|
|
||||||
if msg.viewtype == Viewtype::Webxdc {
|
|
||||||
context.emit_event(EventType::WebxdcInstanceDeleted { msg_id });
|
|
||||||
}
|
|
||||||
|
|
||||||
modified_chat_ids.insert(msg.chat_id);
|
modified_chat_ids.insert(msg.chat_id);
|
||||||
|
|
||||||
|
deleted_rfc724_mid.push(msg.rfc724_mid.clone());
|
||||||
|
|
||||||
let target = context.get_delete_msgs_target().await?;
|
let target = context.get_delete_msgs_target().await?;
|
||||||
let update_db = |trans: &mut rusqlite::Transaction| {
|
let update_db = |trans: &mut rusqlite::Transaction| {
|
||||||
trans.execute(
|
trans.execute(
|
||||||
@@ -1694,38 +1741,20 @@ pub async fn delete_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
|||||||
res = Err(e);
|
res = Err(e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let logging_xdc_id = context
|
|
||||||
.debug_logging
|
|
||||||
.read()
|
|
||||||
.expect("RwLock is poisoned")
|
|
||||||
.as_ref()
|
|
||||||
.map(|dl| dl.msg_id);
|
|
||||||
|
|
||||||
if let Some(id) = logging_xdc_id {
|
|
||||||
if id == msg_id {
|
|
||||||
set_debug_logging_xdc(context, None).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
res?;
|
res?;
|
||||||
|
|
||||||
for modified_chat_id in modified_chat_ids {
|
delete_msgs_locally_done(context, msg_ids, modified_chat_ids).await?;
|
||||||
context.emit_msgs_changed_without_msg_id(modified_chat_id);
|
|
||||||
chatlist_events::emit_chatlist_item_changed(context, modified_chat_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
if !msg_ids.is_empty() {
|
context
|
||||||
context.emit_msgs_changed_without_ids();
|
.add_sync_item(SyncData::DeleteMessages {
|
||||||
chatlist_events::emit_chatlist_changed(context);
|
msgs: deleted_rfc724_mid,
|
||||||
// Run housekeeping to delete unused blobs.
|
})
|
||||||
context
|
.await?;
|
||||||
.set_config_internal(Config::LastHousekeeping, None)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Interrupt Inbox loop to start message deletion and run housekeeping.
|
// Interrupt Inbox loop to start message deletion, run housekeeping and call send_sync_msg().
|
||||||
context.scheduler.interrupt_inbox().await;
|
context.scheduler.interrupt_inbox().await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use crate::chatlist::Chatlist;
|
|||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::reaction::send_reaction;
|
use crate::reaction::send_reaction;
|
||||||
use crate::receive_imf::receive_imf;
|
use crate::receive_imf::receive_imf;
|
||||||
use crate::test_utils as test;
|
use crate::test_utils;
|
||||||
use crate::test_utils::{TestContext, TestContextManager};
|
use crate::test_utils::{TestContext, TestContextManager};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -106,7 +106,7 @@ async fn test_create_webrtc_instance_noroom() {
|
|||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn test_get_width_height() {
|
async fn test_get_width_height() {
|
||||||
let t = test::TestContext::new().await;
|
let t = TestContext::new().await;
|
||||||
|
|
||||||
// test that get_width() and get_height() are returning some dimensions for images;
|
// test that get_width() and get_height() are returning some dimensions for images;
|
||||||
// (as the device-chat contains a welcome-images, we check that)
|
// (as the device-chat contains a welcome-images, we check that)
|
||||||
@@ -136,7 +136,7 @@ async fn test_get_width_height() {
|
|||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn test_quote() {
|
async fn test_quote() {
|
||||||
let d = test::TestContext::new().await;
|
let d = TestContext::new().await;
|
||||||
let ctx = &d.ctx;
|
let ctx = &d.ctx;
|
||||||
|
|
||||||
ctx.set_config(Config::ConfiguredAddr, Some("self@example.com"))
|
ctx.set_config(Config::ConfiguredAddr, Some("self@example.com"))
|
||||||
@@ -756,6 +756,37 @@ async fn test_delete_msgs_offline() -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_delete_msgs_sync() -> Result<()> {
|
||||||
|
let mut tcm = TestContextManager::new();
|
||||||
|
let alice = &tcm.alice().await;
|
||||||
|
let alice2 = &tcm.alice().await;
|
||||||
|
let bob = &tcm.bob().await;
|
||||||
|
let alice_chat_id = alice.create_chat(bob).await.id;
|
||||||
|
|
||||||
|
alice.set_config_bool(Config::SyncMsgs, true).await?;
|
||||||
|
alice2.set_config_bool(Config::SyncMsgs, true).await?;
|
||||||
|
bob.set_config_bool(Config::SyncMsgs, true).await?;
|
||||||
|
|
||||||
|
// Alice sends a messsage and receives it on the other device
|
||||||
|
let sent1 = alice.send_text(alice_chat_id, "foo").await;
|
||||||
|
assert_eq!(alice_chat_id.get_msg_cnt(alice).await?, 1);
|
||||||
|
|
||||||
|
let msg = alice2.recv_msg(&sent1).await;
|
||||||
|
let alice2_chat_id = msg.chat_id;
|
||||||
|
assert_eq!(alice2.get_last_msg_in(alice2_chat_id).await.id, msg.id);
|
||||||
|
assert_eq!(alice2_chat_id.get_msg_cnt(alice2).await?, 1);
|
||||||
|
|
||||||
|
// Alice deletes the message; this should happen on both devices as well
|
||||||
|
delete_msgs(alice, &[sent1.sender_msg_id]).await?;
|
||||||
|
assert_eq!(alice_chat_id.get_msg_cnt(alice).await?, 0);
|
||||||
|
|
||||||
|
test_utils::sync(alice, alice2).await;
|
||||||
|
assert_eq!(alice2_chat_id.get_msg_cnt(alice2).await?, 0);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn test_sanitize_filename_message() -> Result<()> {
|
async fn test_sanitize_filename_message() -> Result<()> {
|
||||||
let t = &TestContext::new().await;
|
let t = &TestContext::new().await;
|
||||||
|
|||||||
25
src/sync.rs
25
src/sync.rs
@@ -17,6 +17,7 @@ use crate::sync::SyncData::{AddQrToken, AlterChat, DeleteQrToken};
|
|||||||
use crate::token::Namespace;
|
use crate::token::Namespace;
|
||||||
use crate::tools::time;
|
use crate::tools::time;
|
||||||
use crate::{message, stock_str, token};
|
use crate::{message, stock_str, token};
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
/// Whether to send device sync messages. Aimed for usage in the internal API.
|
/// Whether to send device sync messages. Aimed for usage in the internal API.
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
@@ -66,6 +67,9 @@ pub(crate) enum SyncData {
|
|||||||
src: String, // RFC724 id (i.e. "Message-Id" header)
|
src: String, // RFC724 id (i.e. "Message-Id" header)
|
||||||
dest: String, // RFC724 id (i.e. "Message-Id" header)
|
dest: String, // RFC724 id (i.e. "Message-Id" header)
|
||||||
},
|
},
|
||||||
|
DeleteMessages {
|
||||||
|
msgs: Vec<String>, // RFC724 id (i.e. "Message-Id" header)
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
@@ -258,6 +262,7 @@ impl Context {
|
|||||||
AlterChat { id, action } => self.sync_alter_chat(id, action).await,
|
AlterChat { id, action } => self.sync_alter_chat(id, action).await,
|
||||||
SyncData::Config { key, val } => self.sync_config(key, val).await,
|
SyncData::Config { key, val } => self.sync_config(key, val).await,
|
||||||
SyncData::SaveMessage { src, dest } => self.save_message(src, dest).await,
|
SyncData::SaveMessage { src, dest } => self.save_message(src, dest).await,
|
||||||
|
SyncData::DeleteMessages { msgs } => self.sync_message_deletion(msgs).await,
|
||||||
},
|
},
|
||||||
SyncDataOrUnknown::Unknown(data) => {
|
SyncDataOrUnknown::Unknown(data) => {
|
||||||
warn!(self, "Ignored unknown sync item: {data}.");
|
warn!(self, "Ignored unknown sync item: {data}.");
|
||||||
@@ -297,6 +302,26 @@ impl Context {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn sync_message_deletion(&self, msgs: &Vec<String>) -> Result<()> {
|
||||||
|
let mut modified_chat_ids = HashSet::new();
|
||||||
|
let mut msg_ids = Vec::new();
|
||||||
|
for rfc724_mid in msgs {
|
||||||
|
if let Some((msg_id, _)) = message::rfc724_mid_exists(self, rfc724_mid).await? {
|
||||||
|
if let Some(msg) = Message::load_from_db_optional(self, msg_id).await? {
|
||||||
|
message::delete_msg_locally(self, &msg).await?;
|
||||||
|
msg_ids.push(msg.id);
|
||||||
|
modified_chat_ids.insert(msg.chat_id);
|
||||||
|
} else {
|
||||||
|
warn!(self, "Sync message delete: Database entry does not exist.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
warn!(self, "Sync message delete: {rfc724_mid:?} not found.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
message::delete_msgs_locally_done(self, &msg_ids, modified_chat_ids).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user