feat(webxdc): add unique IDs to status updates sent outside

This allows for deduplication
if status updates are sent over multiple transports.
This commit is contained in:
link2xt
2023-11-26 22:05:48 +00:00
parent 616faff96b
commit f91ba357cf
3 changed files with 132 additions and 37 deletions

View File

@@ -54,7 +54,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
match context
.write_status_update_inner(
&msg_id,
StatusUpdateItem {
&StatusUpdateItem {
payload: json!({
"event": event,
"time": time,
@@ -62,6 +62,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
info: None,
summary: None,
document: None,
uid: None,
},
)
.await
@@ -70,10 +71,15 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
eprintln!("Can't log event to webxdc status update: {err:#}");
}
Ok(serial) => {
context.emit_event(EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial: serial,
});
if let Some(serial) = serial {
context.emit_event(EventType::WebxdcStatusUpdate {
msg_id,
status_update_serial: serial,
});
} else {
// This should not happen as the update has no `uid`.
error!(context, "Debug logging update is not created.");
};
}
}
}

View File

@@ -763,6 +763,28 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
.await?;
}
if dbversion < 105 {
// Create UNIQUE uid column and drop unused update_item_read column.
sql.execute_migration(
r#"CREATE TABLE new_msgs_status_updates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
msg_id INTEGER,
update_item TEXT DEFAULT '',
uid TEXT UNIQUE
);
INSERT OR IGNORE INTO new_msgs_status_updates SELECT
id, msg_id, update_item, NULL
FROM msgs_status_updates;
DROP TABLE msgs_status_updates;
ALTER TABLE new_msgs_status_updates RENAME TO msgs_status_updates;
CREATE INDEX msgs_status_updates_index1 ON msgs_status_updates (msg_id);
CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
"#,
105,
)
.await?;
}
let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?

View File

