mirror of
https://github.com/chatmail/core.git
synced 2026-04-20 06:56:29 +03:00
fix: Call send_sync_msg() only from the SMTP loop (#5780)
`Context::send_sync_msg()` mustn't be called from multiple tasks in parallel to avoid sending the same sync items twice because sync items are removed from the db only after successful sending. Let's guarantee this by calling `send_sync_msg()` only from the SMTP loop. Before `send_sync_msg()` could be called in parallel from the SMTP loop and another task doing e.g. `chat::sync()` which led to `test_multidevice_sync_chat` being flaky because of events triggered by duplicated sync messages.
This commit is contained in:
10
src/chat.rs
10
src/chat.rs
@@ -1935,7 +1935,7 @@ impl Chat {
|
||||
self.param.remove(Param::Unpromoted);
|
||||
self.update_param(context).await?;
|
||||
// send_sync_msg() is called (usually) a moment later at send_msg_to_smtp()
|
||||
// when the group-creation message is actually sent though SMTP -
|
||||
// when the group creation message is actually sent through SMTP --
|
||||
// this makes sure, the other devices are aware of grpid that is used in the sync-message.
|
||||
context
|
||||
.sync_qr_code_tokens(Some(self.id))
|
||||
@@ -2240,7 +2240,7 @@ pub(crate) async fn sync(context: &Context, id: SyncId, action: SyncAction) -> R
|
||||
context
|
||||
.add_sync_item(SyncData::AlterChat { id, action })
|
||||
.await?;
|
||||
context.send_sync_msg().await?;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3738,12 +3738,14 @@ pub(crate) async fn add_contact_to_chat_ex(
|
||||
if from_handshake && chat.param.get_int(Param::Unpromoted).unwrap_or_default() == 1 {
|
||||
chat.param.remove(Param::Unpromoted);
|
||||
chat.update_param(context).await?;
|
||||
let _ = context
|
||||
if context
|
||||
.sync_qr_code_tokens(Some(chat_id))
|
||||
.await
|
||||
.log_err(context)
|
||||
.is_ok()
|
||||
&& context.send_sync_msg().await.log_err(context).is_ok();
|
||||
{
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
}
|
||||
|
||||
if context.is_self_addr(contact.get_addr()).await? {
|
||||
|
||||
@@ -688,7 +688,7 @@ impl Context {
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
Box::pin(self.send_sync_msg()).await.log_err(self).ok();
|
||||
self.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1054,7 +1054,8 @@ mod tests {
|
||||
|
||||
let status = "Synced via usual message";
|
||||
alice0.set_config(Config::Selfstatus, Some(status)).await?;
|
||||
alice0.pop_sent_msg().await; // Sync message
|
||||
alice0.send_sync_msg().await?;
|
||||
alice0.pop_sent_msg().await;
|
||||
let status1 = "Synced via sync message";
|
||||
alice1.set_config(Config::Selfstatus, Some(status1)).await?;
|
||||
tcm.send_recv(alice0, alice1, "hi Alice!").await;
|
||||
@@ -1077,7 +1078,8 @@ mod tests {
|
||||
alice0
|
||||
.set_config(Config::Selfavatar, Some(file.to_str().unwrap()))
|
||||
.await?;
|
||||
alice0.pop_sent_msg().await; // Sync message
|
||||
alice0.send_sync_msg().await?;
|
||||
alice0.pop_sent_msg().await;
|
||||
let file = alice1.dir.path().join("avatar.jpg");
|
||||
let bytes = include_bytes!("../test-data/image/avatar1000x1000.jpg");
|
||||
tokio::fs::write(&file, bytes).await?;
|
||||
|
||||
@@ -652,7 +652,6 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
|
||||
context
|
||||
.sync_qr_code_token_deletion(invitenumber, authcode)
|
||||
.await?;
|
||||
context.send_sync_msg().await?;
|
||||
}
|
||||
Qr::WithdrawVerifyGroup {
|
||||
invitenumber,
|
||||
@@ -664,7 +663,6 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
|
||||
context
|
||||
.sync_qr_code_token_deletion(invitenumber, authcode)
|
||||
.await?;
|
||||
context.send_sync_msg().await?;
|
||||
}
|
||||
Qr::ReviveVerifyContact {
|
||||
invitenumber,
|
||||
@@ -674,7 +672,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
|
||||
token::save(context, token::Namespace::InviteNumber, None, &invitenumber).await?;
|
||||
token::save(context, token::Namespace::Auth, None, &authcode).await?;
|
||||
context.sync_qr_code_tokens(None).await?;
|
||||
context.send_sync_msg().await?;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
Qr::ReviveVerifyGroup {
|
||||
invitenumber,
|
||||
@@ -694,7 +692,7 @@ pub async fn set_config_from_qr(context: &Context, qr: &str) -> Result<()> {
|
||||
.await?;
|
||||
token::save(context, token::Namespace::Auth, chat_id, &authcode).await?;
|
||||
context.sync_qr_code_tokens(chat_id).await?;
|
||||
context.send_sync_msg().await?;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
Qr::Login { address, options } => {
|
||||
configure_from_login_qr(context, &address, options).await?
|
||||
|
||||
@@ -98,6 +98,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option<ChatId>) -> Resu
|
||||
let group_name_urlencoded = utf8_percent_encode(group_name, NON_ALPHANUMERIC).to_string();
|
||||
if sync_token {
|
||||
context.sync_qr_code_tokens(Some(chat.id)).await?;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
format!(
|
||||
"OPENPGP4FPR:{}#a={}&g={}&x={}&i={}&s={}",
|
||||
@@ -112,6 +113,7 @@ pub async fn get_securejoin_qr(context: &Context, group: Option<ChatId>) -> Resu
|
||||
// parameters used: a=n=i=s=
|
||||
if sync_token {
|
||||
context.sync_qr_code_tokens(None).await?;
|
||||
context.scheduler.interrupt_smtp().await;
|
||||
}
|
||||
format!(
|
||||
"OPENPGP4FPR:{}#a={}&n={}&i={}&s={}",
|
||||
|
||||
13
src/sync.rs
13
src/sync.rs
@@ -120,6 +120,7 @@ impl Context {
|
||||
/// Adds most recent qr-code tokens for a given chat to the list of items to be synced.
|
||||
/// If device synchronization is disabled,
|
||||
/// no tokens exist or the chat is unpromoted, the function does nothing.
|
||||
/// The caller should perform `SchedulerState::interrupt_smtp()` on its own to trigger sending.
|
||||
pub(crate) async fn sync_qr_code_tokens(&self, chat_id: Option<ChatId>) -> Result<()> {
|
||||
if !self.should_send_sync_msgs().await? {
|
||||
return Ok(());
|
||||
@@ -154,6 +155,7 @@ impl Context {
|
||||
|
||||
/// Adds deleted qr-code token to the list of items to be synced
|
||||
/// so that the token also gets deleted on the other devices.
|
||||
/// This interrupts SMTP on its own.
|
||||
pub(crate) async fn sync_qr_code_token_deletion(
|
||||
&self,
|
||||
invitenumber: String,
|
||||
@@ -164,10 +166,16 @@ impl Context {
|
||||
auth,
|
||||
grpid: None,
|
||||
}))
|
||||
.await
|
||||
.await?;
|
||||
self.scheduler.interrupt_smtp().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends out a self-sent message with items to be synchronized, if any.
|
||||
///
|
||||
/// Mustn't be called from multiple tasks in parallel to avoid sending the same sync items twice
|
||||
/// because sync items are removed from the db only after successful sending. We guarantee this
|
||||
/// by calling `send_sync_msg()` only from the SMTP loop.
|
||||
pub async fn send_sync_msg(&self) -> Result<Option<MsgId>> {
|
||||
if let Some((json, ids)) = self.build_sync_json().await? {
|
||||
let chat_id =
|
||||
@@ -608,7 +616,8 @@ mod tests {
|
||||
alice
|
||||
.set_config(Config::Displayname, Some("Alice Human"))
|
||||
.await?;
|
||||
alice.pop_sent_msg().await; // Sync message
|
||||
alice.send_sync_msg().await?;
|
||||
alice.pop_sent_msg().await;
|
||||
let msg = bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
assert_eq!(msg.text, "hi");
|
||||
|
||||
|
||||
@@ -1104,6 +1104,7 @@ pub(crate) async fn mark_as_verified(this: &TestContext, other: &TestContext) {
|
||||
/// Pops a sync message from alice0 and receives it on alice1. Should be used after an action on
|
||||
/// alice0's side that implies sending a sync message.
|
||||
pub(crate) async fn sync(alice0: &TestContext, alice1: &TestContext) {
|
||||
alice0.send_sync_msg().await.unwrap();
|
||||
let sync_msg = alice0.pop_sent_msg().await;
|
||||
let no_msg = alice1.recv_msg_opt(&sync_msg).await;
|
||||
assert!(no_msg.is_none());
|
||||
|
||||
Reference in New Issue
Block a user