From fed3a6a3e7e02a88333ce3dd3a48408e9d2e4120 Mon Sep 17 00:00:00 2001 From: iequidoo Date: Fri, 8 May 2026 00:14:43 -0400 Subject: [PATCH] fix: Send pre-message after successful sending of post-message (#8063) Also, send the message text in the post-message too, we can't support the workaround for duplicated message text shown in ancient versions anymore, otherwise newer versions won't show the message text at all. On the receiving side, download post messages w/o known pre-message after 30s to protect from lost messages. This isn't easy to test in Rust though because the `smtp` sending code requires an SMTP connection, so it's only tested (by already existing tests) that messages are queued in the right order. Another problem is that the `smtp` sending logic doesn't try to send any messages following a failed-to-send message until the retry limit is reached, so if there's smth wrong with a post-message, other unrelated messages are delayed, but this problem has already existed. --- .../tests/test_iroh_webxdc.py | 5 -- deltachat-rpc-client/tests/test_something.py | 10 ---- src/chat.rs | 19 +++---- src/download.rs | 19 +++++-- src/imap.rs | 11 ++-- src/mimefactory.rs | 30 +++++------ src/receive_imf.rs | 51 +++++++++++-------- src/receive_imf/receive_imf_tests.rs | 26 +++++----- src/smtp.rs | 2 +- src/sql/migrations.rs | 13 +++++ src/test_utils.rs | 14 ++++- src/tests/pre_messages/forward_and_save.rs | 2 +- src/tests/pre_messages/receiving.rs | 6 +-- 13 files changed, 116 insertions(+), 92 deletions(-) diff --git a/deltachat-rpc-client/tests/test_iroh_webxdc.py b/deltachat-rpc-client/tests/test_iroh_webxdc.py index 53217c922..3be274d96 100644 --- a/deltachat-rpc-client/tests/test_iroh_webxdc.py +++ b/deltachat-rpc-client/tests/test_iroh_webxdc.py @@ -251,12 +251,7 @@ def test_realtime_large_webxdc(acfactory, path_to_large_webxdc): ac1_ac2_chat = ac1.create_chat(ac2) ac1_webxdc_msg = ac1_ac2_chat.send_message(text="realtime check", file=path_to_large_webxdc) - # Receive pre-message. ac2_webxdc_msg = ac2.wait_for_incoming_msg() - - # Receive post-message. - ac2_webxdc_msg = ac2.wait_for_msg(EventType.MSGS_CHANGED) - ac2_webxdc_msg.send_webxdc_realtime_advertisement() event = ac1.wait_for_event(EventType.WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED) assert event.msg_id == ac1_webxdc_msg.id diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index fe329b4bf..28dc14ab0 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -865,10 +865,6 @@ def test_delete_fully_downloaded_msg(acfactory, tmp_path, direct_imap): msg = bob.wait_for_incoming_msg() msg_snapshot = msg.get_snapshot() - assert msg_snapshot.download_state == DownloadState.AVAILABLE - msgs_changed_event = bob.wait_for_msgs_changed_event() - assert msgs_changed_event.msg_id == msg.id - msg_snapshot = msg.get_snapshot() assert msg_snapshot.download_state == DownloadState.DONE bob_direct_imap = direct_imap(bob) @@ -899,10 +895,6 @@ def test_imap_autodelete_fully_downloaded_msg(acfactory, tmp_path, direct_imap): msg = bob.wait_for_incoming_msg() msg_snapshot = msg.get_snapshot() - assert msg_snapshot.download_state == DownloadState.AVAILABLE - msgs_changed_event = bob.wait_for_msgs_changed_event() - assert msgs_changed_event.msg_id == msg.id - msg_snapshot = msg.get_snapshot() assert msg_snapshot.download_state == DownloadState.DONE bob_direct_imap = direct_imap(bob) @@ -1379,7 +1371,5 @@ def test_large_message(acfactory) -> None: ) msg = bob.wait_for_incoming_msg() - msgs_changed_event = bob.wait_for_msgs_changed_event() - assert msg.id == msgs_changed_event.msg_id snapshot = msg.get_snapshot() assert snapshot.text == "Hello World, this message is bigger than 5 bytes" diff --git a/src/chat.rs b/src/chat.rs index 518f9203f..580ed394f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -2970,15 +2970,6 @@ WHERE id=? )?; for recipients_chunk in recipients.chunks(chunk_size) { let recipients_chunk = recipients_chunk.join(" "); - if let Some(pre_msg) = &rendered_pre_msg { - let row_id = stmt.execute(( - &pre_msg.rfc724_mid, - &recipients_chunk, - &pre_msg.message, - msg.id, - ))?; - row_ids.push(row_id.try_into()?); - } let row_id = stmt.execute(( &rendered_msg.rfc724_mid, &recipients_chunk, @@ -2986,6 +2977,16 @@ WHERE id=? msg.id, ))?; row_ids.push(row_id.try_into()?); + let Some(pre_msg) = &rendered_pre_msg else { + continue; + }; + let row_id = stmt.execute(( + &pre_msg.rfc724_mid, + &recipients_chunk, + &pre_msg.message, + msg.id, + ))?; + row_ids.push(row_id.try_into()?); } Ok(row_ids) }; diff --git a/src/download.rs b/src/download.rs index b36f71921..7a2c84963 100644 --- a/src/download.rs +++ b/src/download.rs @@ -10,6 +10,7 @@ use crate::context::Context; use crate::imap::session::Session; use crate::log::warn; use crate::message::{self, Message, MsgId, rfc724_mid_exists}; +use crate::tools; use crate::{EventType, chatlist_events}; pub(crate) mod post_msg_metadata; @@ -320,12 +321,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message( context: &Context, session: &mut Session, ) -> Result<()> { + const PRE_MSG_WAIT_TIME: i64 = 30; + let now = tools::time(); let rfc724_mids = context .sql - .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| { - let rfc724_mid: String = row.get(0)?; - Ok(rfc724_mid) - }) + .query_map_vec( + " +SELECT rfc724_mid FROM available_post_msgs +WHERE timestamp<=? OR timestamp>? +ORDER BY timestamp, rowid + ", + (now.saturating_sub(PRE_MSG_WAIT_TIME), now), + |row| { + let rfc724_mid: String = row.get(0)?; + Ok(rfc724_mid) + }, + ) .await?; for rfc724_mid in &rfc724_mids { if msg_is_downloaded_for(context, rfc724_mid).await? { diff --git a/src/imap.rs b/src/imap.rs index 892253bc2..92d656b2f 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -601,7 +601,7 @@ impl Imap { let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await; let mut uids_fetch: Vec = Vec::new(); - let mut available_post_msgs: Vec = Vec::new(); + let mut available_post_msgs: Vec<(String, i64)> = Vec::new(); let mut download_later: Vec = Vec::new(); let mut uid_message_ids = BTreeMap::new(); let mut largest_uid_skipped = None; @@ -689,7 +689,7 @@ impl Imap { .is_some() { info!(context, "{message_id:?} is a post-message."); - available_post_msgs.push(message_id.clone()); + available_post_msgs.push((message_id.clone(), time())); let is_bot = context.get_config_bool(Config::Bot).await?; if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit) @@ -793,9 +793,10 @@ impl Imap { download_later.len(), ); let trans_fn = |t: &mut rusqlite::Transaction| { - let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?; - for rfc724_mid in available_post_msgs { - stmt.execute((rfc724_mid,)) + let mut stmt = + t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?; + for entry in available_post_msgs { + stmt.execute(entry) .context("INSERT OR IGNORE INTO available_post_msgs")?; } let mut stmt = diff --git a/src/mimefactory.rs b/src/mimefactory.rs index dfe64119a..d0671ca12 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -1723,23 +1723,19 @@ impl MimeFactory { let footer = if is_reaction { "" } else { &self.selfstatus }; - let message_text = if self.pre_message_mode == PreMessageMode::Post { - "".to_string() - } else { - format!( - "{}{}{}{}{}{}", - fwdhint.unwrap_or_default(), - quoted_text.unwrap_or_default(), - escape_message_footer_marks(final_text), - if !final_text.is_empty() && !footer.is_empty() { - "\r\n\r\n" - } else { - "" - }, - if !footer.is_empty() { "-- \r\n" } else { "" }, - footer - ) - }; + let message_text = format!( + "{}{}{}{}{}{}", + fwdhint.unwrap_or_default(), + quoted_text.unwrap_or_default(), + escape_message_footer_marks(final_text), + if !final_text.is_empty() && !footer.is_empty() { + "\r\n\r\n" + } else { + "" + }, + if !footer.is_empty() { "-- \r\n" } else { "" }, + footer + ); let mut main_part = MimePart::new("text/plain", message_text); if is_reaction { diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 6feebccf2..049ef123b 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner( "Receiving message {rfc724_mid_orig:?}, seen={seen}...", ); - // These checks must be done before processing of SecureJoin and other special messages. - if mime_parser.pre_message == mimeparser::PreMessageMode::Post { - // Post-Message just replaces the attachment and modifies Params, not the whole message. - // This is done in the `handle_post_message` method. - } else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? { - info!( - context, - "Message {rfc724_mid} is already in some chat or deleted." - ); - if mime_parser.incoming { - return Ok(None); - } + let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?; + if let Some(msg_id) = msg_id + && !mime_parser.incoming + { // For the case if we missed a successful SMTP response. Be optimistic that the message is // delivered also. let self_addr = context.get_primary_self_addr().await?; @@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner( if !msg_has_pending_smtp_job(context, msg_id).await? { msg_id.set_delivered(context).await?; } + } + // These checks must be done before processing of SecureJoin and other special messages. + if mime_parser.pre_message == mimeparser::PreMessageMode::Post { + // Post-Message just replaces the attachment and modifies Params, not the whole message. + // This is done in the `update_from_post_msg` method. + } else if msg_id.is_some() { + info!( + context, + "Message {rfc724_mid} is already in some chat or deleted." + ); return Ok(None); } @@ -2079,7 +2081,7 @@ async fn add_parts( } handle_edit_delete(context, mime_parser, from_id).await?; - handle_post_message(context, mime_parser, from_id, state).await?; + update_from_post_msg(context, mime_parser, from_id, state).await?; if mime_parser.is_system_message == SystemMessage::CallAccepted || mime_parser.is_system_message == SystemMessage::CallEnded @@ -2273,8 +2275,7 @@ INSERT INTO msgs // Maybe set logging xdc and add gossip topics for webxdcs. for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) { - if mime_parser.pre_message != PreMessageMode::Post - && part.typ == Viewtype::Webxdc + if part.typ == Viewtype::Webxdc && let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic) { let topic = iroh_topic_from_str(topic)?; @@ -2421,7 +2422,7 @@ async fn handle_edit_delete( Ok(()) } -async fn handle_post_message( +async fn update_from_post_msg( context: &Context, mime_parser: &MimeMessage, from_id: ContactId, @@ -2439,7 +2440,7 @@ async fn handle_post_message( let Some(msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? else { warn!( context, - "handle_post_message: {rfc724_mid}: Database entry does not exist." + "update_from_post_msg: {rfc724_mid}: Database entry does not exist." ); return Ok(()); }; @@ -2447,7 +2448,7 @@ async fn handle_post_message( // else: message is processed like a normal message warn!( context, - "handle_post_message: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message." + "update_from_post_msg: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message." ); return Ok(()); }; @@ -2458,7 +2459,7 @@ async fn handle_post_message( // Do nothing if safety checks fail, the worst case is the message modifies the chat if the // sender is a member. if from_id != original_msg.from_id { - warn!(context, "handle_post_message: {rfc724_mid}: Bad sender."); + warn!(context, "update_from_post_msg: {rfc724_mid}: Bad sender."); return Ok(()); } let post_msg_showpadlock = part @@ -2466,14 +2467,17 @@ async fn handle_post_message( .get_bool(Param::GuaranteeE2ee) .unwrap_or_default(); if !post_msg_showpadlock && original_msg.get_showpadlock() { - warn!(context, "handle_post_message: {rfc724_mid}: Not encrypted."); + warn!( + context, + "update_from_post_msg: {rfc724_mid}: Not encrypted." + ); return Ok(()); } if !part.typ.has_file() { warn!( context, - "handle_post_message: {rfc724_mid}: First mime part's message-viewtype has no file." + "update_from_post_msg: {rfc724_mid}: First mime part's message-viewtype has no file." ); return Ok(()); } @@ -2504,7 +2508,10 @@ WHERE id=? part.typ, part.bytes as isize, part.error.as_deref().unwrap_or_default(), - state, + match mime_parser.incoming { + true => state, + false => MessageState::Undefined, + }, DownloadState::Done as u32, original_msg.id, ), diff --git a/src/receive_imf/receive_imf_tests.rs b/src/receive_imf/receive_imf_tests.rs index f36295717..d41a2c58f 100644 --- a/src/receive_imf/receive_imf_tests.rs +++ b/src/receive_imf/receive_imf_tests.rs @@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<() .await .unwrap(); - let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await; - assert_eq!(msg_id, pre_msg_id); - assert!(pre_msg_payload.len() < file_bytes.len()); - - assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); - // Alice receives her own pre-message because of bcc_self - // This should not yet mark the message as delivered, - // because not everything was sent, - // but it does remove the pre-message from the SMTP queue - receive_imf(alice, pre_msg_payload.as_bytes(), false).await?; - assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); - let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await; assert_eq!(msg_id, post_msg_id); assert!(post_msg_payload.len() > file_bytes.len()); assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); // Alice receives her own post-message because of bcc_self + // This should not yet mark the message as delivered, + // because not everything was sent, + // but it does remove the post-message from the SMTP queue. + receive_imf(alice, post_msg_payload.as_bytes(), false).await?; + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); + + let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await; + assert_eq!(msg_id, pre_msg_id); + assert!(pre_msg_payload.len() < file_bytes.len()); + + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); + // Alice receives her own pre-message because of bcc_self // This should now mark the message as delivered, // because everything was sent by now. - receive_imf(alice, post_msg_payload.as_bytes(), false).await?; + receive_imf(alice, pre_msg_payload.as_bytes(), false).await?; assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered); Ok(()) diff --git a/src/smtp.rs b/src/smtp.rs index 020c575fd..ac6344aef 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp( if retries > 6 { context .sql - .execute("DELETE FROM smtp WHERE id=?", (rowid,)) + .execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,)) .await .context("Failed to remove message with exceeded retry limit from smtp table")?; if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? { diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index ce5cf718c..0e8f5bd8c 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -2385,6 +2385,19 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed. .await?; } + inc_and_check(&mut migration_version, 153)?; + if dbversion < migration_version { + sql.execute_migration( + " +CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id); +ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL; +CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp); + ", + migration_version, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? diff --git a/src/test_utils.rs b/src/test_utils.rs index 94bdce166..8c3e24cf5 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -722,12 +722,14 @@ ORDER BY id" }) } + /// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned + /// items go in reverse order for historical reasons. pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec> { let sent_msgs = self .ctx .sql .query_map_vec( - "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?", + "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC", (msg_id,), |row| { let _id: MsgId = row.get(0)?; @@ -1055,9 +1057,17 @@ ORDER BY id" /// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call /// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive /// the message. + /// + /// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as + /// tracking of these jobs is probably already lost by the test code. pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> { + while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {} let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap(); - let res = self.pop_sent_msg().await; + let rev_order = false; + let res = self + .pop_sent_msg_ex(rev_order, Duration::ZERO) + .await + .unwrap(); assert_eq!( res.sender_msg_id, msg_id, "Apparently the message was not actually sent out" diff --git a/src/tests/pre_messages/forward_and_save.rs b/src/tests/pre_messages/forward_and_save.rs index 1961d5667..d534dc200 100644 --- a/src/tests/pre_messages/forward_and_save.rs +++ b/src/tests/pre_messages/forward_and_save.rs @@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> { assert_eq!(msg.text, "test".to_owned()); forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?; - let rev_order = false; + let rev_order = true; let msg = bob .recv_msg( &alice diff --git a/src/tests/pre_messages/receiving.rs b/src/tests/pre_messages/receiving.rs index e92f6d21d..6504a2e52 100644 --- a/src/tests/pre_messages/receiving.rs +++ b/src/tests/pre_messages/receiving.rs @@ -257,7 +257,7 @@ async fn test_lost_pre_msg() -> Result<()> { let _pre_msg = alice.pop_sent_msg().await; let msg = bob.recv_msg(&full_msg).await; assert_eq!(msg.download_state, DownloadState::Done); - assert_eq!(msg.text, ""); + assert_eq!(msg.text, "populate"); Ok(()) } @@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> { .await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?; - let post_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await; + let post_message = alice.pop_sent_msg().await; let bob_instance = bob.recv_msg(&pre_message).await; assert_eq!(bob_instance.download_state, DownloadState::Available); @@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()> .await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?; - let post_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await; + let post_message = alice.pop_sent_msg().await; // Bob receives post-message first. let bob_instance = bob.recv_msg(&post_message).await;