feat: Sync user actions for ad-hoc groups across devices (#5065)

Ad-hoc groups don't have grpid-s that can be used to identify them across devices and thus wasn't
synced until now.

The same problem already exists for assigning messages to ad-hoc groups and this assignment is done
by `get_parent_message()` and `lookup_chat_by_reply()`. Let's reuse this logic for the
synchronisation, it works well enough and this way we have less surprises than if we try to
implement grpids for ad-hoc groups. I.e. add an `Msgids` variant to `chat::SyncId` analogous to the
"References" header in messages and put two following Message-IDs to a sync message:
- The latest message A having `DownloadState::Done` and the state to be one of `InFresh, InNoticed,
  InSeen, OutDelivered, OutMdnRcvd`.
- The message that A references in `In-Reply-To`.

This way the logic is almost the same to what we have in `Chat::prepare_msg_raw()` (the difference
is that we don't use the oldest Message-ID) and it's easier to reuse the existing code.

NOTE: If a chat has only an OutPending message f.e., the synchronisation wouldn't work, but trying
to work in such a corner case has no significant value and isn't worth complicating the code.
This commit is contained in:
iequidoo
2023-12-02 15:22:40 -03:00
committed by iequidoo
parent 32071297e6
commit f279b0d1e5
3 changed files with 164 additions and 79 deletions

View File

@@ -209,6 +209,30 @@ impl ChatId {
self == DC_CHAT_ID_ALLDONE_HINT self == DC_CHAT_ID_ALLDONE_HINT
} }
/// Returns [`ChatId`] of a chat that `msg` belongs to.
///
/// Checks that `msg` is assigned to the right chat.
pub(crate) fn lookup_by_message(msg: &Message) -> Option<Self> {
if msg.chat_id == DC_CHAT_ID_TRASH {
return None;
}
if msg.download_state != DownloadState::Done
// TODO (2023-09-12): Added for backward compatibility with versions that did not have
// `DownloadState::Undecipherable`. Remove eventually with the comment in
// `MimeMessage::from_bytes()`.
|| msg
.error
.as_ref()
.filter(|e| e.starts_with("Decrypting failed:"))
.is_some()
{
// If `msg` is not fully downloaded or undecipherable, it may have been assigned to the
// wrong chat (they often get assigned to the 1:1 chat with the sender).
return None;
}
Some(msg.chat_id)
}
/// Returns the [`ChatId`] for the 1:1 chat with `contact_id` /// Returns the [`ChatId`] for the 1:1 chat with `contact_id`
/// if it exists and is not blocked. /// if it exists and is not blocked.
/// ///
@@ -374,6 +398,7 @@ impl ChatId {
pub(crate) async fn block_ex(self, context: &Context, sync: sync::Sync) -> Result<()> { pub(crate) async fn block_ex(self, context: &Context, sync: sync::Sync) -> Result<()> {
let chat = Chat::load_from_db(context, self).await?; let chat = Chat::load_from_db(context, self).await?;
let mut delete = false;
match chat.typ { match chat.typ {
Chattype::Broadcast => { Chattype::Broadcast => {
@@ -392,7 +417,7 @@ impl ChatId {
} }
Chattype::Group => { Chattype::Group => {
info!(context, "Can't block groups yet, deleting the chat."); info!(context, "Can't block groups yet, deleting the chat.");
self.delete(context).await?; delete = true;
} }
Chattype::Mailinglist => { Chattype::Mailinglist => {
if self.set_blocked(context, Blocked::Yes).await? { if self.set_blocked(context, Blocked::Yes).await? {
@@ -408,6 +433,9 @@ impl ChatId {
.log_err(context) .log_err(context)
.ok(); .ok();
} }
if delete {
self.delete(context).await?;
}
Ok(()) Ok(())
} }
@@ -1124,47 +1152,46 @@ impl ChatId {
Ok(self.get_param(context).await?.exists(Param::Devicetalk)) Ok(self.get_param(context).await?.exists(Param::Devicetalk))
} }
async fn parent_query<T, F>(self, context: &Context, fields: &str, f: F) -> Result<Option<T>> async fn parent_query<T, F>(
self,
context: &Context,
fields: &str,
state_out_min: MessageState,
f: F,
) -> Result<Option<T>>
where where
F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>, F: Send + FnOnce(&rusqlite::Row) -> rusqlite::Result<T>,
T: Send + 'static, T: Send + 'static,
{ {
let sql = &context.sql; let sql = &context.sql;
// Do not reply to not fully downloaded messages. Such a message could be a group chat
// message that we assigned to 1:1 chat.
let query = format!( let query = format!(
"SELECT {fields} \ "SELECT {fields} \
FROM msgs WHERE chat_id=? AND state NOT IN (?, ?) AND NOT hidden AND download_state={} \ FROM msgs \
WHERE chat_id=? \
AND ((state BETWEEN {} AND {}) OR (state >= {})) \
AND NOT hidden \
AND download_state={} \
ORDER BY timestamp DESC, id DESC \ ORDER BY timestamp DESC, id DESC \
LIMIT 1;", LIMIT 1;",
MessageState::InFresh as u32,
MessageState::InSeen as u32,
state_out_min as u32,
// Do not reply to not fully downloaded messages. Such a message could be a group chat
// message that we assigned to 1:1 chat.
DownloadState::Done as u32, DownloadState::Done as u32,
); );
let row = sql sql.query_row_optional(&query, (self,), f).await
.query_row_optional(
&query,
(
self,
MessageState::OutPreparing,
MessageState::OutDraft,
// We don't filter `OutPending` and `OutFailed` messages because the new message
// for which `parent_query()` is done may assume that it will be received in a
// context affected by those messages, e.g. they could add new members to a
// group and the new message will contain them in "To:". Anyway recipients must
// be prepared to orphaned references.
),
f,
)
.await?;
Ok(row)
} }
async fn get_parent_mime_headers( async fn get_parent_mime_headers(
self, self,
context: &Context, context: &Context,
state_out_min: MessageState,
) -> Result<Option<(String, String, String)>> { ) -> Result<Option<(String, String, String)>> {
self.parent_query( self.parent_query(
context, context,
"rfc724_mid, mime_in_reply_to, mime_references", "rfc724_mid, mime_in_reply_to, mime_references",
state_out_min,
|row: &rusqlite::Row| { |row: &rusqlite::Row| {
let rfc724_mid: String = row.get(0)?; let rfc724_mid: String = row.get(0)?;
let mime_in_reply_to: String = row.get(1)?; let mime_in_reply_to: String = row.get(1)?;
@@ -1741,7 +1768,15 @@ impl Chat {
// we do not set In-Reply-To/References in this case. // we do not set In-Reply-To/References in this case.
if !self.is_self_talk() { if !self.is_self_talk() {
if let Some((parent_rfc724_mid, parent_in_reply_to, parent_references)) = if let Some((parent_rfc724_mid, parent_in_reply_to, parent_references)) =
self.id.get_parent_mime_headers(context).await? // We don't filter `OutPending` and `OutFailed` messages because the new message for
// which `parent_query()` is done may assume that it will be received in a context
// affected by those messages, e.g. they could add new members to a group and the
// new message will contain them in "To:". Anyway recipients must be prepared to
// orphaned references.
self
.id
.get_parent_mime_headers(context, MessageState::OutPending)
.await?
{ {
// "In-Reply-To:" is not changed if it is set manually. // "In-Reply-To:" is not changed if it is set manually.
// This does not affect "References:" header, it will contain "default parent" (the // This does not affect "References:" header, it will contain "default parent" (the
@@ -1959,10 +1994,26 @@ impl Chat {
Ok(r) Ok(r)
} }
Chattype::Broadcast | Chattype::Group | Chattype::Mailinglist => { Chattype::Broadcast | Chattype::Group | Chattype::Mailinglist => {
if self.grpid.is_empty() { if !self.grpid.is_empty() {
return Ok(None); return Ok(Some(SyncId::Grpid(self.grpid.clone())));
} }
Ok(Some(SyncId::Grpid(self.grpid.clone())))
let Some((parent_rfc724_mid, parent_in_reply_to, _)) = self
.id
.get_parent_mime_headers(context, MessageState::OutDelivered)
.await?
else {
warn!(
context,
"Chat::get_sync_id({}): No good message identifying the chat found.",
self.id
);
return Ok(None);
};
Ok(Some(SyncId::Msgids(vec![
parent_in_reply_to,
parent_rfc724_mid,
])))
} }
} }
} }
@@ -4242,8 +4293,8 @@ async fn set_contacts_by_addrs(context: &Context, id: ChatId, addrs: &[String])
pub(crate) enum SyncId { pub(crate) enum SyncId {
ContactAddr(String), ContactAddr(String),
Grpid(String), Grpid(String),
// NOTE: Ad-hoc groups lack an identifier that can be used across devices so /// "Message-ID"-s, from oldest to latest. Used for ad-hoc groups.
// block/mute/etc. actions on them are not synchronized to other devices. Msgids(Vec<String>),
} }
/// An action synchronised to other devices. /// An action synchronised to other devices.
@@ -4266,12 +4317,9 @@ impl Context {
pub(crate) async fn sync_alter_chat(&self, id: &SyncId, action: &SyncAction) -> Result<()> { pub(crate) async fn sync_alter_chat(&self, id: &SyncId, action: &SyncAction) -> Result<()> {
let chat_id = match id { let chat_id = match id {
SyncId::ContactAddr(addr) => { SyncId::ContactAddr(addr) => {
let Some(contact_id) = let contact_id = Contact::lookup_id_by_addr_ex(self, addr, Origin::Unknown, None)
Contact::lookup_id_by_addr_ex(self, addr, Origin::Unknown, None).await? .await?
else { .with_context(|| format!("No contact for addr '{addr}'"))?;
warn!(self, "sync_alter_chat: No contact for addr '{addr}'.");
return Ok(());
};
match action { match action {
SyncAction::Block => { SyncAction::Block => {
return contact::set_blocked(self, Nosync, contact_id, true).await return contact::set_blocked(self, Nosync, contact_id, true).await
@@ -4281,22 +4329,26 @@ impl Context {
} }
_ => (), _ => (),
} }
let Some(chat_id) = ChatId::lookup_by_contact(self, contact_id).await? else { ChatId::lookup_by_contact(self, contact_id)
warn!(self, "sync_alter_chat: No chat for addr '{addr}'."); .await?
return Ok(()); .with_context(|| format!("No chat for addr '{addr}'"))?
};
chat_id
} }
SyncId::Grpid(grpid) => { SyncId::Grpid(grpid) => {
if let SyncAction::CreateBroadcast(name) = action { if let SyncAction::CreateBroadcast(name) = action {
create_broadcast_list_ex(self, Nosync, grpid.clone(), name.clone()).await?; create_broadcast_list_ex(self, Nosync, grpid.clone(), name.clone()).await?;
return Ok(()); return Ok(());
} }
let Some((chat_id, ..)) = get_chat_id_by_grpid(self, grpid).await? else { get_chat_id_by_grpid(self, grpid)
warn!(self, "sync_alter_chat: No chat for grpid '{grpid}'."); .await?
return Ok(()); .with_context(|| format!("No chat for grpid '{grpid}'"))?
}; .0
chat_id }
SyncId::Msgids(msgids) => {
let msg = message::get_latest_by_rfc724_mids(self, msgids)
.await?
.with_context(|| format!("No message found for Message-IDs {msgids:?}"))?;
ChatId::lookup_by_message(&msg)
.with_context(|| format!("No chat found for Message-IDs {msgids:?}"))?
} }
}; };
match action { match action {
@@ -6941,6 +6993,51 @@ mod tests {
Ok(()) Ok(())
} }
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_sync_adhoc_grp() -> Result<()> {
let alice0 = &TestContext::new_alice().await;
let alice1 = &TestContext::new_alice().await;
for a in [alice0, alice1] {
a.set_config_bool(Config::SyncMsgs, true).await?;
}
let mut chat_ids = Vec::new();
for a in [alice0, alice1] {
let msg = receive_imf(
a,
b"Subject: =?utf-8?q?Message_from_alice=40example=2Eorg?=\r\n\
From: alice@example.org\r\n\
To: <bob@example.net>, <fiona@example.org> \r\n\
Date: Mon, 2 Dec 2023 16:59:39 +0000\r\n\
Message-ID: <Mr.alices_original_mail@example.org>\r\n\
Chat-Version: 1.0\r\n\
\r\n\
hi\r\n",
false,
)
.await?
.unwrap();
chat_ids.push(msg.chat_id);
}
let chat1 = Chat::load_from_db(alice1, chat_ids[1]).await?;
assert_eq!(chat1.typ, Chattype::Group);
assert!(chat1.grpid.is_empty());
// Test synchronisation on chat blocking because it causes chat deletion currently and thus
// requires generating a sync message in advance.
chat_ids[0].block(alice0).await?;
sync(alice0, alice1).await;
assert!(Chat::load_from_db(alice1, chat_ids[1]).await.is_err());
assert!(
!alice1
.sql
.exists("SELECT COUNT(*) FROM chats WHERE id=?", (chat_ids[1],))
.await?
);
Ok(())
}
/// Tests syncing of chat visibility on a self-chat. This way we test: /// Tests syncing of chat visibility on a self-chat. This way we test:
/// - Self-chat synchronisation. /// - Self-chat synchronisation.
/// - That sync messages don't unarchive the self-chat. /// - That sync messages don't unarchive the self-chat.

View File

@@ -1842,6 +1842,24 @@ pub(crate) async fn rfc724_mid_exists_and(
Ok(res) Ok(res)
} }
/// Given a list of Message-IDs, returns the latest message found in the database.
///
/// Only messages that are not in the trash chat are considered.
pub(crate) async fn get_latest_by_rfc724_mids(
context: &Context,
mids: &[String],
) -> Result<Option<Message>> {
for id in mids.iter().rev() {
if let Some(msg_id) = rfc724_mid_exists(context, id).await? {
let msg = Message::load_from_db(context, msg_id).await?;
if msg.chat_id != DC_CHAT_ID_TRASH {
return Ok(Some(msg));
}
}
}
Ok(None)
}
/// How a message is primarily displayed. /// How a message is primarily displayed.
#[derive( #[derive(
Debug, Debug,

View File

@@ -1544,27 +1544,10 @@ async fn lookup_chat_by_reply(
let Some(parent) = parent else { let Some(parent) = parent else {
return Ok(None); return Ok(None);
}; };
let Some(parent_chat_id) = ChatId::lookup_by_message(parent) else {
let parent_chat = Chat::load_from_db(context, parent.chat_id).await?;
if parent.download_state != DownloadState::Done
// TODO (2023-09-12): Added for backward compatibility with versions that did not have
// `DownloadState::Undecipherable`. Remove eventually with the comment in
// `MimeMessage::from_bytes()`.
|| parent
.error
.as_ref()
.filter(|e| e.starts_with("Decrypting failed:"))
.is_some()
{
// If the parent msg is not fully downloaded or undecipherable, it may have been
// assigned to the wrong chat (they often get assigned to the 1:1 chat with the sender).
return Ok(None); return Ok(None);
} };
let parent_chat = Chat::load_from_db(context, parent_chat_id).await?;
if parent_chat.id == DC_CHAT_ID_TRASH {
return Ok(None);
}
// If this was a private message just to self, it was probably a private reply. // If this was a private message just to self, it was probably a private reply.
// It should not go into the group then, but into the private chat. // It should not go into the group then, but into the private chat.
@@ -2558,20 +2541,7 @@ async fn get_previous_message(
/// ///
/// Only messages that are not in the trash chat are considered. /// Only messages that are not in the trash chat are considered.
async fn get_rfc724_mid_in_list(context: &Context, mid_list: &str) -> Result<Option<Message>> { async fn get_rfc724_mid_in_list(context: &Context, mid_list: &str) -> Result<Option<Message>> {
if mid_list.is_empty() { message::get_latest_by_rfc724_mids(context, &parse_message_ids(mid_list)).await
return Ok(None);
}
for id in parse_message_ids(mid_list).iter().rev() {
if let Some(msg_id) = rfc724_mid_exists(context, id).await? {
let msg = Message::load_from_db(context, msg_id).await?;
if msg.chat_id != DC_CHAT_ID_TRASH {
return Ok(Some(msg));
}
}
}
Ok(None)
} }
/// Returns the last message referenced from References: header found in the database. /// Returns the last message referenced from References: header found in the database.