From 25750de4e19296aba6915cf0c11d1b047aa56c8e Mon Sep 17 00:00:00 2001 From: link2xt Date: Thu, 25 Dec 2025 11:00:32 +0000 Subject: [PATCH] feat: send sync messages over SMTP and do not move them to mvbox --- python/tests/test_1_online.py | 11 +++--- src/chat.rs | 43 +++++++++++----------- src/config.rs | 8 +---- src/config/config_tests.rs | 4 +-- src/imap.rs | 57 ------------------------------ src/message.rs | 1 + src/qr.rs | 4 +-- src/scheduler.rs | 22 ------------ src/securejoin.rs | 4 +-- src/securejoin/securejoin_tests.rs | 2 +- src/smtp.rs | 1 + src/sql/migrations.rs | 2 ++ src/sync.rs | 12 +++---- src/test_utils.rs | 42 +--------------------- src/transport.rs | 2 +- 15 files changed, 46 insertions(+), 169 deletions(-) diff --git a/python/tests/test_1_online.py b/python/tests/test_1_online.py index 8eda148dd..0bfcca075 100644 --- a/python/tests/test_1_online.py +++ b/python/tests/test_1_online.py @@ -268,22 +268,23 @@ def test_enable_mvbox_move(acfactory, lp): assert ac2._evtracker.wait_next_incoming_message().text == "message1" -def test_move_sync_msgs(acfactory): +def test_dont_move_sync_msgs(acfactory): ac1 = acfactory.new_online_configuring_account(bcc_self=True, sync_msgs=True, fix_is_chatmail=True) acfactory.bring_accounts_online() - ac1.direct_imap.select_folder("DeltaChat") + ac1.direct_imap.select_folder("Inbox") # Sync messages may also be sent during the configuration. - mvbox_msg_cnt = len(ac1.direct_imap.get_all_messages()) + inbox_msg_cnt = len(ac1.direct_imap.get_all_messages()) ac1.set_config("displayname", "Alice") ac1._evtracker.get_matching("DC_EVENT_MSG_DELIVERED") ac1.set_config("displayname", "Bob") ac1._evtracker.get_matching("DC_EVENT_MSG_DELIVERED") ac1.direct_imap.select_folder("Inbox") - assert len(ac1.direct_imap.get_all_messages()) == 0 + assert len(ac1.direct_imap.get_all_messages()) == inbox_msg_cnt + 2 + ac1.direct_imap.select_folder("DeltaChat") - assert len(ac1.direct_imap.get_all_messages()) == mvbox_msg_cnt + 2 + assert len(ac1.direct_imap.get_all_messages()) == 0 def test_forward_messages(acfactory, lp): diff --git a/src/chat.rs b/src/chat.rs index 25f679914..de3433fda 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -662,7 +662,7 @@ impl ChatId { context .set_config_internal(Config::LastHousekeeping, None) .await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; Ok(()) } @@ -2115,7 +2115,7 @@ pub(crate) async fn sync(context: &Context, id: SyncId, action: SyncAction) -> R context .add_sync_item(SyncData::AlterChat { id, action }) .await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; Ok(()) } @@ -2736,7 +2736,7 @@ async fn prepare_send_msg( Ok(row_ids) } -/// Constructs jobs for sending a message and inserts them into the appropriate table. +/// Constructs jobs for sending a message and inserts them into the `smtp` table. /// /// Updates the message `GuaranteeE2ee` parameter and persists it /// in the database depending on whether the message @@ -2860,30 +2860,27 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) - let chunk_size = context.get_max_smtp_rcpt_to().await?; let trans_fn = |t: &mut rusqlite::Transaction| { let mut row_ids = Vec::::new(); + if let Some(sync_ids) = rendered_msg.sync_ids_to_delete { t.execute( &format!("DELETE FROM multi_device_sync WHERE id IN ({sync_ids})"), (), )?; - t.execute( - "INSERT INTO imap_send (mime, msg_id) VALUES (?, ?)", - (&rendered_msg.message, msg.id), + } + + for recipients_chunk in recipients.chunks(chunk_size) { + let recipients_chunk = recipients_chunk.join(" "); + let row_id = t.execute( + "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \ + VALUES (?1, ?2, ?3, ?4)", + ( + &rendered_msg.rfc724_mid, + recipients_chunk, + &rendered_msg.message, + msg.id, + ), )?; - } else { - for recipients_chunk in recipients.chunks(chunk_size) { - let recipients_chunk = recipients_chunk.join(" "); - let row_id = t.execute( - "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \ - VALUES (?1, ?2, ?3, ?4)", - ( - &rendered_msg.rfc724_mid, - recipients_chunk, - &rendered_msg.message, - msg.id, - ), - )?; - row_ids.push(row_id.try_into()?); - } + row_ids.push(row_id.try_into()?); } Ok(row_ids) }; @@ -3845,7 +3842,7 @@ pub(crate) async fn add_contact_to_chat_ex( .log_err(context) .is_ok() { - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; } } context.emit_event(EventType::ChatModified(chat_id)); @@ -4319,7 +4316,7 @@ pub async fn save_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { }) .await?; } - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; Ok(()) } diff --git a/src/config.rs b/src/config.rs index b4d65d798..99a25ff93 100644 --- a/src/config.rs +++ b/src/config.rs @@ -605,12 +605,6 @@ impl Context { && !self.get_config_bool(Config::Bot).await?) } - /// Returns whether sync messages should be uploaded to the mvbox. - pub(crate) async fn should_move_sync_msgs(&self) -> Result { - Ok(self.get_config_bool(Config::MvboxMove).await? - || !self.get_config_bool(Config::IsChatmail).await?) - } - /// Returns whether MDNs should be requested. pub(crate) async fn should_request_mdns(&self) -> Result { match self.get_config_bool_opt(Config::MdnsEnabled).await? { @@ -880,7 +874,7 @@ impl Context { { return Ok(()); } - self.scheduler.interrupt_inbox().await; + self.scheduler.interrupt_smtp().await; Ok(()) } diff --git a/src/config/config_tests.rs b/src/config/config_tests.rs index aabe0e7f5..a922d47e0 100644 --- a/src/config/config_tests.rs +++ b/src/config/config_tests.rs @@ -237,7 +237,7 @@ async fn test_no_sync_on_self_sent_msg() -> Result<()> { let status = "Sent via usual message"; alice0.set_config(Config::Selfstatus, Some(status)).await?; alice0.send_sync_msg().await?; - alice0.pop_sent_sync_msg().await; + alice0.pop_sent_msg().await; let status1 = "Synced via sync message"; alice1.set_config(Config::Selfstatus, Some(status1)).await?; tcm.send_recv(alice0, alice1, "hi Alice!").await; @@ -261,7 +261,7 @@ async fn test_no_sync_on_self_sent_msg() -> Result<()> { .set_config(Config::Selfavatar, Some(file.to_str().unwrap())) .await?; alice0.send_sync_msg().await?; - alice0.pop_sent_sync_msg().await; + alice0.pop_sent_msg().await; let file = alice1.dir.path().join("avatar.jpg"); let bytes = include_bytes!("../../test-data/image/avatar1000x1000.jpg"); tokio::fs::write(&file, bytes).await?; diff --git a/src/imap.rs b/src/imap.rs index 5bac1e201..0806c14a9 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1110,52 +1110,6 @@ impl Session { Ok(()) } - /// Uploads sync messages from the `imap_send` table with `\Seen` flag set. - pub(crate) async fn send_sync_msgs(&mut self, context: &Context, folder: &str) -> Result<()> { - context.send_sync_msg().await?; - while let Some((id, mime, msg_id, attempts)) = context - .sql - .query_row_optional( - "SELECT id, mime, msg_id, attempts FROM imap_send ORDER BY id LIMIT 1", - (), - |row| { - let id: i64 = row.get(0)?; - let mime: String = row.get(1)?; - let msg_id: MsgId = row.get(2)?; - let attempts: i64 = row.get(3)?; - Ok((id, mime, msg_id, attempts)) - }, - ) - .await - .context("Failed to SELECT from imap_send")? - { - let res = self - .append(folder, Some("(\\Seen)"), None, mime) - .await - .with_context(|| format!("IMAP APPEND to {folder} failed for {msg_id}")) - .log_err(context); - if res.is_ok() { - msg_id.set_delivered(context).await?; - } - const MAX_ATTEMPTS: i64 = 2; - if res.is_ok() || attempts >= MAX_ATTEMPTS - 1 { - context - .sql - .execute("DELETE FROM imap_send WHERE id=?", (id,)) - .await - .context("Failed to delete from imap_send")?; - } else { - context - .sql - .execute("UPDATE imap_send SET attempts=attempts+1 WHERE id=?", (id,)) - .await - .context("Failed to update imap_send.attempts")?; - res?; - } - } - Ok(()) - } - /// Stores pending `\Seen` flags for messages in `imap_markseen` table. pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> { let rows = context @@ -2113,17 +2067,6 @@ async fn needs_move_to_mvbox( headers: &[mailparse::MailHeader<'_>], ) -> Result { let has_chat_version = headers.get_header_value(HeaderDef::ChatVersion).is_some(); - if !context.get_config_bool(Config::IsChatmail).await? - && has_chat_version - && headers - .get_header_value(HeaderDef::AutoSubmitted) - .filter(|val| val.eq_ignore_ascii_case("auto-generated")) - .is_some() - && let Some(from) = mimeparser::get_from(headers) - && context.is_self_addr(&from.addr).await? - { - return Ok(true); - } if !context.get_config_bool(Config::MvboxMove).await? { return Ok(false); } diff --git a/src/message.rs b/src/message.rs index 129e037e6..f02aa2619 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1717,6 +1717,7 @@ pub async fn delete_msgs_ex( msgs: deleted_rfc724_mid, }) .await?; + context.scheduler.interrupt_smtp().await; } for &msg_id in msg_ids { diff --git a/src/qr.rs b/src/qr.rs index 151b49672..4fb408bb9 100644 --- a/src/qr.rs +++ b/src/qr.rs @@ -904,7 +904,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { .await?; token::save(context, token::Namespace::Auth, None, &authcode, timestamp).await?; context.sync_qr_code_tokens(None).await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; } Qr::ReviveVerifyGroup { invitenumber, @@ -936,7 +936,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { ) .await?; context.sync_qr_code_tokens(Some(&grpid)).await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; } Qr::Login { address, options } => { let mut param = login_param_from_login_qr(&address, options)?; diff --git a/src/scheduler.rs b/src/scheduler.rs index 538cc1aec..b6055d8ea 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -572,28 +572,6 @@ async fn fetch_idle( }; if folder_config == Config::ConfiguredInboxFolder { - let mvbox; - let syncbox = match ctx.should_move_sync_msgs().await? { - false => &watch_folder, - true => { - mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?; - mvbox.as_deref().unwrap_or(&watch_folder) - } - }; - if ctx - .get_config(Config::ConfiguredAddr) - .await? - .unwrap_or_default() - == connection.addr - { - session - .send_sync_msgs(ctx, syncbox) - .await - .context("fetch_idle: send_sync_msgs") - .log_err(ctx) - .ok(); - } - session .store_seen_flags_on_imap(ctx) .await diff --git a/src/securejoin.rs b/src/securejoin.rs index dcf418708..01d5cbdb9 100644 --- a/src/securejoin.rs +++ b/src/securejoin.rs @@ -160,7 +160,7 @@ pub async fn get_securejoin_qr(context: &Context, chat: Option) -> Resul context .sync_qr_code_tokens(Some(chat.grpid.as_str())) .await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; } let chat_name = chat.get_name(); @@ -192,7 +192,7 @@ pub async fn get_securejoin_qr(context: &Context, chat: Option) -> Resul .replace("%20", "+"); if sync_token { context.sync_qr_code_tokens(None).await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; } format!( "https://i.delta.chat/#{fingerprint}&i={invitenumber}&s={auth}&a={self_addr_urlencoded}&n={self_name_urlencoded}", diff --git a/src/securejoin/securejoin_tests.rs b/src/securejoin/securejoin_tests.rs index 033306673..33467c19a 100644 --- a/src/securejoin/securejoin_tests.rs +++ b/src/securejoin/securejoin_tests.rs @@ -1019,7 +1019,7 @@ async fn test_expired_synced_auth_token() -> Result<()> { let qr = get_securejoin_qr(alice2, None).await?; alice2.send_sync_msg().await.unwrap(); - let sync_msg = alice2.pop_sent_sync_msg().await; + let sync_msg = alice2.pop_sent_msg().await; // One week passes, QR code expires. SystemTime::shift(Duration::from_secs(7 * 24 * 3600)); diff --git a/src/smtp.rs b/src/smtp.rs index 0a4ef049a..0e8b2358a 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -495,6 +495,7 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> { pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> { let ratelimited = if context.ratelimit.read().await.can_send() { // add status updates and sync messages to end of sending queue + context.send_sync_msg().await?; context.flush_status_updates().await?; false } else { diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 540042495..dbd4eeb3c 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -1011,6 +1011,8 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); inc_and_check(&mut migration_version, 119)?; if dbversion < migration_version { + // This table is deprecated sinc 2025-12-25. + // Sync messages are again sent over SMTP. sql.execute_migration( "CREATE TABLE imap_send ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/src/sync.rs b/src/sync.rs index 66810a50a..5eb403d10 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -175,7 +175,7 @@ impl Context { /// Adds most recent qr-code tokens for the given group or self-contact to the list of items to /// be synced. If device synchronization is disabled, /// no tokens exist or the chat is unpromoted, the function does nothing. - /// The caller should call `SchedulerState::interrupt_inbox()` on its own to trigger sending. + /// The caller should call `SchedulerState::interrupt_smtp()` on its own to trigger sending. pub(crate) async fn sync_qr_code_tokens(&self, grpid: Option<&str>) -> Result<()> { if !self.should_send_sync_msgs().await? { return Ok(()); @@ -208,7 +208,7 @@ impl Context { grpid: None, })) .await?; - self.scheduler.interrupt_inbox().await; + self.scheduler.interrupt_smtp().await; Ok(()) } @@ -675,7 +675,7 @@ mod tests { // let alice's other device receive and execute the sync message, // also here, self-talk should stay hidden - let sent_msg = alice.pop_sent_sync_msg().await; + let sent_msg = alice.pop_sent_msg().await; let alice2 = TestContext::new_alice().await; alice2.set_config_bool(Config::SyncMsgs, true).await?; alice2.recv_msg_trash(&sent_msg).await; @@ -722,7 +722,7 @@ mod tests { })) .await?; alice1.send_sync_msg().await?.unwrap(); - alice1.pop_sent_sync_msg().await + alice1.pop_sent_msg().await } else { let chat = alice1.get_self_chat().await; alice1.send_text(chat.id, "Hi").await @@ -760,7 +760,7 @@ mod tests { .set_config(Config::Displayname, Some("Alice Human")) .await?; alice.send_sync_msg().await?; - alice.pop_sent_sync_msg().await; + alice.pop_sent_msg().await; let msg = bob.recv_msg(&alice.pop_sent_msg().await).await; assert_eq!(msg.text, "hi"); @@ -794,7 +794,7 @@ mod tests { // group is promoted for compatibility (because the group could be created by older Core). // TODO: assert!(msg_id.is_none()); assert!(msg_id.is_some()); - let sent = alice.pop_sent_sync_msg().await; + let sent = alice.pop_sent_msg().await; let msg = alice.parse_msg(&sent).await; let mut sync_items = msg.sync_items.unwrap().items; assert_eq!(sync_items.len(), 1); diff --git a/src/test_utils.rs b/src/test_utils.rs index 8f837ebf1..666140802 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -711,46 +711,6 @@ impl TestContext { }) } - /// Retrieves a sent sync message from the db. - /// - /// This retrieves and removes a sync message which has been scheduled to send from the jobs - /// table. Messages are returned in the order they have been sent. - /// - /// Panics if there is no message or on any error. - pub async fn pop_sent_sync_msg(&self) -> SentMessage<'_> { - let (id, msg_id, payload) = self - .ctx - .sql - .query_row( - "SELECT id, msg_id, mime \ - FROM imap_send \ - ORDER BY id", - (), - |row| { - let rowid: i64 = row.get(0)?; - let msg_id: MsgId = row.get(1)?; - let mime: String = row.get(2)?; - Ok((rowid, msg_id, mime)) - }, - ) - .await - .expect("query_row failed"); - self.ctx - .sql - .execute("DELETE FROM imap_send WHERE id=?", (id,)) - .await - .expect("failed to remove job"); - update_msg_state(&self.ctx, msg_id, MessageState::OutDelivered) - .await - .expect("failed to update message state"); - SentMessage { - payload, - sender_msg_id: msg_id, - sender_context: &self.ctx, - recipients: self.get_primary_self_addr().await.unwrap(), - } - } - /// Parses a message. /// /// Parsing a message does not run the entire receive pipeline, but is not without @@ -1550,7 +1510,7 @@ pub(crate) async fn mark_as_verified(this: &TestContext, other: &TestContext) { /// alice0's side that implies sending a sync message. pub(crate) async fn sync(alice0: &TestContext, alice1: &TestContext) { alice0.send_sync_msg().await.unwrap(); - let sync_msg = alice0.pop_sent_sync_msg().await; + let sync_msg = alice0.pop_sent_msg().await; alice1.recv_msg_trash(&sync_msg).await; } diff --git a/src/transport.rs b/src/transport.rs index a5c05874e..751c13fc1 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -710,7 +710,7 @@ pub(crate) async fn send_sync_transports(context: &Context) -> Result<()> { removed_transports, }) .await?; - context.scheduler.interrupt_inbox().await; + context.scheduler.interrupt_smtp().await; Ok(()) }