feat: Resend the last 10 messages to new broadcast member (#8151)

Last 10 messages in a broadcast channel are resent. They are sent and encrypted only to the new member, not to other subscribers.

Close #7678

Based on https://github.com/chatmail/core/pull/7854, with the following
changes:

- Refactor and simplify code, don't reuse the existing `get_chat_msgs()`
function
- Document that Param::Arg4 is also used for resent messages
cc818d9099
- It's unclear how exactly to resend webxdc status updates. After
discussing with @r10s, don't resend webxdc's at all for now.
38d57ebb30
- Don't set fake `msg_id` in resent messages
e7d0687d90
Setting the msg_id to `u32::MAX` is hacky, and may just as well break
    things as it may fix things, because some code may use the msg.id to
    load information from the database, like `get_iroh_topic_for_msg()`.
    From reading the code, I couldn't find any problem with leaving the
    correct `msg_id`, and if there is one, then we should add a function
parameter `is_resending` that is checked in the corresponding places.

Easiest to review file-by-file rather than individual commits, probably.
I'll squash-merge this.

---------

Co-authored-by: iequidoo <dgreshilov@gmail.com>
This commit is contained in:
Hocuri
2026-04-21 22:34:53 +02:00
committed by GitHub
parent 83e31a5f17
commit 970222f376
6 changed files with 170 additions and 36 deletions

View File

@@ -23,8 +23,9 @@ use crate::chatlist_events;
use crate::color::str_to_color;
use crate::config::Config;
use crate::constants::{
Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, DC_CHAT_ID_LAST_SPECIAL,
DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX, TIMESTAMP_SENT_TOLERANCE,
self, Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK,
DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX,
TIMESTAMP_SENT_TOLERANCE,
};
use crate::contact::{self, Contact, ContactId, Origin};
use crate::context::Context;
@@ -34,7 +35,7 @@ use crate::download::{
};
use crate::ephemeral::{Timer as EphemeralTimer, start_chat_ephemeral_timers};
use crate::events::EventType;
use crate::key::self_fingerprint;
use crate::key::{Fingerprint, self_fingerprint};
use crate::location;
use crate::log::{LogExt, warn};
use crate::logged_debug_assert;
@@ -3125,7 +3126,8 @@ pub async fn get_chat_msgs(context: &Context, chat_id: ChatId) -> Result<Vec<Cha
.await
}
/// Returns messages belonging to the chat according to the given options.
/// Returns messages belonging to the chat according to the given options,
/// sorted by oldest message first.
#[expect(clippy::arithmetic_side_effects)]
pub async fn get_chat_msgs_ex(
context: &Context,
@@ -3136,6 +3138,7 @@ pub async fn get_chat_msgs_ex(
info_only,
add_daymarker,
} = options;
// TODO: Remove `info_only` parameter; it's not used by anything
let process_row = if info_only {
|row: &rusqlite::Row| {
// is_info logic taken from Message.is_info()
@@ -4009,9 +4012,47 @@ pub(crate) async fn add_contact_to_chat_ex(
if sync.into() {
chat.sync_contacts(context).await.log_err(context).ok();
}
if chat.typ == Chattype::OutBroadcast {
resend_last_msgs(context, chat.id, &contact)
.await
.log_err(context)
.ok();
}
Ok(true)
}
async fn resend_last_msgs(context: &Context, chat_id: ChatId, to_contact: &Contact) -> Result<()> {
let msgs: Vec<MsgId> = context
.sql
.query_map_vec(
"
SELECT id
FROM msgs
WHERE chat_id=?
AND hidden=0
AND NOT ( -- Exclude info and system messages
param GLOB '*\nS=*' OR param GLOB 'S=*'
OR from_id=?
OR to_id=?
)
AND type!=?
ORDER BY timestamp DESC, id DESC LIMIT ?",
(
chat_id,
ContactId::INFO,
ContactId::INFO,
Viewtype::Webxdc,
constants::N_MSGS_TO_NEW_BROADCAST_MEMBER,
),
|row: &rusqlite::Row| Ok(row.get::<_, MsgId>(0)?),
)
.await?
.into_iter()
.rev()
.collect();
resend_msgs_ex(context, &msgs, to_contact.fingerprint()).await
}
/// Returns true if an avatar should be attached in the given chat.
///
/// This function does not check if the avatar is set.
@@ -4675,10 +4716,26 @@ pub(crate) async fn save_copy_in_self_talk(
Ok(msg.rfc724_mid)
}
/// Resends given messages with the same Message-ID.
/// Resends given messages to members of the corresponding chats.
///
/// This is primarily intended to make existing webxdcs available to new chat members.
pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
resend_msgs_ex(context, msg_ids, None).await
}
/// Resends given messages to a contact with fingerprint `to_fingerprint` or, if it's `None`, to
/// members of the corresponding chats.
///
/// NB: Actually `to_fingerprint` is only passed for `OutBroadcast` chats when a new member is
/// added. Regarding webxdcs: It is not trivial to resend only the own status updates,
/// and it is not trivial to resend them only to the newly-joined member,
/// so that for now, [`resend_last_msgs`] does not automatically resend webxdcs at all.
pub(crate) async fn resend_msgs_ex(
context: &Context,
msg_ids: &[MsgId],
to_fingerprint: Option<Fingerprint>,
) -> Result<()> {
let to_fingerprint = to_fingerprint.map(|f| f.hex());
let mut msgs: Vec<Message> = Vec::new();
for msg_id in msg_ids {
let msg = Message::load_from_db(context, *msg_id).await?;
@@ -4697,10 +4754,17 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
| MessageState::OutFailed
| MessageState::OutDelivered
| MessageState::OutMdnRcvd => {
message::update_msg_state(context, msg.id, MessageState::OutPending).await?
// Broadcast owners shouldn't see spinners on messages being auto-re-sent to new
// subscribers (otherwise big channel owners will see spinners most of the time).
if to_fingerprint.is_none() {
message::update_msg_state(context, msg.id, MessageState::OutPending).await?;
}
}
msg_state => bail!("Unexpected message state {msg_state}"),
}
if let Some(to_fingerprint) = &to_fingerprint {
msg.param.set(Param::Arg4, to_fingerprint.clone());
}
if create_send_msg_jobs(context, &mut msg).await?.is_empty() {
continue;
}
@@ -4712,7 +4776,8 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
chat_id: msg.chat_id,
msg_id: msg.id,
});
// note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it)
// The event only matters if the message is last in the chat.
// But it's probably too expensive check, and UIs anyways need to debounce.
chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
if msg.viewtype == Viewtype::Webxdc {

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use super::*;
use crate::Event;
use crate::chatlist::get_archived_cnt;
use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS};
use crate::constants::{DC_GCL_ARCHIVED_ONLY, DC_GCL_NO_SPECIALS, N_MSGS_TO_NEW_BROADCAST_MEMBER};
use crate::ephemeral::Timer;
use crate::headerdef::HeaderDef;
use crate::imex::{ImexMode, has_backup, imex};
@@ -2956,6 +2956,56 @@ async fn test_broadcast_change_name() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_broadcast_resend_to_new_member() -> Result<()> {
let mut tcm = TestContextManager::new();
let alice = &tcm.alice().await;
let bob = &tcm.bob().await;
let fiona = &tcm.fiona().await;
let alice_bc_id = create_broadcast(alice, "Channel".to_string()).await?;
let qr = get_securejoin_qr(alice, Some(alice_bc_id)).await.unwrap();
tcm.exec_securejoin_qr(bob, alice, &qr).await;
let mut alice_msg_ids = Vec::new();
for i in 0..(N_MSGS_TO_NEW_BROADCAST_MEMBER + 1) {
alice_msg_ids.push(
alice
.send_text(alice_bc_id, &i.to_string())
.await
.sender_msg_id,
);
}
let fiona_bc_id = tcm.exec_securejoin_qr(fiona, alice, &qr).await;
for msg_id in alice_msg_ids {
assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered);
}
for i in 0..N_MSGS_TO_NEW_BROADCAST_MEMBER {
let rev_order = false;
let resent_msg = alice
.pop_sent_msg_ex(rev_order, Duration::ZERO)
.await
.unwrap();
let fiona_msg = fiona.recv_msg(&resent_msg).await;
assert_eq!(fiona_msg.chat_id, fiona_bc_id);
assert_eq!(fiona_msg.text, (i + 1).to_string());
assert!(resent_msg.recipients.contains("fiona@example.net"));
assert!(!resent_msg.recipients.contains("bob@"));
// The message is undecryptable for Bob, he mustn't be able to know yet that somebody joined
// the broadcast even if he is a postman in this land. E.g. Fiona may leave after fetching
// the news, Bob won't know about that.
assert!(
MimeMessage::from_bytes(bob, resent_msg.payload().as_bytes())
.await?
.decryption_error
.is_some()
);
bob.recv_msg_trash(&resent_msg).await;
}
assert!(alice.pop_sent_msg_opt(Duration::ZERO).await.is_none());
Ok(())
}
/// - Alice has multiple devices
/// - Alice creates a broadcast and sends a message into it
/// - Alice's second device sees the broadcast

