diff --git a/deltachat-jsonrpc/typescript/test/online.ts b/deltachat-jsonrpc/typescript/test/online.ts index 92e42776b..2d90e8060 100644 --- a/deltachat-jsonrpc/typescript/test/online.ts +++ b/deltachat-jsonrpc/typescript/test/online.ts @@ -86,10 +86,7 @@ describe("online tests", function () { null ); const chatId = await dc.rpc.createChatByContactId(accountId1, contactId); - const eventPromise = Promise.race([ - waitForEvent(dc, "MsgsChanged", accountId2), - waitForEvent(dc, "IncomingMsg", accountId2), - ]); + const eventPromise = waitForEvent(dc, "IncomingMsg", accountId2); await dc.rpc.miscSendTextMessage(accountId1, chatId, "Hello"); const { chatId: chatIdOnAccountB } = await eventPromise; @@ -119,10 +116,7 @@ describe("online tests", function () { null ); const chatId = await dc.rpc.createChatByContactId(accountId1, contactId); - const eventPromise = Promise.race([ - waitForEvent(dc, "MsgsChanged", accountId2), - waitForEvent(dc, "IncomingMsg", accountId2), - ]); + const eventPromise = waitForEvent(dc, "IncomingMsg", accountId2); dc.rpc.miscSendTextMessage(accountId1, chatId, "Hello2"); // wait for message from A console.log("wait for message from A"); @@ -143,10 +137,7 @@ describe("online tests", function () { ); expect(message.text).equal("Hello2"); // Send message back from B to A - const eventPromise2 = Promise.race([ - waitForEvent(dc, "MsgsChanged", accountId1), - waitForEvent(dc, "IncomingMsg", accountId1), - ]); + const eventPromise2 = waitForEvent(dc, "IncomingMsg", accountId1); dc.rpc.miscSendTextMessage(accountId2, chatId, "super secret message"); // Check if answer arrives at A and if it is encrypted await eventPromise2; diff --git a/python/tests/test_1_online.py b/python/tests/test_1_online.py index 2d2976b03..69df000a7 100644 --- a/python/tests/test_1_online.py +++ b/python/tests/test_1_online.py @@ -488,10 +488,18 @@ def test_move_sync_msgs(acfactory): ac1 = acfactory.new_online_configuring_account(bcc_self=True, sync_msgs=True, fix_is_chatmail=True) acfactory.bring_accounts_online() + ac1.direct_imap.select_folder("DeltaChat") + # Sync messages may also be sent during the configuration. + mvbox_msg_cnt = len(ac1.direct_imap.get_all_messages()) + ac1.set_config("displayname", "Alice") - ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") + ac1._evtracker.get_matching("DC_EVENT_MSG_DELIVERED") ac1.set_config("displayname", "Bob") - ac1._evtracker.get_matching("DC_EVENT_IMAP_MESSAGE_MOVED") + ac1._evtracker.get_matching("DC_EVENT_MSG_DELIVERED") + ac1.direct_imap.select_folder("Inbox") + assert len(ac1.direct_imap.get_all_messages()) == 0 + ac1.direct_imap.select_folder("DeltaChat") + assert len(ac1.direct_imap.get_all_messages()) == mvbox_msg_cnt + 2 def test_forward_messages(acfactory, lp): diff --git a/src/chat.rs b/src/chat.rs index 9ee0a5f85..4ee894bf7 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -2245,7 +2245,7 @@ pub(crate) async fn sync(context: &Context, id: SyncId, action: SyncAction) -> R context .add_sync_item(SyncData::AlterChat { id, action }) .await?; - context.scheduler.interrupt_smtp().await; + context.scheduler.interrupt_inbox().await; Ok(()) } @@ -2906,10 +2906,9 @@ async fn prepare_send_msg( create_send_msg_jobs(context, msg).await } -/// Constructs jobs for sending a message and inserts them into the `smtp` table. +/// Constructs jobs for sending a message and inserts them into the appropriate table. /// -/// Returns row ids if jobs were created or an empty `Vec` otherwise, e.g. when sending to a -/// group with only self and no BCC-to-self configured. +/// Returns row ids if `smtp` table jobs were created or an empty `Vec` otherwise. /// /// The caller has to interrupt SMTP loop or otherwise process new rows. pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result> { @@ -3003,12 +3002,6 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) - } } - if let Some(sync_ids) = rendered_msg.sync_ids_to_delete { - if let Err(err) = context.delete_sync_ids(sync_ids).await { - error!(context, "Failed to delete sync ids: {err:#}."); - } - } - if attach_selfavatar { if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, now).await { error!(context, "Failed to set selfavatar timestamp: {err:#}."); @@ -3025,19 +3018,30 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) - let chunk_size = context.get_max_smtp_rcpt_to().await?; let trans_fn = |t: &mut rusqlite::Transaction| { let mut row_ids = Vec::::new(); - for recipients_chunk in recipients.chunks(chunk_size) { - let recipients_chunk = recipients_chunk.join(" "); - let row_id = t.execute( - "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \ - VALUES (?1, ?2, ?3, ?4)", - ( - &rendered_msg.rfc724_mid, - recipients_chunk, - &rendered_msg.message, - msg.id, - ), + if let Some(sync_ids) = rendered_msg.sync_ids_to_delete { + t.execute( + &format!("DELETE FROM multi_device_sync WHERE id IN ({sync_ids})"), + (), )?; - row_ids.push(row_id.try_into()?); + t.execute( + "INSERT INTO imap_send (mime, msg_id) VALUES (?, ?)", + (&rendered_msg.message, msg.id), + )?; + } else { + for recipients_chunk in recipients.chunks(chunk_size) { + let recipients_chunk = recipients_chunk.join(" "); + let row_id = t.execute( + "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \ + VALUES (?1, ?2, ?3, ?4)", + ( + &rendered_msg.rfc724_mid, + recipients_chunk, + &rendered_msg.message, + msg.id, + ), + )?; + row_ids.push(row_id.try_into()?); + } } Ok(row_ids) }; @@ -3793,7 +3797,7 @@ pub(crate) async fn add_contact_to_chat_ex( .log_err(context) .is_ok() { - context.scheduler.interrupt_smtp().await; + context.scheduler.interrupt_inbox().await; } } context.emit_event(EventType::ChatModified(chat_id)); diff --git a/src/config.rs b/src/config.rs index 48985645d..422529215 100644 --- a/src/config.rs +++ b/src/config.rs @@ -589,6 +589,12 @@ impl Context { && !self.get_config_bool(Config::Bot).await?) } + /// Returns whether sync messages should be uploaded to the mvbox. + pub(crate) async fn should_move_sync_msgs(&self) -> Result { + Ok(self.get_config_bool(Config::MvboxMove).await? + || !self.get_config_bool(Config::IsChatmail).await?) + } + /// Returns whether MDNs should be requested. pub(crate) async fn should_request_mdns(&self) -> Result { match self.get_config_bool_opt(Config::MdnsEnabled).await? { @@ -792,7 +798,7 @@ impl Context { { return Ok(()); } - self.scheduler.interrupt_smtp().await; + self.scheduler.interrupt_inbox().await; Ok(()) } @@ -1175,7 +1181,7 @@ mod tests { let status = "Synced via usual message"; alice0.set_config(Config::Selfstatus, Some(status)).await?; alice0.send_sync_msg().await?; - alice0.pop_sent_msg().await; + alice0.pop_sent_sync_msg().await; let status1 = "Synced via sync message"; alice1.set_config(Config::Selfstatus, Some(status1)).await?; tcm.send_recv(alice0, alice1, "hi Alice!").await; @@ -1199,7 +1205,7 @@ mod tests { .set_config(Config::Selfavatar, Some(file.to_str().unwrap())) .await?; alice0.send_sync_msg().await?; - alice0.pop_sent_msg().await; + alice0.pop_sent_sync_msg().await; let file = alice1.dir.path().join("avatar.jpg"); let bytes = include_bytes!("../test-data/image/avatar1000x1000.jpg"); tokio::fs::write(&file, bytes).await?; diff --git a/src/imap.rs b/src/imap.rs index 39724c5a1..40f96b101 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -32,6 +32,7 @@ use crate::contact::{Contact, ContactId, Modifier, Origin}; use crate::context::Context; use crate::events::EventType; use crate::headerdef::{HeaderDef, HeaderDefMap}; +use crate::log::LogExt; use crate::login_param::{ prioritize_server_login_params, ConfiguredLoginParam, ConfiguredServerLoginParam, }; @@ -1043,6 +1044,52 @@ impl Session { Ok(()) } + /// Uploads sync messages from the `imap_send` table with `\Seen` flag set. + pub(crate) async fn send_sync_msgs(&mut self, context: &Context, folder: &str) -> Result<()> { + context.send_sync_msg().await?; + while let Some((id, mime, msg_id, attempts)) = context + .sql + .query_row_optional( + "SELECT id, mime, msg_id, attempts FROM imap_send ORDER BY id LIMIT 1", + (), + |row| { + let id: i64 = row.get(0)?; + let mime: String = row.get(1)?; + let msg_id: MsgId = row.get(2)?; + let attempts: i64 = row.get(3)?; + Ok((id, mime, msg_id, attempts)) + }, + ) + .await + .context("Failed to SELECT from imap_send")? + { + let res = self + .append(folder, Some("(\\Seen)"), None, mime) + .await + .with_context(|| format!("IMAP APPEND to {folder} failed for {msg_id}")) + .log_err(context); + if res.is_ok() { + msg_id.set_delivered(context).await?; + } + const MAX_ATTEMPTS: i64 = 2; + if res.is_ok() || attempts >= MAX_ATTEMPTS - 1 { + context + .sql + .execute("DELETE FROM imap_send WHERE id=?", (id,)) + .await + .context("Failed to delete from imap_send")?; + } else { + context + .sql + .execute("UPDATE imap_send SET attempts=attempts+1 WHERE id=?", (id,)) + .await + .context("Failed to update imap_send.attempts")?; + res?; + } + } + Ok(()) + } + /// Stores pending `\Seen` flags for messages in `imap_markseen` table. pub(crate) async fn store_seen_flags_on_imap(&mut self, context: &Context) -> Result<()> { let rows = context diff --git a/src/mimefactory.rs b/src/mimefactory.rs index cbe7da4d0..5c5add148 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -104,10 +104,8 @@ pub struct RenderedEmail { pub is_gossiped: bool, pub last_added_location_id: Option, - /// A comma-separated string of sync-IDs that are used by the rendered email - /// and must be deleted once the message is actually queued for sending - /// (deletion must be done by `delete_sync_ids()`). - /// If the rendered email is not queued for sending, the IDs must not be deleted. + /// A comma-separated string of sync-IDs that are used by the rendered email and must be deleted + /// from `multi_device_sync` once the message is actually queued for sending. pub sync_ids_to_delete: Option, /// Message ID (Message in the sense of Email) diff --git a/src/qr.rs b/src/qr.rs index 2ad2f52ad..b2aa8e4d5 100644 --- a/src/qr.rs +++ b/src/qr.rs @@ -719,7 +719,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { token::save(context, token::Namespace::InviteNumber, None, &invitenumber).await?; token::save(context, token::Namespace::Auth, None, &authcode).await?; context.sync_qr_code_tokens(None).await?; - context.scheduler.interrupt_smtp().await; + context.scheduler.interrupt_inbox().await; } Qr::ReviveVerifyGroup { invitenumber, @@ -736,7 +736,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { .await?; token::save(context, token::Namespace::Auth, Some(&grpid), &authcode).await?; context.sync_qr_code_tokens(Some(&grpid)).await?; - context.scheduler.interrupt_smtp().await; + context.scheduler.interrupt_inbox().await; } Qr::Login { address, options } => { configure_from_login_qr(context, &address, options).await? diff --git a/src/scheduler.rs b/src/scheduler.rs index 15147f204..4377d6640 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -568,6 +568,21 @@ async fn fetch_idle( }; if folder_config == Config::ConfiguredInboxFolder { + let mvbox; + let syncbox = match ctx.should_move_sync_msgs().await? { + false => &watch_folder, + true => { + mvbox = ctx.get_config(Config::ConfiguredMvboxFolder).await?; + mvbox.as_deref().unwrap_or(&watch_folder) + } + }; + session + .send_sync_msgs(ctx, syncbox) + .await + .context("fetch_idle: send_sync_msgs") + .log_err(ctx) + .ok(); + session .store_seen_flags_on_imap(ctx) .await diff --git a/src/securejoin.rs b/src/securejoin.rs index 93877540e..476f9123f 100644 --- a/src/securejoin.rs +++ b/src/securejoin.rs @@ -109,7 +109,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option) -> Resu context .sync_qr_code_tokens(Some(chat.grpid.as_str())) .await?; - context.scheduler.interrupt_smtp().await; + context.scheduler.interrupt_inbox().await; } format!( "OPENPGP4FPR:{}#a={}&g={}&x={}&i={}&s={}", @@ -124,7 +124,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option) -> Resu // parameters used: a=n=i=s= if sync_token { context.sync_qr_code_tokens(None).await?; - context.scheduler.interrupt_smtp().await; + context.scheduler.interrupt_inbox().await; } format!( "OPENPGP4FPR:{}#a={}&n={}&i={}&s={}", diff --git a/src/smtp.rs b/src/smtp.rs index 5c1d1af2f..f9e5ffed6 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -485,7 +485,6 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) let ratelimited = if context.ratelimit.read().await.can_send() { // add status updates and sync messages to end of sending queue context.flush_status_updates().await?; - context.send_sync_msg().await?; false } else { true diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index f357de8b3..fa6a037b3 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -991,6 +991,20 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); .await?; } + inc_and_check(&mut migration_version, 119)?; + if dbversion < migration_version { + sql.execute_migration( + "CREATE TABLE imap_send ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + mime TEXT NOT NULL, -- Message content + msg_id INTEGER NOT NULL, -- ID of the message in the `msgs` table + attempts INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the message + )", + migration_version, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? diff --git a/src/sync.rs b/src/sync.rs index 23b8ae5e5..0aa4810f0 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -120,7 +120,7 @@ impl Context { /// Adds most recent qr-code tokens for the given group or self-contact to the list of items to /// be synced. If device synchronization is disabled, /// no tokens exist or the chat is unpromoted, the function does nothing. - /// The caller should perform `SchedulerState::interrupt_smtp()` on its own to trigger sending. + /// The caller should call `SchedulerState::interrupt_inbox()` on its own to trigger sending. pub(crate) async fn sync_qr_code_tokens(&self, grpid: Option<&str>) -> Result<()> { if !self.should_send_sync_msgs().await? { return Ok(()); @@ -153,7 +153,7 @@ impl Context { grpid: None, })) .await?; - self.scheduler.interrupt_smtp().await; + self.scheduler.interrupt_inbox().await; Ok(()) } @@ -233,17 +233,6 @@ impl Context { .body(json) } - /// Deletes IDs as returned by `build_sync_json()`. - pub(crate) async fn delete_sync_ids(&self, ids: String) -> Result<()> { - self.sql - .execute( - &format!("DELETE FROM multi_device_sync WHERE id IN ({ids});"), - (), - ) - .await?; - Ok(()) - } - /// Takes a JSON string created by `build_sync_json()` /// and construct `SyncItems` from it. pub(crate) fn parse_sync_items(&self, serialized: String) -> Result { @@ -384,7 +373,12 @@ mod tests { ); assert!(t.build_sync_json().await?.is_some()); - t.delete_sync_ids(ids).await?; + t.sql + .execute( + &format!("DELETE FROM multi_device_sync WHERE id IN ({ids})"), + (), + ) + .await?; assert!(t.build_sync_json().await?.is_none()); let sync_items = t.parse_sync_items(serialized)?; @@ -565,7 +559,7 @@ mod tests { // let alice's other device receive and execute the sync message, // also here, self-talk should stay hidden - let sent_msg = alice.pop_sent_msg().await; + let sent_msg = alice.pop_sent_sync_msg().await; let alice2 = TestContext::new_alice().await; alice2.set_config_bool(Config::SyncMsgs, true).await?; alice2.recv_msg_trash(&sent_msg).await; @@ -593,7 +587,7 @@ mod tests { .set_config(Config::Displayname, Some("Alice Human")) .await?; alice.send_sync_msg().await?; - alice.pop_sent_msg().await; + alice.pop_sent_sync_msg().await; let msg = bob.recv_msg(&alice.pop_sent_msg().await).await; assert_eq!(msg.text, "hi"); @@ -628,7 +622,7 @@ mod tests { // group is promoted for compatibility (because the group could be created by older Core). // TODO: assert!(msg_id.is_none()); assert!(msg_id.is_some()); - let sent = alice.pop_sent_msg().await; + let sent = alice.pop_sent_sync_msg().await; let msg = alice.parse_msg(&sent).await; let mut sync_items = msg.sync_items.unwrap().items; assert_eq!(sync_items.len(), 1); diff --git a/src/test_utils.rs b/src/test_utils.rs index 05551558b..bb9aaf139 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -438,7 +438,7 @@ impl TestContext { /// Retrieves a sent message from the jobs table. /// /// This retrieves and removes a message which has been scheduled to send from the jobs - /// table. Messages are returned in the order they have been sent. + /// table. Messages are returned in the reverse order of sending. /// /// Panics if there is no message or on any error. pub async fn pop_sent_msg(&self) -> SentMessage<'_> { @@ -532,6 +532,46 @@ impl TestContext { }) } + /// Retrieves a sent sync message from the db. + /// + /// This retrieves and removes a sync message which has been scheduled to send from the jobs + /// table. Messages are returned in the order they have been sent. + /// + /// Panics if there is no message or on any error. + pub async fn pop_sent_sync_msg(&self) -> SentMessage<'_> { + let (id, msg_id, payload) = self + .ctx + .sql + .query_row( + "SELECT id, msg_id, mime \ + FROM imap_send \ + ORDER BY id", + (), + |row| { + let rowid: i64 = row.get(0)?; + let msg_id: MsgId = row.get(1)?; + let mime: String = row.get(2)?; + Ok((rowid, msg_id, mime)) + }, + ) + .await + .expect("query_row failed"); + self.ctx + .sql + .execute("DELETE FROM imap_send WHERE id=?", (id,)) + .await + .expect("failed to remove job"); + update_msg_state(&self.ctx, msg_id, MessageState::OutDelivered) + .await + .expect("failed to update message state"); + SentMessage { + payload, + sender_msg_id: msg_id, + sender_context: &self.ctx, + recipients: self.get_primary_self_addr().await.unwrap(), + } + } + /// Parses a message. /// /// Parsing a message does not run the entire receive pipeline, but is not without @@ -1141,7 +1181,7 @@ pub(crate) async fn mark_as_verified(this: &TestContext, other: &TestContext) { /// alice0's side that implies sending a sync message. pub(crate) async fn sync(alice0: &TestContext, alice1: &TestContext) { alice0.send_sync_msg().await.unwrap(); - let sync_msg = alice0.pop_sent_msg().await; + let sync_msg = alice0.pop_sent_sync_msg().await; let no_msg = alice1.recv_msg_opt(&sync_msg).await; assert!(no_msg.is_none()); }