fix: create_send_msg_jobs: Avoid races with resending to new broadcast members

Follow-up to 970222f. We shouldn't update the message db entry and also share the message id with
new SMTP jobs, otherwise the behavior would depend on timings which isn't good.
This commit is contained in:
iequidoo
2026-04-22 15:25:11 -03:00
parent 38affa2c62
commit 0571c556fe

View File

@@ -2753,8 +2753,8 @@ async fn prepare_send_msg(
chat_id.unarchive_if_not_muted(context, msg.state).await?; chat_id.unarchive_if_not_muted(context, msg.state).await?;
} }
chat.prepare_msg_raw(context, msg, update_msg_id).await?; chat.prepare_msg_raw(context, msg, update_msg_id).await?;
let resend_to_new = false;
let row_ids = create_send_msg_jobs(context, msg) let row_ids = create_send_msg_jobs(context, msg, resend_to_new)
.await .await
.context("Failed to create send jobs")?; .context("Failed to create send jobs")?;
if !row_ids.is_empty() { if !row_ids.is_empty() {
@@ -2824,7 +2824,11 @@ async fn render_mime_message_and_pre_message(
/// Returns row ids if `smtp` table jobs were created or an empty `Vec` otherwise. /// Returns row ids if `smtp` table jobs were created or an empty `Vec` otherwise.
/// ///
/// The caller has to interrupt SMTP loop or otherwise process new rows. /// The caller has to interrupt SMTP loop or otherwise process new rows.
pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result<Vec<i64>> { async fn create_send_msg_jobs(
context: &Context,
msg: &mut Message,
resend_to_new: bool,
) -> Result<Vec<i64>> {
let cmd = msg.param.get_cmd(); let cmd = msg.param.get_cmd();
if cmd == SystemMessage::GroupNameChanged || cmd == SystemMessage::GroupDescriptionChanged { if cmd == SystemMessage::GroupNameChanged || cmd == SystemMessage::GroupDescriptionChanged {
msg.chat_id msg.chat_id
@@ -2940,35 +2944,41 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
msg.param.remove(Param::GuaranteeE2ee); msg.param.remove(Param::GuaranteeE2ee);
} }
msg.subject.clone_from(&rendered_msg.subject); msg.subject.clone_from(&rendered_msg.subject);
// Sort the message to the bottom. Employ `msgs_index7` to compute `timestamp`. if !resend_to_new {
context // Sort the message to the bottom. Employ `msgs_index7` to compute `timestamp`.
.sql context
.execute( .sql
" .execute(
UPDATE msgs SET "
timestamp=( UPDATE msgs SET
SELECT MAX(timestamp) FROM msgs INDEXED BY msgs_index7 WHERE timestamp=(
-- From `InFresh` to `OutMdnRcvd` inclusive except `OutDraft`. SELECT MAX(timestamp) FROM msgs INDEXED BY msgs_index7 WHERE
state IN(10,13,16,18,20,24,26,28) AND -- From `InFresh` to `OutMdnRcvd` inclusive except `OutDraft`.
hidden IN(0,1) AND state IN(10,13,16,18,20,24,26,28) AND
chat_id=? AND hidden IN(0,1) AND
id<=? chat_id=? AND
), id<=?
pre_rfc724_mid=?, subject=?, param=? ),
WHERE id=? pre_rfc724_mid=?, subject=?, param=?
", WHERE id=?
( ",
msg.chat_id, (
msg.id, msg.chat_id,
&msg.pre_rfc724_mid, msg.id,
&msg.subject, &msg.pre_rfc724_mid,
msg.param.to_string(), &msg.subject,
msg.id, msg.param.to_string(),
), msg.id,
) ),
.await?; )
.await?;
}
let chunk_size = context.get_max_smtp_rcpt_to().await?; let chunk_size = context.get_max_smtp_rcpt_to().await?;
let msg_id = match resend_to_new {
true => MsgId::new(u32::MAX),
false => msg.id,
};
let trans_fn = |t: &mut rusqlite::Transaction| { let trans_fn = |t: &mut rusqlite::Transaction| {
let mut row_ids = Vec::<i64>::new(); let mut row_ids = Vec::<i64>::new();
@@ -2989,7 +2999,7 @@ WHERE id=?
&pre_msg.rfc724_mid, &pre_msg.rfc724_mid,
&recipients_chunk, &recipients_chunk,
&pre_msg.message, &pre_msg.message,
msg.id, msg_id,
))?; ))?;
row_ids.push(row_id.try_into()?); row_ids.push(row_id.try_into()?);
} }
@@ -2997,7 +3007,7 @@ WHERE id=?
&rendered_msg.rfc724_mid, &rendered_msg.rfc724_mid,
&recipients_chunk, &recipients_chunk,
&rendered_msg.message, &rendered_msg.message,
msg.id, msg_id,
))?; ))?;
row_ids.push(row_id.try_into()?); row_ids.push(row_id.try_into()?);
} }
@@ -4609,7 +4619,11 @@ pub async fn forward_msgs_2ctx(
chat.prepare_msg_raw(ctx_dst, &mut msg, None).await?; chat.prepare_msg_raw(ctx_dst, &mut msg, None).await?;
curr_timestamp += 1; curr_timestamp += 1;
if !create_send_msg_jobs(ctx_dst, &mut msg).await?.is_empty() { let resend_to_new = false;
if !create_send_msg_jobs(ctx_dst, &mut msg, resend_to_new)
.await?
.is_empty()
{
ctx_dst.scheduler.interrupt_smtp().await; ctx_dst.scheduler.interrupt_smtp().await;
} }
created_msgs.push(msg.id); created_msgs.push(msg.id);
@@ -4767,7 +4781,11 @@ pub(crate) async fn resend_msgs_ex(
if let Some(to_fingerprint) = &to_fingerprint { if let Some(to_fingerprint) = &to_fingerprint {
msg.param.set(Param::Arg4, to_fingerprint.clone()); msg.param.set(Param::Arg4, to_fingerprint.clone());
} }
if create_send_msg_jobs(context, &mut msg).await?.is_empty() { let resend_to_new = to_fingerprint.is_some();
if create_send_msg_jobs(context, &mut msg, resend_to_new)
.await?
.is_empty()
{
continue; continue;
} }