From e4d65b2f3bb28afee53b3cbde62f39e83278361d Mon Sep 17 00:00:00 2001 From: iequidoo Date: Fri, 19 Jul 2024 20:52:06 -0300 Subject: [PATCH] fix: Call `send_sync_msg()` only from the SMTP loop (#5780) `Context::send_sync_msg()` mustn't be called from multiple tasks in parallel to avoid sending the same sync items twice because sync items are removed from the db only after successful sending. Let's guarantee this by calling `send_sync_msg()` only from the SMTP loop. Before `send_sync_msg()` could be called in parallel from the SMTP loop and another task doing e.g. `chat::sync()` which led to `test_multidevice_sync_chat` being flaky because of events triggered by duplicated sync messages. --- deltachat-rpc-client/tests/test_something.py | 2 +- src/chat.rs | 10 ++++++---- src/config.rs | 8 +++++--- src/qr.rs | 6 ++---- src/securejoin.rs | 2 ++ src/sync.rs | 13 +++++++++++-- src/test_utils.rs | 1 + 7 files changed, 28 insertions(+), 14 deletions(-) diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index 68a610c78..faf2ddab1 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -103,12 +103,12 @@ def test_account(acfactory) -> None: assert alice.get_chatlist(snapshot=True) assert alice.get_qr_code() assert alice.get_fresh_messages() - assert alice.get_next_messages() # Test sending empty message. assert len(bob.wait_next_messages()) == 0 alice_chat_bob.send_text("") messages = bob.wait_next_messages() + assert bob.get_next_messages() == messages assert len(messages) == 1 message = messages[0] snapshot = message.get_snapshot() diff --git a/src/chat.rs b/src/chat.rs index 62165bd02..1ccb4f06e 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1935,7 +1935,7 @@ impl Chat { self.param.remove(Param::Unpromoted); self.update_param(context).await?; // send_sync_msg() is called (usually) a moment later at send_msg_to_smtp() - // when the group-creation message is actually sent though SMTP - + // when the group creation message is actually sent through SMTP -- // this makes sure, the other devices are aware of grpid that is used in the sync-message. context .sync_qr_code_tokens(Some(self.id)) @@ -2240,7 +2240,7 @@ pub(crate) async fn sync(context: &Context, id: SyncId, action: SyncAction) -> R context .add_sync_item(SyncData::AlterChat { id, action }) .await?; - context.send_sync_msg().await?; + context.scheduler.interrupt_smtp().await; Ok(()) } @@ -3738,12 +3738,14 @@ pub(crate) async fn add_contact_to_chat_ex( if from_handshake && chat.param.get_int(Param::Unpromoted).unwrap_or_default() == 1 { chat.param.remove(Param::Unpromoted); chat.update_param(context).await?; - let _ = context + if context .sync_qr_code_tokens(Some(chat_id)) .await .log_err(context) .is_ok() - && context.send_sync_msg().await.log_err(context).is_ok(); + { + context.scheduler.interrupt_smtp().await; + } } if context.is_self_addr(contact.get_addr()).await? { diff --git a/src/config.rs b/src/config.rs index 7c1773375..c65309da9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -688,7 +688,7 @@ impl Context { { return Ok(()); } - Box::pin(self.send_sync_msg()).await.log_err(self).ok(); + self.scheduler.interrupt_smtp().await; Ok(()) } @@ -1054,7 +1054,8 @@ mod tests { let status = "Synced via usual message"; alice0.set_config(Config::Selfstatus, Some(status)).await?; - alice0.pop_sent_msg().await; // Sync message + alice0.send_sync_msg().await?; + alice0.pop_sent_msg().await; let status1 = "Synced via sync message"; alice1.set_config(Config::Selfstatus, Some(status1)).await?; tcm.send_recv(alice0, alice1, "hi Alice!").await; @@ -1077,7 +1078,8 @@ mod tests { alice0 .set_config(Config::Selfavatar, Some(file.to_str().unwrap())) .await?; - alice0.pop_sent_msg().await; // Sync message + alice0.send_sync_msg().await?; + alice0.pop_sent_msg().await; let file = alice1.dir.path().join("avatar.jpg"); let bytes = include_bytes!("../test-data/image/avatar1000x1000.jpg"); tokio::fs::write(&file, bytes).await?; diff --git a/src/qr.rs b/src/qr.rs index cd82e2f2a..edce65b96 100644 --- a/src/qr.rs +++ b/src/qr.rs @@ -652,7 +652,6 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { context .sync_qr_code_token_deletion(invitenumber, authcode) .await?; - context.send_sync_msg().await?; } Qr::WithdrawVerifyGroup { invitenumber, @@ -664,7 +663,6 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { context .sync_qr_code_token_deletion(invitenumber, authcode) .await?; - context.send_sync_msg().await?; } Qr::ReviveVerifyContact { invitenumber, @@ -674,7 +672,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { token::save(context, token::Namespace::InviteNumber, None, &invitenumber).await?; token::save(context, token::Namespace::Auth, None, &authcode).await?; context.sync_qr_code_tokens(None).await?; - context.send_sync_msg().await?; + context.scheduler.interrupt_smtp().await; } Qr::ReviveVerifyGroup { invitenumber, @@ -694,7 +692,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> { .await?; token::save(context, token::Namespace::Auth, chat_id, &authcode).await?; context.sync_qr_code_tokens(chat_id).await?; - context.send_sync_msg().await?; + context.scheduler.interrupt_smtp().await; } Qr::Login { address, options } => { configure_from_login_qr(context, &address, options).await? diff --git a/src/securejoin.rs b/src/securejoin.rs index a706dd5ff..3f5dbca1e 100644 --- a/src/securejoin.rs +++ b/src/securejoin.rs @@ -98,6 +98,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option) -> Resu let group_name_urlencoded = utf8_percent_encode(group_name, NON_ALPHANUMERIC).to_string(); if sync_token { context.sync_qr_code_tokens(Some(chat.id)).await?; + context.scheduler.interrupt_smtp().await; } format!( "OPENPGP4FPR:{}#a={}&g={}&x={}&i={}&s={}", @@ -112,6 +113,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option) -> Resu // parameters used: a=n=i=s= if sync_token { context.sync_qr_code_tokens(None).await?; + context.scheduler.interrupt_smtp().await; } format!( "OPENPGP4FPR:{}#a={}&n={}&i={}&s={}", diff --git a/src/sync.rs b/src/sync.rs index 22a888a73..29ad69c14 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -120,6 +120,7 @@ impl Context { /// Adds most recent qr-code tokens for a given chat to the list of items to be synced. /// If device synchronization is disabled, /// no tokens exist or the chat is unpromoted, the function does nothing. + /// The caller should perform `SchedulerState::interrupt_smtp()` on its own to trigger sending. pub(crate) async fn sync_qr_code_tokens(&self, chat_id: Option) -> Result<()> { if !self.should_send_sync_msgs().await? { return Ok(()); @@ -154,6 +155,7 @@ impl Context { /// Adds deleted qr-code token to the list of items to be synced /// so that the token also gets deleted on the other devices. + /// This interrupts SMTP on its own. pub(crate) async fn sync_qr_code_token_deletion( &self, invitenumber: String, @@ -164,10 +166,16 @@ impl Context { auth, grpid: None, })) - .await + .await?; + self.scheduler.interrupt_smtp().await; + Ok(()) } /// Sends out a self-sent message with items to be synchronized, if any. + /// + /// Mustn't be called from multiple tasks in parallel to avoid sending the same sync items twice + /// because sync items are removed from the db only after successful sending. We guarantee this + /// by calling `send_sync_msg()` only from the SMTP loop. pub async fn send_sync_msg(&self) -> Result> { if let Some((json, ids)) = self.build_sync_json().await? { let chat_id = @@ -608,7 +616,8 @@ mod tests { alice .set_config(Config::Displayname, Some("Alice Human")) .await?; - alice.pop_sent_msg().await; // Sync message + alice.send_sync_msg().await?; + alice.pop_sent_msg().await; let msg = bob.recv_msg(&alice.pop_sent_msg().await).await; assert_eq!(msg.text, "hi"); diff --git a/src/test_utils.rs b/src/test_utils.rs index d179785c8..3d5350b74 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -1104,6 +1104,7 @@ pub(crate) async fn mark_as_verified(this: &TestContext, other: &TestContext) { /// Pops a sync message from alice0 and receives it on alice1. Should be used after an action on /// alice0's side that implies sending a sync message. pub(crate) async fn sync(alice0: &TestContext, alice1: &TestContext) { + alice0.send_sync_msg().await.unwrap(); let sync_msg = alice0.pop_sent_msg().await; let no_msg = alice1.recv_msg_opt(&sync_msg).await; assert!(no_msg.is_none());