mirror of
https://github.com/chatmail/core.git
synced 2026-04-25 17:36:30 +03:00
feat: Resend the last 10 messages to new broadcast member (#7678)
Messages are sent and encrypted only to the new member. This way we at least postpone spreading the information that the new member joined: even if the server operator is a broadcast member, they can't know that immediately.
This commit is contained in:
200
src/chat.rs
200
src/chat.rs
@@ -23,8 +23,9 @@ use crate::chatlist_events;
|
||||
use crate::color::str_to_color;
|
||||
use crate::config::Config;
|
||||
use crate::constants::{
|
||||
Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, DC_CHAT_ID_LAST_SPECIAL,
|
||||
DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX, TIMESTAMP_SENT_TOLERANCE,
|
||||
self, Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK,
|
||||
DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, EDITED_PREFIX,
|
||||
TIMESTAMP_SENT_TOLERANCE,
|
||||
};
|
||||
use crate::contact::{self, Contact, ContactId, Origin};
|
||||
use crate::context::Context;
|
||||
@@ -34,7 +35,7 @@ use crate::download::{
|
||||
};
|
||||
use crate::ephemeral::{Timer as EphemeralTimer, start_chat_ephemeral_timers};
|
||||
use crate::events::EventType;
|
||||
use crate::key::self_fingerprint;
|
||||
use crate::key::{Fingerprint, self_fingerprint};
|
||||
use crate::location;
|
||||
use crate::log::{LogExt, warn};
|
||||
use crate::logged_debug_assert;
|
||||
@@ -2753,7 +2754,7 @@ async fn prepare_send_msg(
|
||||
}
|
||||
chat.prepare_msg_raw(context, msg, update_msg_id).await?;
|
||||
|
||||
let row_ids = create_send_msg_jobs(context, msg)
|
||||
let row_ids = create_send_msg_jobs(context, msg, None)
|
||||
.await
|
||||
.context("Failed to create send jobs")?;
|
||||
if !row_ids.is_empty() {
|
||||
@@ -2823,7 +2824,14 @@ async fn render_mime_message_and_pre_message(
|
||||
/// Returns row ids if `smtp` table jobs were created or an empty `Vec` otherwise.
|
||||
///
|
||||
/// The caller has to interrupt SMTP loop or otherwise process new rows.
|
||||
pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result<Vec<i64>> {
|
||||
///
|
||||
/// * `row_id` - Actual Message ID, if `Some`. This is to avoid updating the `msgs` row, in which
|
||||
/// case `msg.id` is fake (`u32::MAX`);
|
||||
pub(crate) async fn create_send_msg_jobs(
|
||||
context: &Context,
|
||||
msg: &mut Message,
|
||||
row_id: Option<MsgId>,
|
||||
) -> Result<Vec<i64>> {
|
||||
let cmd = msg.param.get_cmd();
|
||||
if cmd == SystemMessage::GroupNameChanged || cmd == SystemMessage::GroupDescriptionChanged {
|
||||
msg.chat_id
|
||||
@@ -2840,7 +2848,7 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
|
||||
}
|
||||
|
||||
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
|
||||
let mimefactory = match MimeFactory::from_msg(context, msg.clone()).await {
|
||||
let mimefactory = match MimeFactory::from_msg(context, msg.clone(), row_id).await {
|
||||
Ok(mf) => mf,
|
||||
Err(err) => {
|
||||
// Mark message as failed
|
||||
@@ -3102,42 +3110,61 @@ async fn donation_request_maybe(context: &Context) -> Result<()> {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Chat message list request options.
|
||||
#[derive(Debug)]
|
||||
pub struct MessageListOptions {
|
||||
/// Return only info messages.
|
||||
pub info_only: bool,
|
||||
/// Controls which messages [`get_chat_msgs_ex`] returns.
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
pub enum ChatMsgsFilter {
|
||||
/// All messages.
|
||||
#[default]
|
||||
All,
|
||||
/// Info messages.
|
||||
Info,
|
||||
/// Non-info non-system messages.
|
||||
NonInfoNonSystem,
|
||||
}
|
||||
|
||||
impl ChatMsgsFilter {
|
||||
/// Returns filter capturing only info messages or all messages.
|
||||
pub fn info_only(arg: bool) -> Self {
|
||||
match arg {
|
||||
true => Self::Info,
|
||||
false => Self::All,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// [`get_chat_msgs_ex`] options.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct GetChatMsgsOptions {
|
||||
/// Which messages to return.
|
||||
pub filter: ChatMsgsFilter,
|
||||
|
||||
/// Add day markers before each date regarding the local timezone.
|
||||
pub add_daymarker: bool,
|
||||
|
||||
/// If `Some(n)`, return up to `n` last (by timestamp) messages.
|
||||
pub n_last: Option<usize>,
|
||||
}
|
||||
|
||||
/// Returns all messages belonging to the chat.
|
||||
pub async fn get_chat_msgs(context: &Context, chat_id: ChatId) -> Result<Vec<ChatItem>> {
|
||||
get_chat_msgs_ex(
|
||||
context,
|
||||
chat_id,
|
||||
MessageListOptions {
|
||||
info_only: false,
|
||||
add_daymarker: false,
|
||||
},
|
||||
)
|
||||
.await
|
||||
get_chat_msgs_ex(context, chat_id, Default::default()).await
|
||||
}
|
||||
|
||||
/// Returns messages belonging to the chat according to the given options.
|
||||
/// Older messages go first.
|
||||
#[expect(clippy::arithmetic_side_effects)]
|
||||
pub async fn get_chat_msgs_ex(
|
||||
context: &Context,
|
||||
chat_id: ChatId,
|
||||
options: MessageListOptions,
|
||||
options: GetChatMsgsOptions,
|
||||
) -> Result<Vec<ChatItem>> {
|
||||
let MessageListOptions {
|
||||
info_only,
|
||||
let GetChatMsgsOptions {
|
||||
filter,
|
||||
add_daymarker,
|
||||
n_last,
|
||||
} = options;
|
||||
let process_row = if info_only {
|
||||
|row: &rusqlite::Row| {
|
||||
let process_row = |row: &rusqlite::Row| {
|
||||
if filter != ChatMsgsFilter::All {
|
||||
// is_info logic taken from Message.is_info()
|
||||
let params = row.get::<_, String>("param")?;
|
||||
let (from_id, to_id) = (
|
||||
@@ -3157,15 +3184,13 @@ pub async fn get_chat_msgs_ex(
|
||||
Ok((
|
||||
row.get::<_, i64>("timestamp")?,
|
||||
row.get::<_, MsgId>("id")?,
|
||||
!is_info_msg,
|
||||
is_info_msg == (filter == ChatMsgsFilter::Info),
|
||||
))
|
||||
}
|
||||
} else {
|
||||
|row: &rusqlite::Row| {
|
||||
} else {
|
||||
Ok((
|
||||
row.get::<_, i64>("timestamp")?,
|
||||
row.get::<_, MsgId>("id")?,
|
||||
false,
|
||||
true,
|
||||
))
|
||||
}
|
||||
};
|
||||
@@ -3174,8 +3199,8 @@ pub async fn get_chat_msgs_ex(
|
||||
// let sqlite execute an ORDER BY clause.
|
||||
let mut sorted_rows = Vec::new();
|
||||
for row in rows {
|
||||
let (ts, curr_id, exclude_message): (i64, MsgId, bool) = row?;
|
||||
if !exclude_message {
|
||||
let (ts, curr_id, include): (i64, MsgId, bool) = row?;
|
||||
if include {
|
||||
sorted_rows.push((ts, curr_id));
|
||||
}
|
||||
}
|
||||
@@ -3202,21 +3227,27 @@ pub async fn get_chat_msgs_ex(
|
||||
Ok(ret)
|
||||
};
|
||||
|
||||
let items = if info_only {
|
||||
let n_last_subst = match n_last {
|
||||
Some(n) => format!("ORDER BY timestamp DESC, id DESC LIMIT {n}"),
|
||||
None => "".to_string(),
|
||||
};
|
||||
let items = if filter != ChatMsgsFilter::All {
|
||||
context
|
||||
.sql
|
||||
.query_map(
|
||||
// GLOB is used here instead of LIKE because it is case-sensitive
|
||||
"SELECT m.id AS id, m.timestamp AS timestamp, m.param AS param, m.from_id AS from_id, m.to_id AS to_id
|
||||
FROM msgs m
|
||||
WHERE m.chat_id=?
|
||||
AND m.hidden=0
|
||||
AND (
|
||||
m.param GLOB '*\nS=*' OR param GLOB 'S=*'
|
||||
OR m.from_id == ?
|
||||
OR m.to_id == ?
|
||||
);",
|
||||
(chat_id, ContactId::INFO, ContactId::INFO),
|
||||
&format!("
|
||||
SELECT m.id AS id, m.timestamp AS timestamp, m.param AS param, m.from_id AS from_id, m.to_id AS to_id
|
||||
FROM msgs m
|
||||
WHERE m.chat_id=?
|
||||
AND m.hidden=0
|
||||
AND ?=(
|
||||
m.param GLOB '*\nS=*' OR param GLOB 'S=*'
|
||||
OR m.from_id == ?
|
||||
OR m.to_id == ?
|
||||
)
|
||||
{n_last_subst}"
|
||||
),
|
||||
(chat_id, filter == ChatMsgsFilter::Info, ContactId::INFO, ContactId::INFO),
|
||||
process_row,
|
||||
process_rows,
|
||||
)
|
||||
@@ -3225,10 +3256,14 @@ pub async fn get_chat_msgs_ex(
|
||||
context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT m.id AS id, m.timestamp AS timestamp
|
||||
FROM msgs m
|
||||
WHERE m.chat_id=?
|
||||
AND m.hidden=0;",
|
||||
&format!(
|
||||
"
|
||||
SELECT m.id AS id, m.timestamp AS timestamp
|
||||
FROM msgs m
|
||||
WHERE m.chat_id=?
|
||||
AND m.hidden=0
|
||||
{n_last_subst}"
|
||||
),
|
||||
(chat_id,),
|
||||
process_row,
|
||||
process_rows,
|
||||
@@ -4009,6 +4044,29 @@ pub(crate) async fn add_contact_to_chat_ex(
|
||||
if sync.into() {
|
||||
chat.sync_contacts(context).await.log_err(context).ok();
|
||||
}
|
||||
let resend_last_msgs = || async {
|
||||
let items = get_chat_msgs_ex(
|
||||
context,
|
||||
chat.id,
|
||||
GetChatMsgsOptions {
|
||||
filter: ChatMsgsFilter::NonInfoNonSystem,
|
||||
n_last: Some(constants::N_MSGS_TO_NEW_BROADCAST_MEMBER),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let msgs: Vec<_> = items
|
||||
.into_iter()
|
||||
.filter_map(|i| match i {
|
||||
ChatItem::Message { msg_id } => Some(msg_id),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
resend_msgs_ex(context, &msgs, contact.fingerprint()).await
|
||||
};
|
||||
if chat.typ == Chattype::OutBroadcast {
|
||||
resend_last_msgs().await.log_err(context).ok();
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@@ -4566,7 +4624,10 @@ pub async fn forward_msgs_2ctx(
|
||||
chat.prepare_msg_raw(ctx_dst, &mut msg, None).await?;
|
||||
|
||||
curr_timestamp += 1;
|
||||
if !create_send_msg_jobs(ctx_dst, &mut msg).await?.is_empty() {
|
||||
if !create_send_msg_jobs(ctx_dst, &mut msg, None)
|
||||
.await?
|
||||
.is_empty()
|
||||
{
|
||||
ctx_dst.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
created_msgs.push(msg.id);
|
||||
@@ -4675,10 +4736,28 @@ pub(crate) async fn save_copy_in_self_talk(
|
||||
Ok(msg.rfc724_mid)
|
||||
}
|
||||
|
||||
/// Resends given messages with the same Message-ID.
|
||||
/// Resends given messages to members of the corresponding chats.
|
||||
///
|
||||
/// This is primarily intended to make existing webxdcs available to new chat members.
|
||||
pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
resend_msgs_ex(context, msg_ids, None).await
|
||||
}
|
||||
|
||||
/// Resends given messages to a contact with fingerprint `to_fingerprint` or, if it's `None`, to
|
||||
/// members of the corresponding chats.
|
||||
///
|
||||
/// NB: Actually `to_fingerprint` is only passed for `OutBroadcast` chats when a new member is
|
||||
/// added. Currently webxdc status updates are re-sent to all broadcast members instead of the
|
||||
/// requested contact, but this will be changed soon: webxdc status updates won't be re-sent at all
|
||||
/// as they may contain confidential data sent by subscribers to the owner. Of course this may also
|
||||
/// happen without resending subscribers' status updates if a webxdc app is "unsafe" for use in
|
||||
/// broadcasts on its own, but this is a separate problem.
|
||||
pub(crate) async fn resend_msgs_ex(
|
||||
context: &Context,
|
||||
msg_ids: &[MsgId],
|
||||
to_fingerprint: Option<Fingerprint>,
|
||||
) -> Result<()> {
|
||||
let to_fingerprint = to_fingerprint.map(|f| f.hex());
|
||||
let mut msgs: Vec<Message> = Vec::new();
|
||||
for msg_id in msg_ids {
|
||||
let msg = Message::load_from_db(context, *msg_id).await?;
|
||||
@@ -4697,11 +4776,25 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
| MessageState::OutFailed
|
||||
| MessageState::OutDelivered
|
||||
| MessageState::OutMdnRcvd => {
|
||||
message::update_msg_state(context, msg.id, MessageState::OutPending).await?
|
||||
// Broadcast owners shouldn't see spinners on messages being auto-re-sent to new
|
||||
// subscribers (otherwise big channel owners will see spinners most of the time).
|
||||
if to_fingerprint.is_none() {
|
||||
message::update_msg_state(context, msg.id, MessageState::OutPending).await?;
|
||||
}
|
||||
}
|
||||
msg_state => bail!("Unexpected message state {msg_state}"),
|
||||
}
|
||||
if create_send_msg_jobs(context, &mut msg).await?.is_empty() {
|
||||
let mut row_id = None;
|
||||
if let Some(to_fingerprint) = &to_fingerprint {
|
||||
msg.param.set(Param::Arg4, to_fingerprint.clone());
|
||||
// Avoid updating the `msgs` row.
|
||||
row_id = Some(msg.id);
|
||||
msg.id = MsgId::new(u32::MAX);
|
||||
}
|
||||
if create_send_msg_jobs(context, &mut msg, row_id)
|
||||
.await?
|
||||
.is_empty()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -4712,7 +4805,8 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
chat_id: msg.chat_id,
|
||||
msg_id: msg.id,
|
||||
});
|
||||
// note(treefit): only matters if it is the last message in chat (but probably to expensive to check, debounce also solves it)
|
||||
// The event only matters if the message is last in the chat.
|
||||
// But it's probably too expensive check, and UIs anyways need to debounce.
|
||||
chatlist_events::emit_chatlist_item_changed(context, msg.chat_id);
|
||||
|
||||
if msg.viewtype == Viewtype::Webxdc {
|
||||
|
||||
Reference in New Issue
Block a user