@@ -5,6 +5,7 @@
//! - `id` - status update serial number
//! - `msg_id` - ID of the message in the `msgs` table
//! - `update_item` - JSON representation of the status update
//! - `uid` - "id" field of the update, used for deduplication
//!
//! Status updates are scheduled for sending by adding a record
//! to `smtp_status_updates_table` SQL table.
@@ -14,7 +15,6 @@
//! - `last_serial` - serial number of the last status update to send
//! - `descr` - text to send along with the updates
use std::convert::TryFrom;
use std::path::Path;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
@@ -37,6 +37,7 @@ use crate::mimefactory::wrapped_base64_encode;
use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::tools::create_id;
use crate::tools::strip_rtlo_characters;
use crate::tools::{create_smeared_timestamp, get_abs_path};
@@ -178,6 +179,13 @@ pub struct StatusUpdateItem {
/// for a voting app.
#[serde(skip_serializing_if = "Option::is_none")]
pub summary: Option<String>,
/// Unique ID for deduplication.
/// This can be used if the message is sent over multiple transports.
///
/// If there is no ID, message is always considered to be unique.
#[serde(skip_serializing_if = "Option::is_none")]
pub uid: Option<String>,
}
/// Update items as passed to the UIs.
@@ -317,7 +325,14 @@ impl Context {
timestamp: i64,
can_info_msg: bool,
from_id: ContactId,
) -> Result<StatusUpdateSerial> {
) -> Result<Option<StatusUpdateSerial>> {
let Some(status_update_serial) = self
.write_status_update_inner(&instance.id, &status_update_item)
.await?
else {
return Ok(None);
};
if can_info_msg {
if let Some(ref info) = status_update_item.info {
if let Some(info_msg_id) =
@@ -376,10 +391,6 @@ impl Context {
self.emit_msgs_changed(instance.chat_id, instance.id);
}
let status_update_serial = self
.write_status_update_inner(&instance.id, status_update_item)
.await?;
if instance.viewtype == Viewtype::Webxdc {
self.emit_event(EventType::WebxdcStatusUpdate {
msg_id: instance.id,
@@ -387,23 +398,42 @@ impl Context {
});
}
Ok(status_update_serial)
Ok(Some(status_update_serial))
}
/// Inserts a status update item into `msgs_status_updates` table.
///
/// Returns serial ID of the status update if a new item is inserted.
pub(crate) async fn write_status_update_inner(
&self,
instance_id: &MsgId,
status_update_item: StatusUpdateItem,
) -> Result<StatusUpdateSerial> {
let rowid = self
status_update_item: &StatusUpdateItem,
) -> Result<Option<StatusUpdateSerial>> {
let uid = status_update_item.uid.as_deref();
let Some(rowid) = self
.sql
.insert(
"INSERT INTO msgs_status_updates (msg_id, update_item) VALUES(?, ?);",
(instance_id, serde_json::to_string(&status_update_item)?),
.query_row_optional(
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
ON CONFLICT (uid) DO NOTHING
RETURNING id",
(
instance_id,
serde_json::to_string(&status_update_item)?,
uid,
),
|row| {
let id: u32 = row.get(0)?;
Ok(id)
},
)
.await?;
let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?);
Ok(status_update_serial)
.await?
else {
let uid = uid.unwrap_or("-");
info!(self, "Ignoring duplicate status update with uid={uid}");
return Ok(None);
};
let status_update_serial = StatusUpdateSerial(rowid);
Ok(Some(status_update_serial))
}
/// Returns the update_item with `status_update_serial` from the webxdc with message id `msg_id`.
@@ -449,7 +479,7 @@ impl Context {
pub async fn send_webxdc_status_update_struct(
&self,
instance_msg_id: MsgId,
status_update: StatusUpdateItem,
mut status_update: StatusUpdateItem,
descr: &str,
) -> Result<()> {
let mut instance = Message::load_from_db(self, instance_msg_id).await?;
@@ -467,6 +497,7 @@ impl Context {
MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft
);
status_update.uid = Some(create_id());
let status_update_serial: StatusUpdateSerial = self
.create_status_update_record(
&mut instance,
@@ -475,7 +506,8 @@ impl Context {
send_now,
ContactId::SELF,
)
.await?;
.await?
.context("Failed to create status update")?;
if send_now {
self.sql.insert(
@@ -655,7 +687,10 @@ impl Context {
let (update_item_str, serial) = row;
let update_item = StatusUpdateItemAndSerial
{
item: serde_json::from_str(&update_item_str)?,
item: StatusUpdateItem {
uid: None, // Erase UIDs, apps, bots and tests don't need to know them.
..serde_json::from_str(&update_item_str)?
},
serial,
max_serial,
};
@@ -1348,17 +1383,38 @@ mod tests {
info: None,
document: None,
summary: None,
uid: Some("iecie2Ze".to_string()),
},
1640178619,
true,
ContactId::SELF,
)
.await?
.unwrap();
assert_eq!(
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
// Update with duplicate update ID is received.
// Whatever the payload is, update should be ignored just because ID is duplicate.
let update_id1_duplicate = t
.create_status_update_record(
&mut instance,
StatusUpdateItem {
payload: json!({"nothing": "this should be ignored"}),
info: None,
document: None,
summary: None,
uid: Some("iecie2Ze".to_string()),
},
1640178619,
true,
ContactId::SELF,
)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
assert_eq!(update_id1_duplicate, None);
assert!(t
.send_webxdc_status_update(instance.id, "\n\n\n", "")
@@ -1384,15 +1440,17 @@ mod tests {
info: None,
document: None,
summary: None,
uid: None,
},
1640178619,
true,
ContactId::SELF,
)
.await?;
.await?
.unwrap();
assert_eq!(
t.get_webxdc_status_updates(instance.id, update_id1).await?,
r#"[{"payload":{"foo2":"bar2"},"serial":2,"max_serial":2}]"#
r#"[{"payload":{"foo2":"bar2"},"serial":3,"max_serial":3}]"#
);
t.create_status_update_record(
&mut instance,
@@ -1401,6 +1459,7 @@ mod tests {
info: None,
document: None,
summary: None,
uid: None,
},
1640178619,
true,
@@ -1410,9 +1469,9 @@ mod tests {
assert_eq!(
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":3},
{"payload":{"foo2":"bar2"},"serial":2,"max_serial":3},
{"payload":true,"serial":3,"max_serial":3}]"#
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":4},
{"payload":{"foo2":"bar2"},"serial":3,"max_serial":4},
{"payload":true,"serial":4,"max_serial":4}]"#
);
t.send_webxdc_status_update(
@@ -1423,8 +1482,8 @@ mod tests {
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, update_id2).await?,
r#"[{"payload":true,"serial":3,"max_serial":4},
{"payload":1,"serial":4,"max_serial":4}]"#
r#"[{"payload":true,"serial":4,"max_serial":5},
{"payload":1,"serial":5,"max_serial":5}]"#
);
Ok(())
@@ -1654,6 +1713,8 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_render_webxdc_status_update_object_range() -> Result<()> {
use regex::Regex;
let t = TestContext::new_alice().await;
let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?;
let instance = send_webxdc_instance(&t, chat_id).await?;
@@ -1672,7 +1733,13 @@ mod tests {
)
.await?
.unwrap();
assert_eq!(json, "{\"updates\":[{\"payload\":2},\n{\"payload\":3}]}");
let json = Regex::new(r#""uid":"[^"]*""#)
.unwrap()
.replace_all(&json, "XXX");
assert_eq!(
json,
"{\"updates\":[{\"payload\":2,XXX},\n{\"payload\":3,XXX}]}"
);
assert_eq!(
t.sql