From 8b58b16cb528e29a7f69f1337a3889333fa413c7 Mon Sep 17 00:00:00 2001 From: Hocuri Date: Fri, 10 Apr 2026 21:10:46 +0200 Subject: [PATCH] fix: For bots, wait with emitting IncomingMsg until the Post-Msg arrived (#8104) I used some AI to draft a first version of this, and then reworked it. This is one of multiple possibilities to fix https://github.com/chatmail/core/issues/8041: For bots, the IncomingMsg event is not emitted when the pre-message arrives, only when the post-message arrives. Also, post-messages are downloaded immediately, not after all the small messages are downloaded. The `get_next_msgs()` API is deprecated. Instead, bots need to listen to the IncomingMsg event in order to be notified about new events. Is this acceptable for bots? THE PROBLEM THAT WAS SOLVED BY THIS: With pre-messages, it's hard for bots to wait for the message to be fully downloaded and then process it. Up until now, bots used get_next_msgs() to query the unprocessed messages, then set last_msg_id after processing a message to that they won't process it again. But this will now also return messages that were not fully downloaded. ALTERNATIVES: In the following, I will explain the alternatives, and for why it's not so easy to just make the `get_next_msgs()` API work. If it's not understandable, I'm happy to elaborate more. Core can't just completely ignore the pre-message for two reasons: - If a post-message containing a Webxdc arrives later, and some webxdc updates arrive in the meantime, then these updates will be lost. - The post-message doesn't contain the text (reasoning was to avoid duplicate text for people who didn't upgrade yet during the 2.43.0 rollout) There are multiple solutions: - Add the message as hidden in the database when the pre-message arrives. - When the post-message arrives and we want to make it available for bots, we need to update the msg_id because of how the `get_next_msgs()` API works. This means that we need to update all webxdc updates that reference this msg_id. - Alternatively, we could make webxdc's reference the rfc724_mid instead of the msg_id, so that we don't need stable msg_ids anymore - Alternatively, we could deprecate `get_next_msgs()`, and ask bots to use plain events for message processing again. It's not that bad; worst case, the bot crashes and then forgets to react to some messages, but the user will just try again. And if some message makes the bot crash, then it might actually be good not to try and process it again. - Store the pre-message text and `PostMsgMetadata` (or alternatively, the whole mime) in a new database table. Wait with processing it until the post-message arrives. Additionally, the logic that small messages are downloaded before post-messages should be disabled for bots, in order to prevent reordering. --- deltachat-jsonrpc/src/api.rs | 18 +++++++- .../src/deltachat_rpc_client/account.py | 10 ++++- src/context.rs | 18 +++++++- src/imap.rs | 15 +++++-- src/receive_imf.rs | 24 +++++++++- src/tests/pre_messages/receiving.rs | 45 +++++++++++++++++++ 6 files changed, 120 insertions(+), 10 deletions(-) diff --git a/deltachat-jsonrpc/src/api.rs b/deltachat-jsonrpc/src/api.rs index d57e83336..4be0672a7 100644 --- a/deltachat-jsonrpc/src/api.rs +++ b/deltachat-jsonrpc/src/api.rs @@ -678,7 +678,7 @@ impl CommandApi { ChatId::new(chat_id).get_fresh_msg_cnt(&ctx).await } - /// Gets messages to be processed by the bot and returns their IDs. + /// (deprecated) Gets messages to be processed by the bot and returns their IDs. /// /// Only messages with database ID higher than `last_msg_id` config value /// are returned. After processing the messages, the bot should @@ -686,6 +686,13 @@ impl CommandApi { /// or manually updating the value to avoid getting already /// processed messages. /// + /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives, + /// even if it is not fully downloaded yet. + /// The bot needs to wait for the message to be fully downloaded. + /// Since this is usually not the desired behavior, + /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`types::events::EventType::IncomingMsg`] + /// event for getting notified about new messages. + /// /// [`markseen_msgs`]: Self::markseen_msgs async fn get_next_msgs(&self, account_id: u32) -> Result> { let ctx = self.get_context(account_id).await?; @@ -698,7 +705,7 @@ impl CommandApi { Ok(msg_ids) } - /// Waits for messages to be processed by the bot and returns their IDs. + /// (deprecated) Waits for messages to be processed by the bot and returns their IDs. /// /// This function is similar to [`get_next_msgs`], /// but waits for internal new message notification before returning. @@ -709,6 +716,13 @@ impl CommandApi { /// To shutdown the bot, stopping I/O can be used to interrupt /// pending or next `wait_next_msgs` call. /// + /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives, + /// even if it is not fully downloaded yet. + /// The bot needs to wait for the message to be fully downloaded. + /// Since this is usually not the desired behavior, + /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`types::events::EventType::IncomingMsg`] + /// event for getting notified about new messages. + /// /// [`get_next_msgs`]: Self::get_next_msgs async fn wait_next_msgs(&self, account_id: u32) -> Result> { let ctx = self.get_context(account_id).await?; diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/account.py b/deltachat-rpc-client/src/deltachat_rpc_client/account.py index e7acebd18..55fddd971 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/account.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/account.py @@ -405,7 +405,15 @@ class Account: @futuremethod def wait_next_messages(self) -> list[Message]: - """Wait for new messages and return a list of them.""" + """(deprecated) Wait for new messages and return a list of them. Meant for bots. + + Deprecated 2026-04: This returns the message's id as soon as the first part arrives, + even if it is not fully downloaded yet. + The bot needs to wait for the message to be fully downloaded. + Since this is usually not the desired behavior, + bots should instead use the `EventType.INCOMING_MSG` + event for getting notified about new messages. + """ next_msg_ids = yield self._rpc.wait_next_msgs.future(self.id) return [Message(self, msg_id) for msg_id in next_msg_ids] diff --git a/src/context.rs b/src/context.rs index adcb3b2a7..d60e53110 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1142,10 +1142,17 @@ ORDER BY m.timestamp DESC,m.id DESC", Ok(list) } - /// Returns a list of messages with database ID higher than requested. + /// (deprecated) Returns a list of messages with database ID higher than requested. /// /// Blocked contacts and chats are excluded, /// but self-sent messages and contact requests are included in the results. + /// + /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives, + /// even if it is not fully downloaded yet. + /// The bot needs to wait for the message to be fully downloaded. + /// Since this is usually not the desired behavior, + /// bots should instead use the [`EventType::IncomingMsg`] + /// event for getting notified about new messages. pub async fn get_next_msgs(&self) -> Result> { let last_msg_id = match self.get_config(Config::LastMsgId).await? { Some(s) => MsgId::new(s.parse()?), @@ -1194,7 +1201,7 @@ ORDER BY m.timestamp DESC,m.id DESC", Ok(list) } - /// Returns a list of messages with database ID higher than last marked as seen. + /// (deprecated) Returns a list of messages with database ID higher than last marked as seen. /// /// This function is supposed to be used by bot to request messages /// that are not processed yet. @@ -1204,6 +1211,13 @@ ORDER BY m.timestamp DESC,m.id DESC", /// shortly after notification or notification is manually triggered /// to interrupt waiting. /// Notification may be manually triggered by calling [`Self::stop_io`]. + /// + /// Deprecated 2026-04: This returns the message's id as soon as the first part arrives, + /// even if it is not fully downloaded yet. + /// The bot needs to wait for the message to be fully downloaded. + /// Since this is usually not the desired behavior, + /// bots should instead use the #DC_EVENT_INCOMING_MSG / [`EventType::IncomingMsg`] + /// event for getting notified about new messages. pub async fn wait_next_msgs(&self) -> Result> { self.new_msgs_notify.notified().await; let list = self.get_next_msgs().await?; diff --git a/src/imap.rs b/src/imap.rs index e85d10332..afe7cfb29 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -730,10 +730,19 @@ impl Imap { info!(context, "{message_id:?} is a post-message."); available_post_msgs.push(message_id.clone()); - if download_limit.is_none_or(|download_limit| size <= download_limit) { - download_later.push(message_id.clone()); + let is_bot = context.get_config_bool(Config::Bot).await?; + if is_bot && download_limit.is_none_or(|download_limit| size <= download_limit) + { + uids_fetch.push(uid); + uid_message_ids.insert(uid, message_id); + } else { + if download_limit.is_none_or(|download_limit| size <= download_limit) { + // Download later after all the small messages are downloaded, + // so that large messages don't delay receiving small messages + download_later.push(message_id.clone()); + } + largest_uid_skipped = Some(uid); } - largest_uid_skipped = Some(uid); } else { info!(context, "{message_id:?} is not a post-message."); if download_limit.is_none_or(|download_limit| size <= download_limit) { diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 86e66da11..2438cfed6 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -1069,7 +1069,12 @@ UPDATE msgs SET state=? WHERE let fresh = received_msg.state == MessageState::InFresh && mime_parser.is_system_message != SystemMessage::CallAccepted && mime_parser.is_system_message != SystemMessage::CallEnded; - let important = mime_parser.incoming && fresh && !is_old_contact_request; + let is_bot = context.get_config_bool(Config::Bot).await?; + let is_pre_message = matches!(mime_parser.pre_message, PreMessageMode::Pre { .. }); + let skip_bot_notify = is_bot && is_pre_message; + let important = + mime_parser.incoming && fresh && !is_old_contact_request && !skip_bot_notify; + for msg_id in &received_msg.msg_ids { chat_id.emit_msg_event(context, *msg_id, important); } @@ -2573,7 +2578,22 @@ WHERE id=? ), ) .await?; - context.emit_msgs_changed(original_msg.chat_id, original_msg.id); + + if context.get_config_bool(Config::Bot).await? { + if original_msg.hidden { + // No need to emit an event about the changed message + } else if !original_msg.chat_id.is_trash() { + let fresh = original_msg.state == MessageState::InFresh; + let important = mime_parser.incoming && fresh; + + original_msg + .chat_id + .emit_msg_event(context, original_msg.id, important); + context.new_msgs_notify.notify_one(); + } + } else { + context.emit_msgs_changed(original_msg.chat_id, original_msg.id); + } Ok(()) } diff --git a/src/tests/pre_messages/receiving.rs b/src/tests/pre_messages/receiving.rs index 2f0147e5e..2393638e7 100644 --- a/src/tests/pre_messages/receiving.rs +++ b/src/tests/pre_messages/receiving.rs @@ -5,12 +5,14 @@ use pretty_assertions::assert_eq; use crate::EventType; use crate::chat; use crate::chat::send_msg; +use crate::config::Config; use crate::contact; use crate::download::{DownloadState, PRE_MSG_ATTACHMENT_SIZE_THRESHOLD, PostMsgMetadata}; use crate::message::{Message, MessageState, Viewtype, delete_msgs, markseen_msgs}; use crate::mimeparser::MimeMessage; use crate::param::Param; use crate::reaction::{get_msg_reactions, send_reaction}; +use crate::receive_imf::receive_imf; use crate::summary::assert_summary_texts; use crate::test_utils::TestContextManager; use crate::tests::pre_messages::util::{ @@ -795,3 +797,46 @@ async fn test_chatlist_event_on_post_msg_download() -> Result<()> { Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_bot_pre_message_notifications() -> Result<()> { + let mut tcm = TestContextManager::new(); + let alice = tcm.alice().await; + let bob = tcm.bob().await; + bob.set_config_bool(Config::Bot, true).await?; + + let alice_group_id = alice.create_group_with_members("test group", &[&bob]).await; + + let (pre_message, post_message, _alice_msg_id) = send_large_file_message( + &alice, + alice_group_id, + Viewtype::File, + &vec![0u8; (PRE_MSG_ATTACHMENT_SIZE_THRESHOLD + 1) as usize], + ) + .await?; + + // Bob receives pre-message + bob.evtracker.clear_events(); + receive_imf(&bob, pre_message.payload().as_bytes(), false).await?; + + // Verify Bob does NOT get an IncomingMsg event for the pre-message + assert!( + bob.evtracker + .get_matching_opt(&bob, |e| matches!(e, EventType::IncomingMsg { .. })) + .await + .is_none() + ); + + // Bob receives post-message + receive_imf(&bob, post_message.payload().as_bytes(), false).await?; + + // Verify Bob DOES get an IncomingMsg event for the complete message + bob.evtracker + .get_matching(|e| matches!(e, EventType::IncomingMsg { .. })) + .await; + + let msg = bob.get_last_msg().await; + assert_eq!(msg.download_state, DownloadState::Done); + + Ok(()) +}