diff --git a/deltachat-rpc-client/tests/test_iroh_webxdc.py b/deltachat-rpc-client/tests/test_iroh_webxdc.py index 53217c922..3be274d96 100644 --- a/deltachat-rpc-client/tests/test_iroh_webxdc.py +++ b/deltachat-rpc-client/tests/test_iroh_webxdc.py @@ -251,12 +251,7 @@ def test_realtime_large_webxdc(acfactory, path_to_large_webxdc): ac1_ac2_chat = ac1.create_chat(ac2) ac1_webxdc_msg = ac1_ac2_chat.send_message(text="realtime check", file=path_to_large_webxdc) - # Receive pre-message. ac2_webxdc_msg = ac2.wait_for_incoming_msg() - - # Receive post-message. - ac2_webxdc_msg = ac2.wait_for_msg(EventType.MSGS_CHANGED) - ac2_webxdc_msg.send_webxdc_realtime_advertisement() event = ac1.wait_for_event(EventType.WEBXDC_REALTIME_ADVERTISEMENT_RECEIVED) assert event.msg_id == ac1_webxdc_msg.id diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index fe329b4bf..28dc14ab0 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -865,10 +865,6 @@ def test_delete_fully_downloaded_msg(acfactory, tmp_path, direct_imap): msg = bob.wait_for_incoming_msg() msg_snapshot = msg.get_snapshot() - assert msg_snapshot.download_state == DownloadState.AVAILABLE - msgs_changed_event = bob.wait_for_msgs_changed_event() - assert msgs_changed_event.msg_id == msg.id - msg_snapshot = msg.get_snapshot() assert msg_snapshot.download_state == DownloadState.DONE bob_direct_imap = direct_imap(bob) @@ -899,10 +895,6 @@ def test_imap_autodelete_fully_downloaded_msg(acfactory, tmp_path, direct_imap): msg = bob.wait_for_incoming_msg() msg_snapshot = msg.get_snapshot() - assert msg_snapshot.download_state == DownloadState.AVAILABLE - msgs_changed_event = bob.wait_for_msgs_changed_event() - assert msgs_changed_event.msg_id == msg.id - msg_snapshot = msg.get_snapshot() assert msg_snapshot.download_state == DownloadState.DONE bob_direct_imap = direct_imap(bob) @@ -1379,7 +1371,5 @@ def test_large_message(acfactory) -> None: ) msg = bob.wait_for_incoming_msg() - msgs_changed_event = bob.wait_for_msgs_changed_event() - assert msg.id == msgs_changed_event.msg_id snapshot = msg.get_snapshot() assert snapshot.text == "Hello World, this message is bigger than 5 bytes" diff --git a/src/chat.rs b/src/chat.rs index 518f9203f..580ed394f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -2970,15 +2970,6 @@ WHERE id=? )?; for recipients_chunk in recipients.chunks(chunk_size) { let recipients_chunk = recipients_chunk.join(" "); - if let Some(pre_msg) = &rendered_pre_msg { - let row_id = stmt.execute(( - &pre_msg.rfc724_mid, - &recipients_chunk, - &pre_msg.message, - msg.id, - ))?; - row_ids.push(row_id.try_into()?); - } let row_id = stmt.execute(( &rendered_msg.rfc724_mid, &recipients_chunk, @@ -2986,6 +2977,16 @@ WHERE id=? msg.id, ))?; row_ids.push(row_id.try_into()?); + let Some(pre_msg) = &rendered_pre_msg else { + continue; + }; + let row_id = stmt.execute(( + &pre_msg.rfc724_mid, + &recipients_chunk, + &pre_msg.message, + msg.id, + ))?; + row_ids.push(row_id.try_into()?); } Ok(row_ids) }; diff --git a/src/download.rs b/src/download.rs index b36f71921..7a2c84963 100644 --- a/src/download.rs +++ b/src/download.rs @@ -10,6 +10,7 @@ use crate::context::Context; use crate::imap::session::Session; use crate::log::warn; use crate::message::{self, Message, MsgId, rfc724_mid_exists}; +use crate::tools; use crate::{EventType, chatlist_events}; pub(crate) mod post_msg_metadata; @@ -320,12 +321,22 @@ pub(crate) async fn download_known_post_messages_without_pre_message( context: &Context, session: &mut Session, ) -> Result<()> { + const PRE_MSG_WAIT_TIME: i64 = 30; + let now = tools::time(); let rfc724_mids = context .sql - .query_map_vec("SELECT rfc724_mid FROM available_post_msgs", (), |row| { - let rfc724_mid: String = row.get(0)?; - Ok(rfc724_mid) - }) + .query_map_vec( + " +SELECT rfc724_mid FROM available_post_msgs +WHERE timestamp<=? OR timestamp>? +ORDER BY timestamp, rowid + ", + (now.saturating_sub(PRE_MSG_WAIT_TIME), now), + |row| { + let rfc724_mid: String = row.get(0)?; + Ok(rfc724_mid) + }, + ) .await?; for rfc724_mid in &rfc724_mids { if msg_is_downloaded_for(context, rfc724_mid).await? { diff --git a/src/imap.rs b/src/imap.rs index 892253bc2..92d656b2f 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -601,7 +601,7 @@ impl Imap { let _fetch_msgs_lock_guard = context.fetch_msgs_mutex.lock().await; let mut uids_fetch: Vec = Vec::new(); - let mut available_post_msgs: Vec = Vec::new(); + let mut available_post_msgs: Vec<(String, i64)> = Vec::new(); let mut download_later: Vec = Vec::new(); let mut uid_message_ids = BTreeMap::new(); let mut largest_uid_skipped = None; @@ -689,7 +689,7 @@ impl Imap { .is_some() { info!(context, "{message_id:?} is a post-message."); - available_post_msgs.push(message_id.clone()); + available_post_msgs.push((message_id.clone(), time())); let is_bot = context.get_config_bool(Config::Bot).await?; if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit) @@ -793,9 +793,10 @@ impl Imap { download_later.len(), ); let trans_fn = |t: &mut rusqlite::Transaction| { - let mut stmt = t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?)")?; - for rfc724_mid in available_post_msgs { - stmt.execute((rfc724_mid,)) + let mut stmt = + t.prepare("INSERT OR IGNORE INTO available_post_msgs VALUES (?,?)")?; + for entry in available_post_msgs { + stmt.execute(entry) .context("INSERT OR IGNORE INTO available_post_msgs")?; } let mut stmt = diff --git a/src/mimefactory.rs b/src/mimefactory.rs index dfe64119a..d0671ca12 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -1723,23 +1723,19 @@ impl MimeFactory { let footer = if is_reaction { "" } else { &self.selfstatus }; - let message_text = if self.pre_message_mode == PreMessageMode::Post { - "".to_string() - } else { - format!( - "{}{}{}{}{}{}", - fwdhint.unwrap_or_default(), - quoted_text.unwrap_or_default(), - escape_message_footer_marks(final_text), - if !final_text.is_empty() && !footer.is_empty() { - "\r\n\r\n" - } else { - "" - }, - if !footer.is_empty() { "-- \r\n" } else { "" }, - footer - ) - }; + let message_text = format!( + "{}{}{}{}{}{}", + fwdhint.unwrap_or_default(), + quoted_text.unwrap_or_default(), + escape_message_footer_marks(final_text), + if !final_text.is_empty() && !footer.is_empty() { + "\r\n\r\n" + } else { + "" + }, + if !footer.is_empty() { "-- \r\n" } else { "" }, + footer + ); let mut main_part = MimePart::new("text/plain", message_text); if is_reaction { diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 6feebccf2..049ef123b 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -525,18 +525,10 @@ pub(crate) async fn receive_imf_inner( "Receiving message {rfc724_mid_orig:?}, seen={seen}...", ); - // These checks must be done before processing of SecureJoin and other special messages. - if mime_parser.pre_message == mimeparser::PreMessageMode::Post { - // Post-Message just replaces the attachment and modifies Params, not the whole message. - // This is done in the `handle_post_message` method. - } else if let Some(msg_id) = message::rfc724_mid_exists(context, rfc724_mid_orig).await? { - info!( - context, - "Message {rfc724_mid} is already in some chat or deleted." - ); - if mime_parser.incoming { - return Ok(None); - } + let msg_id = message::rfc724_mid_exists(context, rfc724_mid_orig).await?; + if let Some(msg_id) = msg_id + && !mime_parser.incoming + { // For the case if we missed a successful SMTP response. Be optimistic that the message is // delivered also. let self_addr = context.get_primary_self_addr().await?; @@ -551,6 +543,16 @@ pub(crate) async fn receive_imf_inner( if !msg_has_pending_smtp_job(context, msg_id).await? { msg_id.set_delivered(context).await?; } + } + // These checks must be done before processing of SecureJoin and other special messages. + if mime_parser.pre_message == mimeparser::PreMessageMode::Post { + // Post-Message just replaces the attachment and modifies Params, not the whole message. + // This is done in the `update_from_post_msg` method. + } else if msg_id.is_some() { + info!( + context, + "Message {rfc724_mid} is already in some chat or deleted." + ); return Ok(None); } @@ -2079,7 +2081,7 @@ async fn add_parts( } handle_edit_delete(context, mime_parser, from_id).await?; - handle_post_message(context, mime_parser, from_id, state).await?; + update_from_post_msg(context, mime_parser, from_id, state).await?; if mime_parser.is_system_message == SystemMessage::CallAccepted || mime_parser.is_system_message == SystemMessage::CallEnded @@ -2273,8 +2275,7 @@ INSERT INTO msgs // Maybe set logging xdc and add gossip topics for webxdcs. for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) { - if mime_parser.pre_message != PreMessageMode::Post - && part.typ == Viewtype::Webxdc + if part.typ == Viewtype::Webxdc && let Some(topic) = mime_parser.get_header(HeaderDef::IrohGossipTopic) { let topic = iroh_topic_from_str(topic)?; @@ -2421,7 +2422,7 @@ async fn handle_edit_delete( Ok(()) } -async fn handle_post_message( +async fn update_from_post_msg( context: &Context, mime_parser: &MimeMessage, from_id: ContactId, @@ -2439,7 +2440,7 @@ async fn handle_post_message( let Some(msg_id) = message::rfc724_mid_exists(context, &rfc724_mid).await? else { warn!( context, - "handle_post_message: {rfc724_mid}: Database entry does not exist." + "update_from_post_msg: {rfc724_mid}: Database entry does not exist." ); return Ok(()); }; @@ -2447,7 +2448,7 @@ async fn handle_post_message( // else: message is processed like a normal message warn!( context, - "handle_post_message: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message." + "update_from_post_msg: {rfc724_mid}: Pre-message was not downloaded yet so treat as normal message." ); return Ok(()); }; @@ -2458,7 +2459,7 @@ async fn handle_post_message( // Do nothing if safety checks fail, the worst case is the message modifies the chat if the // sender is a member. if from_id != original_msg.from_id { - warn!(context, "handle_post_message: {rfc724_mid}: Bad sender."); + warn!(context, "update_from_post_msg: {rfc724_mid}: Bad sender."); return Ok(()); } let post_msg_showpadlock = part @@ -2466,14 +2467,17 @@ async fn handle_post_message( .get_bool(Param::GuaranteeE2ee) .unwrap_or_default(); if !post_msg_showpadlock && original_msg.get_showpadlock() { - warn!(context, "handle_post_message: {rfc724_mid}: Not encrypted."); + warn!( + context, + "update_from_post_msg: {rfc724_mid}: Not encrypted." + ); return Ok(()); } if !part.typ.has_file() { warn!( context, - "handle_post_message: {rfc724_mid}: First mime part's message-viewtype has no file." + "update_from_post_msg: {rfc724_mid}: First mime part's message-viewtype has no file." ); return Ok(()); } @@ -2504,7 +2508,10 @@ WHERE id=? part.typ, part.bytes as isize, part.error.as_deref().unwrap_or_default(), - state, + match mime_parser.incoming { + true => state, + false => MessageState::Undefined, + }, DownloadState::Done as u32, original_msg.id, ), diff --git a/src/receive_imf/receive_imf_tests.rs b/src/receive_imf/receive_imf_tests.rs index f36295717..d41a2c58f 100644 --- a/src/receive_imf/receive_imf_tests.rs +++ b/src/receive_imf/receive_imf_tests.rs @@ -5592,27 +5592,27 @@ async fn test_mark_message_as_delivered_only_after_sent_out_fully() -> Result<() .await .unwrap(); - let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await; - assert_eq!(msg_id, pre_msg_id); - assert!(pre_msg_payload.len() < file_bytes.len()); - - assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); - // Alice receives her own pre-message because of bcc_self - // This should not yet mark the message as delivered, - // because not everything was sent, - // but it does remove the pre-message from the SMTP queue - receive_imf(alice, pre_msg_payload.as_bytes(), false).await?; - assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); - let (post_msg_id, post_msg_payload) = first_row_in_smtp_queue(alice).await; assert_eq!(msg_id, post_msg_id); assert!(post_msg_payload.len() > file_bytes.len()); assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); // Alice receives her own post-message because of bcc_self + // This should not yet mark the message as delivered, + // because not everything was sent, + // but it does remove the post-message from the SMTP queue. + receive_imf(alice, post_msg_payload.as_bytes(), false).await?; + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); + + let (pre_msg_id, pre_msg_payload) = first_row_in_smtp_queue(alice).await; + assert_eq!(msg_id, pre_msg_id); + assert!(pre_msg_payload.len() < file_bytes.len()); + + assert_eq!(msg_id.get_state(alice).await?, MessageState::OutPending); + // Alice receives her own pre-message because of bcc_self // This should now mark the message as delivered, // because everything was sent by now. - receive_imf(alice, post_msg_payload.as_bytes(), false).await?; + receive_imf(alice, pre_msg_payload.as_bytes(), false).await?; assert_eq!(msg_id.get_state(alice).await?, MessageState::OutDelivered); Ok(()) diff --git a/src/smtp.rs b/src/smtp.rs index 020c575fd..ac6344aef 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -379,7 +379,7 @@ pub(crate) async fn send_msg_to_smtp( if retries > 6 { context .sql - .execute("DELETE FROM smtp WHERE id=?", (rowid,)) + .execute("DELETE FROM smtp WHERE msg_id=?", (msg_id,)) .await .context("Failed to remove message with exceeded retry limit from smtp table")?; if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? { diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index ce5cf718c..0e8f5bd8c 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -2385,6 +2385,19 @@ UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed. .await?; } + inc_and_check(&mut migration_version, 153)?; + if dbversion < migration_version { + sql.execute_migration( + " +CREATE INDEX smtp_index_msg_id ON smtp (msg_id, id); +ALTER TABLE available_post_msgs ADD COLUMN timestamp INTEGER DEFAULT 0 NOT NULL; +CREATE INDEX available_post_msgs_timestamp ON available_post_msgs (timestamp); + ", + migration_version, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? diff --git a/src/test_utils.rs b/src/test_utils.rs index 94bdce166..8c3e24cf5 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -722,12 +722,14 @@ ORDER BY id" }) } + /// Returns `SentMessage` instances representing `smtp` rows for the given message. Returned + /// items go in reverse order for historical reasons. pub async fn get_smtp_rows_for_msg<'a>(&'a self, msg_id: MsgId) -> Vec> { let sent_msgs = self .ctx .sql .query_map_vec( - "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=?", + "SELECT id, msg_id, mime, recipients FROM smtp WHERE msg_id=? ORDER BY id DESC", (msg_id,), |row| { let _id: MsgId = row.get(0)?; @@ -1055,9 +1057,17 @@ ORDER BY id" /// This is not hooked up to any SMTP-IMAP pipeline, so the other account must call /// [`TestContext::recv_msg`] with the returned [`SentMessage`] if it wants to receive /// the message. + /// + /// Removes SMTP jobs existed before and marks the corresponding messages as delivered, as + /// tracking of these jobs is probably already lost by the test code. pub async fn send_msg(&self, chat_id: ChatId, msg: &mut Message) -> SentMessage<'_> { + while self.pop_sent_msg_opt(Duration::ZERO).await.is_some() {} let msg_id = chat::send_msg(self, chat_id, msg).await.unwrap(); - let res = self.pop_sent_msg().await; + let rev_order = false; + let res = self + .pop_sent_msg_ex(rev_order, Duration::ZERO) + .await + .unwrap(); assert_eq!( res.sender_msg_id, msg_id, "Apparently the message was not actually sent out" diff --git a/src/tests/pre_messages/forward_and_save.rs b/src/tests/pre_messages/forward_and_save.rs index 1961d5667..d534dc200 100644 --- a/src/tests/pre_messages/forward_and_save.rs +++ b/src/tests/pre_messages/forward_and_save.rs @@ -106,7 +106,7 @@ async fn test_receive_both() -> Result<()> { assert_eq!(msg.text, "test".to_owned()); forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?; - let rev_order = false; + let rev_order = true; let msg = bob .recv_msg( &alice diff --git a/src/tests/pre_messages/receiving.rs b/src/tests/pre_messages/receiving.rs index e92f6d21d..6504a2e52 100644 --- a/src/tests/pre_messages/receiving.rs +++ b/src/tests/pre_messages/receiving.rs @@ -257,7 +257,7 @@ async fn test_lost_pre_msg() -> Result<()> { let _pre_msg = alice.pop_sent_msg().await; let msg = bob.recv_msg(&full_msg).await; assert_eq!(msg.download_state, DownloadState::Done); - assert_eq!(msg.text, ""); + assert_eq!(msg.text, "populate"); Ok(()) } @@ -547,8 +547,8 @@ async fn test_webxdc_updates_in_post_message_after_pre_message() -> Result<()> { .await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?; - let post_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await; + let post_message = alice.pop_sent_msg().await; let bob_instance = bob.recv_msg(&pre_message).await; assert_eq!(bob_instance.download_state, DownloadState::Available); @@ -588,8 +588,8 @@ async fn test_webxdc_updates_in_post_message_without_pre_message() -> Result<()> .await?; send_msg(alice, alice_chat_id, &mut alice_instance).await?; - let post_message = alice.pop_sent_msg().await; let pre_message = alice.pop_sent_msg().await; + let post_message = alice.pop_sent_msg().await; // Bob receives post-message first. let bob_instance = bob.recv_msg(&post_message).await;