mirror of
https://github.com/chatmail/core.git
synced 2026-04-26 18:06:35 +03:00
fix: Add tolerance to MemberListTimestamp (#5366)
Let's add a 1-minute tolerance to `Params::MemberListTimestamp`. This adds to the group membership consistency algo the following properties: - If remote group membership changes were made by two members in parallel, both of them are applied, no matter in which order the messages are received. - If we remove a member locally, only explicit remote member additions/removals made in parallel are allowed, but not the synchronisation of the member list from "To". Before, if somebody managed to reply earlier than receiving our removal of a member, we added it back which doesn't look good.
This commit is contained in:
45
src/chat.rs
45
src/chat.rs
@@ -1172,6 +1172,15 @@ impl ChatId {
|
||||
Ok(self.get_param(context).await?.exists(Param::Devicetalk))
|
||||
}
|
||||
|
||||
/// Returns chat member list timestamp.
|
||||
pub(crate) async fn get_member_list_timestamp(self, context: &Context) -> Result<i64> {
|
||||
Ok(self
|
||||
.get_param(context)
|
||||
.await?
|
||||
.get_i64(Param::MemberListTimestamp)
|
||||
.unwrap_or_default())
|
||||
}
|
||||
|
||||
async fn parent_query<T, F>(
|
||||
self,
|
||||
context: &Context,
|
||||
@@ -2804,15 +2813,21 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
|
||||
);
|
||||
}
|
||||
|
||||
let now = time();
|
||||
let now = smeared_time(context);
|
||||
|
||||
if rendered_msg.is_gossiped {
|
||||
msg.chat_id.set_gossiped_timestamp(context, now).await?;
|
||||
}
|
||||
|
||||
if rendered_msg.is_group {
|
||||
if msg.param.get_cmd() == SystemMessage::MemberRemovedFromGroup {
|
||||
// Reject member list synchronisation from older messages. See also
|
||||
// `receive_imf::apply_group_changes()`.
|
||||
msg.chat_id
|
||||
.update_timestamp(context, Param::MemberListTimestamp, now)
|
||||
.update_timestamp(
|
||||
context,
|
||||
Param::MemberListTimestamp,
|
||||
now.saturating_add(constants::TIMESTAMP_SENT_TOLERANCE),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
@@ -4777,9 +4792,9 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test simultaneous removal of user from the chat and leaving the group.
|
||||
/// Test parallel removal of user from the chat and leaving the group.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_simultaneous_member_remove() -> Result<()> {
|
||||
async fn test_parallel_member_remove() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
|
||||
let alice = tcm.alice().await;
|
||||
@@ -4810,20 +4825,25 @@ mod tests {
|
||||
add_contact_to_chat(&alice, alice_chat_id, alice_claire_contact_id).await?;
|
||||
let alice_sent_add_msg = alice.pop_sent_msg().await;
|
||||
|
||||
// Alice removes Bob from the chat.
|
||||
remove_contact_from_chat(&alice, alice_chat_id, alice_bob_contact_id).await?;
|
||||
let alice_sent_remove_msg = alice.pop_sent_msg().await;
|
||||
|
||||
// Bob leaves the chat.
|
||||
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
|
||||
|
||||
// Bob receives a msg about Alice adding Claire to the group.
|
||||
bob.recv_msg(&alice_sent_add_msg).await;
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
// This adds Bob because they left quite long ago.
|
||||
let alice_sent_msg = alice.send_text(alice_chat_id, "What a silence!").await;
|
||||
bob.recv_msg(&alice_sent_msg).await;
|
||||
|
||||
// Test that add message is rewritten.
|
||||
bob.golden_test_chat(bob_chat_id, "chat_test_simultaneous_member_remove")
|
||||
bob.golden_test_chat(bob_chat_id, "chat_test_parallel_member_remove")
|
||||
.await;
|
||||
|
||||
// Alice removes Bob from the chat.
|
||||
remove_contact_from_chat(&alice, alice_chat_id, alice_bob_contact_id).await?;
|
||||
let alice_sent_remove_msg = alice.pop_sent_msg().await;
|
||||
|
||||
// Bob receives a msg about Alice removing him from the group.
|
||||
let bob_received_remove_msg = bob.recv_msg(&alice_sent_remove_msg).await;
|
||||
|
||||
@@ -4860,8 +4880,13 @@ mod tests {
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
remove_contact_from_chat(&bob, bob_chat_id, bob_fiona_contact_id).await?;
|
||||
|
||||
// This doesn't add Fiona back because Bob just removed them.
|
||||
let sent_msg = alice.send_text(alice_chat_id, "Welcome, Fiona!").await;
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
let sent_msg = alice.send_text(alice_chat_id, "Welcome back, Fiona!").await;
|
||||
bob.recv_msg(&sent_msg).await;
|
||||
bob.golden_test_chat(bob_chat_id, "chat_test_msg_with_implicit_member_add")
|
||||
.await;
|
||||
Ok(())
|
||||
|
||||
@@ -219,6 +219,10 @@ pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
|
||||
/// How far the last quota check needs to be in the past to be checked by the background function (in seconds).
|
||||
pub(crate) const DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT: u64 = 12 * 60 * 60; // 12 hours
|
||||
|
||||
/// How far in the future the sender timestamp of a message is allowed to be, in seconds. Also used
|
||||
/// in the group membership consistency algo to reject outdated membership changes.
|
||||
pub(crate) const TIMESTAMP_SENT_TOLERANCE: i64 = 60;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use num_traits::FromPrimitive;
|
||||
|
||||
@@ -93,7 +93,6 @@ pub struct RenderedEmail {
|
||||
// pub envelope: Envelope,
|
||||
pub is_encrypted: bool,
|
||||
pub is_gossiped: bool,
|
||||
pub is_group: bool,
|
||||
pub last_added_location_id: Option<u32>,
|
||||
|
||||
/// A comma-separated string of sync-IDs that are used by the rendered email
|
||||
@@ -614,8 +613,6 @@ impl<'a> MimeFactory<'a> {
|
||||
));
|
||||
}
|
||||
|
||||
let mut is_group = false;
|
||||
|
||||
if let Loaded::Message { chat } = &self.loaded {
|
||||
if chat.typ == Chattype::Broadcast {
|
||||
let encoded_chat_name = encode_words(&chat.name);
|
||||
@@ -623,8 +620,6 @@ impl<'a> MimeFactory<'a> {
|
||||
"List-ID".into(),
|
||||
format!("{encoded_chat_name} <{}>", chat.grpid),
|
||||
));
|
||||
} else if chat.typ == Chattype::Group {
|
||||
is_group = true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -896,7 +891,6 @@ impl<'a> MimeFactory<'a> {
|
||||
// envelope: Envelope::new,
|
||||
is_encrypted,
|
||||
is_gossiped,
|
||||
is_group,
|
||||
last_added_location_id,
|
||||
sync_ids_to_delete: self.sync_ids_to_delete,
|
||||
rfc724_mid,
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::aheader::{Aheader, EncryptPreference};
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chat::{add_info_msg, ChatId};
|
||||
use crate::config::Config;
|
||||
use crate::constants::{Chattype, DC_DESIRED_TEXT_LINES, DC_DESIRED_TEXT_LINE_LEN};
|
||||
use crate::constants::{self, Chattype, DC_DESIRED_TEXT_LINES, DC_DESIRED_TEXT_LINE_LEN};
|
||||
use crate::contact::{addr_cmp, addr_normalize, Contact, ContactId, Origin};
|
||||
use crate::context::Context;
|
||||
use crate::decrypt::{
|
||||
@@ -212,7 +212,9 @@ impl MimeMessage {
|
||||
.headers
|
||||
.get_header_value(HeaderDef::Date)
|
||||
.and_then(|v| mailparse::dateparse(&v).ok())
|
||||
.map_or(timestamp_rcvd, |value| min(value, timestamp_rcvd + 60));
|
||||
.map_or(timestamp_rcvd, |value| {
|
||||
min(value, timestamp_rcvd + constants::TIMESTAMP_SENT_TOLERANCE)
|
||||
});
|
||||
let mut hop_info = parse_receive_headers(&mail.get_headers());
|
||||
|
||||
let mut headers = Default::default();
|
||||
|
||||
@@ -11,7 +11,7 @@ use regex::Regex;
|
||||
use crate::aheader::EncryptPreference;
|
||||
use crate::chat::{self, Chat, ChatId, ChatIdBlocked, ProtectionStatus};
|
||||
use crate::config::Config;
|
||||
use crate::constants::{Blocked, Chattype, ShowEmails, DC_CHAT_ID_TRASH};
|
||||
use crate::constants::{self, Blocked, Chattype, ShowEmails, DC_CHAT_ID_TRASH};
|
||||
use crate::contact::{
|
||||
addr_cmp, may_be_valid_addr, normalize_name, Contact, ContactAddress, ContactId, Origin,
|
||||
};
|
||||
@@ -1919,18 +1919,24 @@ async fn apply_group_changes(
|
||||
HashSet::<ContactId>::from_iter(chat::get_chat_contacts(context, chat_id).await?);
|
||||
let is_from_in_chat =
|
||||
!chat_contacts.contains(&ContactId::SELF) || chat_contacts.contains(&from_id);
|
||||
|
||||
// Reject group membership changes from non-members and old changes.
|
||||
let allow_member_list_changes = !is_partial_download
|
||||
&& is_from_in_chat
|
||||
&& chat_id
|
||||
.update_timestamp(
|
||||
context,
|
||||
Param::MemberListTimestamp,
|
||||
mime_parser.timestamp_sent,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let member_list_ts = match !is_partial_download && is_from_in_chat {
|
||||
true => Some(chat_id.get_member_list_timestamp(context).await?),
|
||||
false => None,
|
||||
};
|
||||
// When we remove a member locally, we shift `MemberListTimestamp` by `TIMESTAMP_SENT_TOLERANCE`
|
||||
// into the future, so add some more tolerance here to allow remote membership changes as well.
|
||||
let timestamp_sent_tolerance = constants::TIMESTAMP_SENT_TOLERANCE * 2;
|
||||
let allow_member_list_changes = member_list_ts
|
||||
.filter(|t| {
|
||||
*t <= mime_parser
|
||||
.timestamp_sent
|
||||
.saturating_add(timestamp_sent_tolerance)
|
||||
})
|
||||
.is_some();
|
||||
let sync_member_list = member_list_ts
|
||||
.filter(|t| *t <= mime_parser.timestamp_sent)
|
||||
.is_some();
|
||||
// Whether to rebuild the member list from scratch.
|
||||
let recreate_member_list = {
|
||||
// Always recreate membership list if SELF has been added. The older versions of DC
|
||||
@@ -1945,15 +1951,16 @@ async fn apply_group_changes(
|
||||
.is_none(),
|
||||
None => false,
|
||||
}
|
||||
} && {
|
||||
if !allow_member_list_changes {
|
||||
} && (
|
||||
// Don't allow the timestamp tolerance here for more reliable leaving of groups.
|
||||
sync_member_list || {
|
||||
info!(
|
||||
context,
|
||||
"Ignoring a try to recreate member list of {chat_id} by {from_id}.",
|
||||
);
|
||||
false
|
||||
}
|
||||
allow_member_list_changes
|
||||
};
|
||||
);
|
||||
|
||||
if mime_parser.get_header(HeaderDef::ChatVerified).is_some() {
|
||||
if let VerifiedEncryption::NotVerified(err) = verified_encryption {
|
||||
@@ -2066,6 +2073,13 @@ async fn apply_group_changes(
|
||||
}
|
||||
|
||||
if !recreate_member_list {
|
||||
let mut diff = HashSet::<ContactId>::new();
|
||||
if sync_member_list {
|
||||
diff = new_members.difference(&chat_contacts).copied().collect();
|
||||
} else if let Some(added_id) = added_id {
|
||||
diff.insert(added_id);
|
||||
}
|
||||
new_members = chat_contacts.clone();
|
||||
// Don't delete any members locally, but instead add absent ones to provide group
|
||||
// membership consistency for all members:
|
||||
// - Classical MUA users usually don't intend to remove users from an email thread, so
|
||||
@@ -2077,9 +2091,6 @@ async fn apply_group_changes(
|
||||
// will likely recreate the member list from the next received message. The problem
|
||||
// occurs only if that "somebody" managed to reply earlier. Really, it's a problem for
|
||||
// big groups with high message rate, but let it be for now.
|
||||
let mut diff: HashSet<ContactId> =
|
||||
new_members.difference(&chat_contacts).copied().collect();
|
||||
new_members = chat_contacts.clone();
|
||||
new_members.extend(diff.clone());
|
||||
if let Some(added_id) = added_id {
|
||||
diff.remove(&added_id);
|
||||
@@ -2115,6 +2126,17 @@ async fn apply_group_changes(
|
||||
chat_contacts = new_members;
|
||||
send_event_chat_modified = true;
|
||||
}
|
||||
if sync_member_list {
|
||||
let mut ts = mime_parser.timestamp_sent;
|
||||
if recreate_member_list {
|
||||
// Reject all older membership changes. See `allow_member_list_changes` to know how
|
||||
// this works.
|
||||
ts += timestamp_sent_tolerance;
|
||||
}
|
||||
chat_id
|
||||
.update_timestamp(context, Param::MemberListTimestamp, ts)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(avatar_action) = &mime_parser.group_avatar {
|
||||
|
||||
@@ -3728,6 +3728,7 @@ async fn test_dont_recreate_contacts_on_add_remove() -> Result<()> {
|
||||
alice.recv_msg(&bob.pop_sent_msg().await).await;
|
||||
assert_eq!(get_chat_contacts(&alice, alice_chat_id).await?.len(), 3);
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
send_text_msg(
|
||||
&alice,
|
||||
alice_chat_id,
|
||||
@@ -3810,17 +3811,18 @@ async fn test_dont_readd_with_normal_msg() -> Result<()> {
|
||||
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
|
||||
assert_eq!(get_chat_contacts(&bob, bob_chat_id).await?.len(), 1);
|
||||
|
||||
SystemTime::shift(Duration::from_secs(3600));
|
||||
add_contact_to_chat(
|
||||
&alice,
|
||||
alice_chat_id,
|
||||
Contact::create(&alice, "fiora", "fiora@example.net").await?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
|
||||
// Alice didn't receive Bob's leave message, so Bob must readd themselves otherwise other
|
||||
// members would think Bob is still here while they aren't, and then retry to leave if they
|
||||
// Alice didn't receive Bob's leave message although a lot of time has
|
||||
// passed, so Bob must readd themselves otherwise other members would think
|
||||
// Bob is still here while they aren't. Bob should retry to leave if they
|
||||
// think that Alice didn't re-add them on purpose (which is possible if Alice uses a classical
|
||||
// MUA).
|
||||
assert!(is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
|
||||
@@ -4040,6 +4042,15 @@ async fn test_recreate_member_list_on_missing_add_of_self() -> Result<()> {
|
||||
// Bob missed the message adding them, but must recreate the member list.
|
||||
assert_eq!(get_chat_contacts(&bob, bob_chat_id).await?.len(), 2);
|
||||
assert!(is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
|
||||
|
||||
// But if Bob just left, they mustn't recreate the member list even after missing a message.
|
||||
bob_chat_id.accept(&bob).await?;
|
||||
remove_contact_from_chat(&bob, bob_chat_id, ContactId::SELF).await?;
|
||||
send_text_msg(&alice, alice_chat_id, "3rd message".to_string()).await?;
|
||||
alice.pop_sent_msg().await;
|
||||
send_text_msg(&alice, alice_chat_id, "4th message".to_string()).await?;
|
||||
bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||
assert!(!is_contact_in_chat(&bob, bob_chat_id, ContactId::SELF).await?);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user