From 970222f376734a23e221cd67a9a46880df95d9e1 Mon Sep 17 00:00:00 2001 From: Hocuri Date: Tue, 21 Apr 2026 22:34:53 +0200 Subject: [PATCH] feat: Resend the last 10 messages to new broadcast member (#8151) Last 10 messages in a broadcast channel are resent. They are sent and encrypted only to the new member, not to other subscribers. Close #7678 Based on https://github.com/chatmail/core/pull/7854, with the following changes: - Refactor and simplify code, don't reuse the existing `get_chat_msgs()` function - Document that Param::Arg4 is also used for resent messages cc818d9099bc747247250b614f1d61f83b236f3f - It's unclear how exactly to resend webxdc status updates. After discussing with @r10s, don't resend webxdc's at all for now. 38d57ebb30be92b8b422f685a66f8e727bd8951d - Don't set fake `msg_id` in resent messages e7d0687d907536bb9a46278cb4fd121c86faa2e4 Setting the msg_id to `u32::MAX` is hacky, and may just as well break things as it may fix things, because some code may use the msg.id to load information from the database, like `get_iroh_topic_for_msg()`. From reading the code, I couldn't find any problem with leaving the correct `msg_id`, and if there is one, then we should add a function parameter `is_resending` that is checked in the corresponding places. Easiest to review file-by-file rather than individual commits, probably. I'll squash-merge this. --------- Co-authored-by: iequidoo --- src/chat.rs | 79 ++++++++++++++++++++++++++++++++++++++---- src/chat/chat_tests.rs | 52 ++++++++++++++++++++++++++- src/constants.rs | 3 ++ src/mimefactory.rs | 23 ++++++------ src/param.rs | 4 +++ src/test_utils.rs | 45 +++++++++++++++--------- 6 files changed, 170 insertions(+), 36 deletions(-) diff --git a/src/chat.rs b/src/chat.rs index 541e25dd5..966a041ca 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -23,8 +23,9 @@ use crate::chatlist_events; use crate::color::str_to_color; use crate::config::Config; use crate::constants::{ - Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, DC_CHAT_ID_LAST_SPECIAL, - DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX, TIMESTAMP_SENT_TOLERANCE, + self, Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, + DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX, + TIMESTAMP_SENT_TOLERANCE, }; use crate::contact::{self, Contact, ContactId, Origin}; use crate::context::Context; @@ -34,7 +35,7 @@ use crate::download::{ }; use crate::ephemeral::{Timer as EphemeralTimer, start_chat_ephemeral_timers}; use crate::events::EventType; -use crate::key::self_fingerprint; +use crate::key::{Fingerprint, self_fingerprint}; use crate::location; use crate::log::{LogExt, warn}; use crate::logged_debug_assert; @@ -3125,7 +3126,8 @@ pub async fn get_chat_msgs(context: &Context, chat_id: ChatId) -> Result Result<()> { + let msgs: Vec = context + .sql + .query_map_vec( + " +SELECT id +FROM msgs +WHERE chat_id=? + AND hidden=0 + AND NOT ( -- Exclude info and system messages + param GLOB '*\nS=*' OR param GLOB 'S=*' + OR from_id=? + OR to_id=? + ) + AND type!=? +ORDER BY timestamp DESC, id DESC LIMIT ?", + ( + chat_id, + ContactId::INFO, + ContactId::INFO, + Viewtype::Webxdc, + constants::N_MSGS_TO_NEW_BROADCAST_MEMBER, + ), + |row: &rusqlite::Row| Ok(row.get::<_, MsgId>(0)?), + ) + .await? + .into_iter() + .rev() + .collect(); + resend_msgs_ex(context, &msgs, to_contact.fingerprint()).await +} + /// Returns true if an avatar should be attached in the given chat. /// /// This function does not check if the avatar is set. @@ -4675,10 +4716,26 @@ pub(crate) async fn save_copy_in_self_talk( Ok(msg.rfc724_mid) } -/// Resends given messages with the same Message-ID. +/// Resends given messages to members of the corresponding chats. /// /// This is primarily intended to make existing webxdcs available to new chat members. pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { + resend_msgs_ex(context, msg_ids, None).await +} + +/// Resends given messages to a contact with fingerprint `to_fingerprint` or, if it's `None`, to +/// members of the corresponding chats. +/// +/// NB: Actually `to_fingerprint` is only passed for `OutBroadcast` chats when a new member is +/// added. Regarding webxdcs: It is not trivial to resend only the own status updates, +/// and it is not trivial to resend them only to the newly-joined member, +/// so that for now, [`resend_last_msgs`] does not automatically resend webxdcs at all. +pub(crate) async fn resend_msgs_ex( + context: &Context, + msg_ids: &[MsgId], + to_fingerprint: Option, +) -> Result<()> { + let to_fingerprint = to_fingerprint.map(|f| f.hex()); let mut msgs: Vec = Vec::new(); for msg_id in msg_ids { let msg = Message::load_from_db(context, *msg_id).await?; @@ -4697,10 +4754,17 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { | MessageState::OutFailed | MessageState::OutDelivered | MessageState::OutMdnRcvd => { - message::update_msg_state(context, msg.id, MessageState::OutPending).await? + // Broadcast owners shouldn't see spinners on messages being auto-re-sent to new + // subscribers (otherwise big channel owners will see spinners most of the time). + if to_fingerprint.is_none() { + message::update_msg_state(context, msg.id, MessageState::OutPending).await?; + } } msg_state => bail!("Unexpected message state {msg_state}"), } + if let Some(to_fingerprint) = &to_fingerprint { + msg.param.set(Param::Arg4, to_fingerprint.clone()); + } if create_send_msg_jobs(context, &mut msg).await?.is_empty() { continue; } @@ -4712,7 +4776,8 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { chat_id: msg.chat_id, msg_id: msg.id, }); - // note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it) + // The event only matters if the message is last in the chat. + // But it's probably too expensive check, and UIs anyways need to debounce. chatlist_events::emit_chatlist_item_changed(context, msg.chat_id); if msg.viewtype == Viewtype::Webxdc { diff --git a/src/chat/chat_tests.rs b/src/chat/chat_tests.rs index 4d0f3f3ec..b36f97490 100644 --- a/src/chat/chat_tests.rs +++ b/src/chat/chat_tests.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use super::*; use crate::Event; use crate::chatlist::get_archived_cnt; -use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS}; +use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS, N_MSGS_TO_NEW_BROADCAST_MEMBER}; use crate::ephemeral::Timer; use crate::headerdef::HeaderDef; use crate::imex::{ImexMode, has_backup, imex}; @@ -2956,6 +2956,56 @@ async fn test_broadcast_change_name() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_broadcast_resend_to_new_member() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = &tcm.alice().await; + let bob = &tcm.bob().await; + let fiona = &tcm.fiona().await; + + let alice_bc_id = create_broadcast(alice, "Channel".to_string()).await?; + let qr = get_securejoin_qr(alice, Some(alice_bc_id)).await.unwrap(); + + tcm.exec_securejoin_qr(bob, alice, &qr).await; + let mut alice_msg_ids = Vec::new(); + for i in 0..(N_MSGS_TO_NEW_BROADCAST_MEMBER + 1) { + alice_msg_ids.push( + alice + .send_text(alice_bc_id, &i.to_string()) + .await + .sender_msg_id, + ); + } + let fiona_bc_id = tcm.exec_securejoin_qr(fiona, alice, &qr).await; + for msg_id in alice_msg_ids { + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered); + } + for i in 0..N_MSGS_TO_NEW_BROADCAST_MEMBER { + let rev_order = false; + let resent_msg = alice + .pop_sent_msg_ex(rev_order, Duration::ZERO) + .await + .unwrap(); + let fiona_msg = fiona.recv_msg(&resent_msg).await; + assert_eq!(fiona_msg.chat_id, fiona_bc_id); + assert_eq!(fiona_msg.text, (i + 1).to_string()); + assert!(resent_msg.recipients.contains("fiona@example.net")); + assert!(!resent_msg.recipients.contains("bob@")); + // The message is undecryptable for Bob, he mustn't be able to know yet that somebody joined + // the broadcast even if he is a postman in this land. E.g. Fiona may leave after fetching + // the news, Bob won't know about that. + assert!( + MimeMessage::from_bytes(bob, resent_msg.payload().as_bytes()) + .await? + .decryption_error + .is_some() + ); + bob.recv_msg_trash(&resent_msg).await; + } + assert!(alice.pop_sent_msg_opt(Duration::ZERO).await.is_none()); + Ok(()) +} + /// - Alice has multiple devices /// - Alice creates a broadcast and sends a message into it /// - Alice's second device sees the broadcast diff --git a/src/constants.rs b/src/constants.rs index fe6d11c56..fe3a50028 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -244,6 +244,9 @@ Here is what to do: If you have any questions, please send an email to delta@merlinux.eu or ask at https://support.delta.chat/."#; +/// How many recent messages should be re-sent to a new broadcast member. +pub(crate) const N_MSGS_TO_NEW_BROADCAST_MEMBER: usize = 10; + #[cfg(test)] mod tests { use num_traits::FromPrimitive; diff --git a/src/mimefactory.rs b/src/mimefactory.rs index cbb33d956..3a07a750e 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -194,6 +194,7 @@ fn new_address_with_name(name: &str, address: String) -> Address<'static> { } impl MimeFactory { + /// Returns `MimeFactory` for rendering `msg`. #[expect(clippy::arithmetic_side_effects)] pub async fn from_msg(context: &Context, msg: Message) -> Result { let now = time(); @@ -2226,18 +2227,18 @@ fn should_encrypt_symmetrically(msg: &Message, chat: &Chat) -> bool { /// rather than all recipients. /// This function returns the fingerprint of the recipient the message should be sent to. fn must_have_only_one_recipient<'a>(msg: &'a Message, chat: &Chat) -> Option> { - if chat.typ == Chattype::OutBroadcast - && matches!( - msg.param.get_cmd(), - SystemMessage::MemberRemovedFromGroup | SystemMessage::MemberAddedToGroup - ) - { - let Some(fp) = msg.param.get(Param::Arg4) else { - return Some(Err(format_err!("Missing removed/added member"))); - }; - return Some(Ok(fp)); + if chat.typ != Chattype::OutBroadcast { + None + } else if let Some(fp) = msg.param.get(Param::Arg4) { + Some(Ok(fp)) + } else if matches!( + msg.param.get_cmd(), + SystemMessage::MemberRemovedFromGroup | SystemMessage::MemberAddedToGroup + ) { + Some(Err(format_err!("Missing removed/added member"))) + } else { + None } - None } async fn build_body_file(context: &Context, msg: &Message) -> Result> { diff --git a/src/param.rs b/src/param.rs index dfd48d31f..087bba93e 100644 --- a/src/param.rs +++ b/src/param.rs @@ -136,6 +136,10 @@ pub enum Param { /// For "MemberAddedToGroup" and "MemberRemovedFromGroup", /// this is the fingerprint added to / removed from the group. /// + /// For messages resent when adding a new member to a broadcast channel, + /// this is the fingerprint of the added member; + /// the message must only be sent to this one member then. + /// /// For call messages, this is the end timsetamp. Arg4 = b'H', diff --git a/src/test_utils.rs b/src/test_utils.rs index 1329dcc51..d946d7a7c 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -275,16 +275,17 @@ impl TestContextManager { let chat_id = join_securejoin(&joiner.ctx, qr).await.unwrap(); - loop { + for _ in 0..2 { let mut something_sent = false; - if let Some(sent) = joiner.pop_sent_msg_opt(Duration::ZERO).await { + let rev_order = false; + if let Some(sent) = joiner.pop_sent_msg_ex(rev_order, Duration::ZERO).await { for inviter in inviters { inviter.recv_msg_opt(&sent).await; } something_sent = true; } for inviter in inviters { - if let Some(sent) = inviter.pop_sent_msg_opt(Duration::ZERO).await { + if let Some(sent) = inviter.pop_sent_msg_ex(rev_order, Duration::ZERO).await { joiner.recv_msg_opt(&sent).await; something_sent = true; } @@ -623,25 +624,35 @@ impl TestContext { } pub async fn pop_sent_msg_opt(&self, timeout: Duration) -> Option> { + let rev_order = true; + self.pop_sent_msg_ex(rev_order, timeout).await + } + + pub async fn pop_sent_msg_ex( + &self, + rev_order: bool, + timeout: Duration, + ) -> Option> { let start = Instant::now(); + let mut query = " +SELECT id, msg_id, mime, recipients +FROM smtp +ORDER BY id" + .to_string(); + if rev_order { + query += " DESC"; + } let (rowid, msg_id, payload, recipients) = loop { let row = self .ctx .sql - .query_row_optional( - r#" - SELECT id, msg_id, mime, recipients - FROM smtp - ORDER BY id DESC"#, - (), - |row| { - let rowid: i64 = row.get(0)?; - let msg_id: MsgId = row.get(1)?; - let mime: String = row.get(2)?; - let recipients: String = row.get(3)?; - Ok((rowid, msg_id, mime, recipients)) - }, - ) + .query_row_optional(&query, (), |row| { + let rowid: i64 = row.get(0)?; + let msg_id: MsgId = row.get(1)?; + let mime: String = row.get(2)?; + let recipients: String = row.get(3)?; + Ok((rowid, msg_id, mime, recipients)) + }) .await .expect("query_row_optional failed"); if let Some(row) = row {