View File

@@ -244,6 +244,9 @@ Here is what to do:
If you have any questions, please send an email to delta@merlinux.eu or ask at https://support.delta.chat/."#;
/// How many recent messages should be re-sent to a new broadcast member.
pub(crate) const N_MSGS_TO_NEW_BROADCAST_MEMBER: usize = 10;
#[cfg(test)]
mod tests {
use num_traits::FromPrimitive;

View File

@@ -194,6 +194,7 @@ fn new_address_with_name(name: &str, address: String) -> Address<'static> {
}
impl MimeFactory {
/// Returns `MimeFactory` for rendering `msg`.
#[expect(clippy::arithmetic_side_effects)]
pub async fn from_msg(context: &Context, msg: Message) -> Result<MimeFactory> {
let now = time();
@@ -2226,18 +2227,18 @@ fn should_encrypt_symmetrically(msg: &Message, chat: &Chat) -> bool {
/// rather than all recipients.
/// This function returns the fingerprint of the recipient the message should be sent to.
fn must_have_only_one_recipient<'a>(msg: &'a Message, chat: &Chat) -> Option<Result<&'a str>> {
if chat.typ == Chattype::OutBroadcast
&& matches!(
if chat.typ != Chattype::OutBroadcast {
None
} else if let Some(fp) = msg.param.get(Param::Arg4) {
Some(Ok(fp))
} else if matches!(
msg.param.get_cmd(),
SystemMessage::MemberRemovedFromGroup | SystemMessage::MemberAddedToGroup
)
{
let Some(fp) = msg.param.get(Param::Arg4) else {
return Some(Err(format_err!("Missing removed/added member")));
};
return Some(Ok(fp));
}
) {
Some(Err(format_err!("Missing removed/added member")))
} else {
None
}
}
async fn build_body_file(context: &Context, msg: &Message) -> Result<MimePart<'static>> {

View File

@@ -136,6 +136,10 @@ pub enum Param {
/// For "MemberAddedToGroup" and "MemberRemovedFromGroup",
/// this is the fingerprint added to / removed from the group.
///
/// For messages resent when adding a new member to a broadcast channel,
/// this is the fingerprint of the added member;
/// the message must only be sent to this one member then.
///
/// For call messages, this is the end timsetamp.
Arg4 = b'H',

View File

@@ -275,16 +275,17 @@ impl TestContextManager {
let chat_id = join_securejoin(&joiner.ctx, qr).await.unwrap();
loop {
for _ in 0..2 {
let mut something_sent = false;
if let Some(sent) = joiner.pop_sent_msg_opt(Duration::ZERO).await {
let rev_order = false;
if let Some(sent) = joiner.pop_sent_msg_ex(rev_order, Duration::ZERO).await {
for inviter in inviters {
inviter.recv_msg_opt(&sent).await;
}
something_sent = true;
}
for inviter in inviters {
if let Some(sent) = inviter.pop_sent_msg_opt(Duration::ZERO).await {
if let Some(sent) = inviter.pop_sent_msg_ex(rev_order, Duration::ZERO).await {
joiner.recv_msg_opt(&sent).await;
something_sent = true;
}
@@ -623,25 +624,35 @@ impl TestContext {
}
pub async fn pop_sent_msg_opt(&self, timeout: Duration) -> Option<SentMessage<'_>> {
let rev_order = true;
self.pop_sent_msg_ex(rev_order, timeout).await
}
pub async fn pop_sent_msg_ex(
&self,
rev_order: bool,
timeout: Duration,
) -> Option<SentMessage<'_>> {
let start = Instant::now();
let mut query = "
SELECT id, msg_id, mime, recipients
FROM smtp
ORDER BY id"
.to_string();
if rev_order {
query += " DESC";
}
let (rowid, msg_id, payload, recipients) = loop {
let row = self
.ctx
.sql
.query_row_optional(
r#"
SELECT id, msg_id, mime, recipients
FROM smtp
ORDER BY id DESC"#,
(),
|row| {
.query_row_optional(&query, (), |row| {
let rowid: i64 = row.get(0)?;
let msg_id: MsgId = row.get(1)?;
let mime: String = row.get(2)?;
let recipients: String = row.get(3)?;
Ok((rowid, msg_id, mime, recipients))
},
)
})
.await
.expect("query_row_optional failed");
if let Some(row) = row {