From a18bf746202f587b81c77a2736eccbffa8e567c3 Mon Sep 17 00:00:00 2001 From: iequidoo Date: Fri, 8 May 2026 00:14:43 -0400 Subject: [PATCH] fix: Send pre-message after successful sending of post-message (#8063) This isn't easy to test though because the `smtp` sending code requires an SMTP connection, so it's only tested (by already existing tests) that messages are queued in the right order. Another problem is that the `smtp` sending logic doesn't try to send any messages following a failed-to-send message until the retry limit is reached, so if there's smth wrong with a post-message, other unrelated messages are delayed, but this problem has already existed. --- src/chat.rs | 19 ++++++------- src/receive_imf.rs | 31 +++++++++++++--------- src/receive_imf/receive_imf_tests.rs | 26 +++++++++--------- src/smtp.rs | 2 +- src/sql/migrations.rs | 9 +++++++ src/test_utils.rs | 14 ++++++++-- src/tests/pre_messages/forward_and_save.rs | 2 +- src/tests/pre_messages/receiving.rs | 4 +-- 8 files changed, 66 insertions(+), 41 deletions(-) diff --git a/src/chat.rs b/src/chat.rs index 518f9203f..580ed394f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -2970,15 +2970,6 @@ WHERE id=? )?; for recipients_chunk in recipients.chunks(chunk_size) { let recipients_chunk = recipients_chunk.join(" "); - if let Some(pre_msg) = &rendered_pre_msg { - let row_id = stmt.execute(( - &pre_msg.rfc724_mid, - &recipients_chunk, - &pre_msg.message, - msg.id, - ))?; - row_ids.push(row_id.try_into()?); - } let row_id = stmt.execute(( &rendered_msg.rfc724_mid, &recipients_chunk, @@ -2986,6 +2977,16 @@ WHERE id=? msg.id, ))?; row_ids.push(row_id.try_into()?); + let Some(pre_msg) = &rendered_pre_msg else { + continue; + }; + let row_id = stmt.execute(( + &pre_msg.rfc724_mid, + &recipients_chunk, + &pre_msg.message, + msg.id, + ))?; + row_ids.push(row_id.try_into()?); } Ok(row_ids) }; diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 6feebccf2..fd3be0912 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner( "Receiving message {rfc724_mid_orig:?}, seen={seen}...", ); - // These checks must be done before processing of SecureJoin and other special messages. - if mime_parser.pre_message == mimeparser::PreMessageMode::Post { - // Post-Message just replaces the attachment and modifies Params, not the whole message. - // This is done in the `handle_post_message` method. - } else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? { - info!( - context, - "Message {rfc724_mid} is already in some chat or deleted." - ); - if mime_parser.incoming { - return Ok(None); - } + let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?; + if let Some(msg_id) = msg_id + && !mime_parser.incoming + { // For the case if we missed a successful SMTP response. Be optimistic that the message is // delivered also. let self_addr = context.get_primary_self_addr().await?; @@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner( if !msg_has_pending_smtp_job(context, msg_id).await? { msg_id.set_delivered(context).await?; } + } + // These checks must be done before processing of SecureJoin and other special messages. + if mime_parser.pre_message == mimeparser::PreMessageMode::Post { + // Post-Message just replaces the attachment and modifies Params, not the whole message. + // This is done in the `handle_post_message` method. + } else if msg_id.is_some() { + info!( + context, + "Message {rfc724_mid} is already in some chat or deleted." + ); return Ok(None); } @@ -2504,7 +2506,10 @@ WHERE id=? part.typ, part.bytes as isize, part.error.as_deref().unwrap_or_default(), - state, + match mime_parser.incoming { + true => state, + false => MessageState::Undefined, + }, DownloadState::Done as u32, original_msg.id, ), diff --git a/src/receive_imf/receive_imf_tests.rs b/src/receive_imf/receive_imf_tests.rs index f36295717..d41a2c58f 100644 --- a/src/receive_imf/receive_imf_tests.rs +++ b/src/receive_imf/receive_imf_tests.rs @@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<() .await .unwrap(); - let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await; - assert_eq!(msg_id, pre_msg_id); - assert!(pre_msg_payload.len() < file_bytes.len()); - - assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); - // Alice receives her own pre-message because of bcc_self - // This should not yet mark the message as delivered, - // because not everything was sent, - // but it does remove the pre-message from the SMTP queue - receive_imf(alice, pre_msg_payload.as_bytes(), false).await?; - assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); - let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await; assert_eq!(msg_id, post_msg_id); assert!(post_msg_payload.len() > file_bytes.len()); assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); // Alice receives her own post-message because of bcc_self + // This should not yet mark the message as delivered, + // because not everything was sent, + // but it does remove the post-message from the SMTP queue. + receive_imf(alice, post_msg_payload.as_bytes(), false).await?; + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); + + let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await; + assert_eq!(msg_id, pre_msg_id); + assert!(pre_msg_payload.len() < file_bytes.len()); + + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); + // Alice receives her own pre-message because of bcc_self // This should now mark the message as delivered, // because everything was sent by now. - receive_imf(alice, post_msg_payload.as_bytes(), false).await?; + receive_imf(alice, pre_msg_payload.as_bytes(), false).await?; assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered); Ok(()) diff --git a/src/smtp.rs b/src/smtp.rs index 020c575fd..ac6344aef 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp( if retries > 6 { context .sql - .execute("DELETE FROM smtp WHERE id=?", (rowid,)) + .execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,)) .await .context("Failed to remove message with exceeded retry limit from smtp table")?; if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? { diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index ce5cf718c..429c0cf87 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -2385,6 +2385,15 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed. .await?; } + inc_and_check(&mut migration_version, 153)?; + if dbversion < migration_version { + sql.execute_migration( + "CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id)", + migration_version, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? diff --git a/src/test_utils.rs b/src/test_utils.rs index 94bdce166..8c3e24cf5 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -722,12 +722,14 @@ ORDER BY id" }) } + /// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned + /// items go in reverse order for historical reasons. pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec> { let sent_msgs = self .ctx .sql .query_map_vec( - "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?", + "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC", (msg_id,), |row| { let _id: MsgId = row.get(0)?; @@ -1055,9 +1057,17 @@ ORDER BY id" /// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call /// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive /// the message. + /// + /// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as + /// tracking of these jobs is probably already lost by the test code. pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> { + while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {} let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap(); - let res = self.pop_sent_msg().await; + let rev_order = false; + let res = self + .pop_sent_msg_ex(rev_order, Duration::ZERO) + .await + .unwrap(); assert_eq!( res.sender_msg_id, msg_id, "Apparently the message was not actually sent out" diff --git a/src/tests/pre_messages/forward_and_save.rs b/src/tests/pre_messages/forward_and_save.rs index 1961d5667..d534dc200 100644 --- a/src/tests/pre_messages/forward_and_save.rs +++ b/src/tests/pre_messages/forward_and_save.rs @@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> { assert_eq!(msg.text, "test".to_owned()); forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?; - let rev_order = false; + let rev_order = true; let msg = bob .recv_msg( &alice diff --git a/src/tests/pre_messages/receiving.rs b/src/tests/pre_messages/receiving.rs index e92f6d21d..70f157178 100644 --- a/src/tests/pre_messages/receiving.rs +++ b/src/tests/pre_messages/receiving.rs @@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> { .await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?; - let post_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await; + let post_message = alice.pop_sent_msg().await; let bob_instance = bob.recv_msg(&pre_message).await; assert_eq!(bob_instance.download_state, DownloadState::Available); @@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()> .await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?; - let post_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await; + let post_message = alice.pop_sent_msg().await; // Bob receives post-message first. let bob_instance = bob.recv_msg(&post_message).await;