mirror of
https://github.com/chatmail/core.git
synced 2026-05-24 01:06:31 +03:00
fix: Don't mark messages added to the db later as noticed when handling self-MDN
The device which issued the MDN might not have these messages at that moment, so only mark messages having lower ids as noticed. Still, additionally filter messages by timestamp to protect from multi-relay message reordering and overall rely less on the server side. We assume that all clocks in the chat are synchronized. This doesn't fix the problem completely because messages may be received by devices in different order, but should fix it in most cases.
This commit is contained in:
32
src/chat.rs
32
src/chat.rs
@@ -1,7 +1,7 @@
|
||||
//! # Chat module.
|
||||
|
||||
use std::cmp;
|
||||
use std::collections::{BTreeSet, HashMap};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||
use std::fmt;
|
||||
use std::io::Cursor;
|
||||
use std::marker::Sync;
|
||||
@@ -3325,39 +3325,41 @@ pub(crate) async fn mark_old_messages_as_noticed(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut msgs_by_chat: HashMap<ChatId, ReceivedMsg> = HashMap::new();
|
||||
let mut updated_chats = BTreeMap::new();
|
||||
for msg in msgs {
|
||||
let chat_id = msg.chat_id;
|
||||
if let Some(existing_msg) = msgs_by_chat.get(&chat_id) {
|
||||
if msg.sort_timestamp > existing_msg.sort_timestamp {
|
||||
msgs_by_chat.insert(chat_id, msg);
|
||||
}
|
||||
} else {
|
||||
msgs_by_chat.insert(chat_id, msg);
|
||||
}
|
||||
let Some(&msg_id) = msg.msg_ids.last() else {
|
||||
continue;
|
||||
};
|
||||
updated_chats
|
||||
.entry(msg.chat_id)
|
||||
.and_modify(|val| *val = cmp::max(*val, (msg.sort_timestamp, msg_id)))
|
||||
.or_insert((msg.sort_timestamp, msg_id));
|
||||
}
|
||||
|
||||
let changed_chats = context
|
||||
.sql
|
||||
.transaction(|transaction| {
|
||||
let mut changed_chats = Vec::new();
|
||||
for (_, msg) in msgs_by_chat {
|
||||
for (chat_id, (timestamp, msg_id)) in updated_chats {
|
||||
let changed_rows = transaction.execute(
|
||||
// Do the same as in receive_imf_inner().
|
||||
"UPDATE msgs
|
||||
SET state=?
|
||||
WHERE state=?
|
||||
AND hidden=0
|
||||
AND chat_id=?
|
||||
AND timestamp<=?;",
|
||||
AND timestamp<=?
|
||||
AND id<?",
|
||||
(
|
||||
MessageState::InNoticed,
|
||||
MessageState::InFresh,
|
||||
msg.chat_id,
|
||||
msg.sort_timestamp,
|
||||
chat_id,
|
||||
timestamp,
|
||||
msg_id,
|
||||
),
|
||||
)?;
|
||||
if changed_rows > 0 {
|
||||
changed_chats.push(msg.chat_id);
|
||||
changed_chats.push(chat_id);
|
||||
}
|
||||
}
|
||||
Ok(changed_chats)
|
||||
|
||||
@@ -982,12 +982,18 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
// Don't mark messages added to the db later as noticed, regardless of
|
||||
// `timestamp` -- the device which issued the MDN might not have these
|
||||
// messages at that moment. Still, additionally filter messages by timestamp
|
||||
// to protect from multi-relay message reordering and overall rely less on
|
||||
// the server side. We assume that all clocks in the chat are synchronized.
|
||||
"
|
||||
UPDATE msgs SET state=? WHERE
|
||||
state=? AND
|
||||
hidden=0 AND
|
||||
chat_id=? AND
|
||||
(timestamp,id)<(?,?)",
|
||||
timestamp<=? AND
|
||||
id<?",
|
||||
(
|
||||
MessageState::InNoticed,
|
||||
MessageState::InFresh,
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::imap::prefetch_should_download;
|
||||
use crate::imex::{ImexMode, imex};
|
||||
use crate::key;
|
||||
use crate::securejoin::get_securejoin_qr;
|
||||
use crate::smtp;
|
||||
use crate::test_utils;
|
||||
use crate::test_utils::{
|
||||
TestContext, TestContextManager, alice_keypair, get_chat_msg, mark_as_verified,
|
||||
@@ -2724,6 +2725,45 @@ async fn test_read_receipts_dont_unmark_bots() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_self_mdn_vs_delayed_msg() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let alice = &tcm.alice().await;
|
||||
let bob1 = &tcm.bob().await;
|
||||
let bob2 = &tcm.bob().await;
|
||||
|
||||
let alice_chat = alice.create_chat(bob1).await;
|
||||
let sent1 = alice.send_text(alice_chat.id, "1").await;
|
||||
SystemTime::shift(Duration::from_secs(1));
|
||||
let sent2 = alice.send_text(alice_chat.id, "2").await;
|
||||
|
||||
let msg2_id = bob1.recv_msg(&sent2).await.id;
|
||||
let msg1_id = bob1.recv_msg(&sent1).await.id;
|
||||
let bob2_chat_id = bob2.recv_msg(&sent2).await.chat_id;
|
||||
bob2.recv_msg(&sent1).await;
|
||||
|
||||
message::markseen_msgs(bob1, vec![msg2_id]).await?;
|
||||
assert_eq!(msg1_id.get_state(bob1).await?, MessageState::InFresh);
|
||||
smtp::queue_mdn(bob1).await?;
|
||||
let sent = bob1.pop_sent_msg().await;
|
||||
|
||||
bob2.recv_msg_trash(&sent).await;
|
||||
let mut msgs = get_chat_msgs(bob2, bob2_chat_id).await?;
|
||||
let ChatItem::Message { msg_id } = msgs.pop().unwrap() else {
|
||||
unreachable!();
|
||||
};
|
||||
let msg = Message::load_from_db(bob2, msg_id).await?;
|
||||
assert_eq!(msg.text, "2");
|
||||
assert_eq!(msg.state, MessageState::InSeen);
|
||||
let ChatItem::Message { msg_id } = msgs.pop().unwrap() else {
|
||||
unreachable!();
|
||||
};
|
||||
let msg = Message::load_from_db(bob2, msg_id).await?;
|
||||
assert_eq!(msg.text, "1");
|
||||
assert_eq!(msg.state, MessageState::InFresh);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_gmx_forwarded_msg() -> Result<()> {
|
||||
let t = TestContext::new_alice().await;
|
||||
|
||||
Reference in New Issue
Block a user