diff --git a/src/debug_logging.rs b/src/debug_logging.rs index d9a67473e..cfee96b1c 100644 --- a/src/debug_logging.rs +++ b/src/debug_logging.rs @@ -54,7 +54,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver { - context.emit_event(EventType::WebxdcStatusUpdate { - msg_id, - status_update_serial: serial, - }); + if let Some(serial) = serial { + context.emit_event(EventType::WebxdcStatusUpdate { + msg_id, + status_update_serial: serial, + }); + } else { + // This should not happen as the update has no `uid`. + error!(context, "Debug logging update is not created."); + }; } } } diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 2b3ce6000..77dec99d8 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -763,6 +763,28 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid); .await?; } + if dbversion < 105 { + // Create UNIQUE uid column and drop unused update_item_read column. + sql.execute_migration( + r#"CREATE TABLE new_msgs_status_updates ( +id INTEGER PRIMARY KEY AUTOINCREMENT, +msg_id INTEGER, +update_item TEXT DEFAULT '', +uid TEXT UNIQUE +); +INSERT OR IGNORE INTO new_msgs_status_updates SELECT + id, msg_id, update_item, NULL +FROM msgs_status_updates; +DROP TABLE msgs_status_updates; +ALTER TABLE new_msgs_status_updates RENAME TO msgs_status_updates; +CREATE INDEX msgs_status_updates_index1 ON msgs_status_updates (msg_id); +CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); +"#, + 105, + ) + .await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? diff --git a/src/webxdc.rs b/src/webxdc.rs index bb6b1b466..06c518926 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -5,6 +5,7 @@ //! - `id` - status update serial number //! - `msg_id` - ID of the message in the `msgs` table //! - `update_item` - JSON representation of the status update +//! - `uid` - "id" field of the update, used for deduplication //! //! Status updates are scheduled for sending by adding a record //! to `smtp_status_updates_table` SQL table. @@ -14,7 +15,6 @@ //! - `last_serial` - serial number of the last status update to send //! - `descr` - text to send along with the updates -use std::convert::TryFrom; use std::path::Path; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; @@ -37,6 +37,7 @@ use crate::mimefactory::wrapped_base64_encode; use crate::mimeparser::SystemMessage; use crate::param::Param; use crate::param::Params; +use crate::tools::create_id; use crate::tools::strip_rtlo_characters; use crate::tools::{create_smeared_timestamp, get_abs_path}; @@ -178,6 +179,13 @@ pub struct StatusUpdateItem { /// for a voting app. #[serde(skip_serializing_if = "Option::is_none")] pub summary: Option, + + /// Unique ID for deduplication. + /// This can be used if the message is sent over multiple transports. + /// + /// If there is no ID, message is always considered to be unique. + #[serde(skip_serializing_if = "Option::is_none")] + pub uid: Option, } /// Update items as passed to the UIs. @@ -317,7 +325,14 @@ impl Context { timestamp: i64, can_info_msg: bool, from_id: ContactId, - ) -> Result { + ) -> Result> { + let Some(status_update_serial) = self + .write_status_update_inner(&instance.id, &status_update_item) + .await? + else { + return Ok(None); + }; + if can_info_msg { if let Some(ref info) = status_update_item.info { if let Some(info_msg_id) = @@ -376,10 +391,6 @@ impl Context { self.emit_msgs_changed(instance.chat_id, instance.id); } - let status_update_serial = self - .write_status_update_inner(&instance.id, status_update_item) - .await?; - if instance.viewtype == Viewtype::Webxdc { self.emit_event(EventType::WebxdcStatusUpdate { msg_id: instance.id, @@ -387,23 +398,42 @@ impl Context { }); } - Ok(status_update_serial) + Ok(Some(status_update_serial)) } + /// Inserts a status update item into `msgs_status_updates` table. + /// + /// Returns serial ID of the status update if a new item is inserted. pub(crate) async fn write_status_update_inner( &self, instance_id: &MsgId, - status_update_item: StatusUpdateItem, - ) -> Result { - let rowid = self + status_update_item: &StatusUpdateItem, + ) -> Result> { + let uid = status_update_item.uid.as_deref(); + let Some(rowid) = self .sql - .insert( - "INSERT INTO msgs_status_updates (msg_id, update_item) VALUES(?, ?);", - (instance_id, serde_json::to_string(&status_update_item)?), + .query_row_optional( + "INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?) + ON CONFLICT (uid) DO NOTHING + RETURNING id", + ( + instance_id, + serde_json::to_string(&status_update_item)?, + uid, + ), + |row| { + let id: u32 = row.get(0)?; + Ok(id) + }, ) - .await?; - let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?); - Ok(status_update_serial) + .await? + else { + let uid = uid.unwrap_or("-"); + info!(self, "Ignoring duplicate status update with uid={uid}"); + return Ok(None); + }; + let status_update_serial = StatusUpdateSerial(rowid); + Ok(Some(status_update_serial)) } /// Returns the update_item with `status_update_serial` from the webxdc with message id `msg_id`. @@ -449,7 +479,7 @@ impl Context { pub async fn send_webxdc_status_update_struct( &self, instance_msg_id: MsgId, - status_update: StatusUpdateItem, + mut status_update: StatusUpdateItem, descr: &str, ) -> Result<()> { let mut instance = Message::load_from_db(self, instance_msg_id).await?; @@ -467,6 +497,7 @@ impl Context { MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft ); + status_update.uid = Some(create_id()); let status_update_serial: StatusUpdateSerial = self .create_status_update_record( &mut instance, @@ -475,7 +506,8 @@ impl Context { send_now, ContactId::SELF, ) - .await?; + .await? + .context("Failed to create status update")?; if send_now { self.sql.insert( @@ -655,7 +687,10 @@ impl Context { let (update_item_str, serial) = row; let update_item = StatusUpdateItemAndSerial { - item: serde_json::from_str(&update_item_str)?, + item: StatusUpdateItem { + uid: None, // Erase UIDs, apps, bots and tests don't need to know them. + ..serde_json::from_str(&update_item_str)? + }, serial, max_serial, }; @@ -1348,17 +1383,38 @@ mod tests { info: None, document: None, summary: None, + uid: Some("iecie2Ze".to_string()), + }, + 1640178619, + true, + ContactId::SELF, + ) + .await? + .unwrap(); + assert_eq!( + t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0)) + .await?, + r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"# + ); + + // Update with duplicate update ID is received. + // Whatever the payload is, update should be ignored just because ID is duplicate. + let update_id1_duplicate = t + .create_status_update_record( + &mut instance, + StatusUpdateItem { + payload: json!({"nothing": "this should be ignored"}), + info: None, + document: None, + summary: None, + uid: Some("iecie2Ze".to_string()), }, 1640178619, true, ContactId::SELF, ) .await?; - assert_eq!( - t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0)) - .await?, - r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"# - ); + assert_eq!(update_id1_duplicate, None); assert!(t .send_webxdc_status_update(instance.id, "\n\n\n", "") @@ -1384,15 +1440,17 @@ mod tests { info: None, document: None, summary: None, + uid: None, }, 1640178619, true, ContactId::SELF, ) - .await?; + .await? + .unwrap(); assert_eq!( t.get_webxdc_status_updates(instance.id, update_id1).await?, - r#"[{"payload":{"foo2":"bar2"},"serial":2,"max_serial":2}]"# + r#"[{"payload":{"foo2":"bar2"},"serial":3,"max_serial":3}]"# ); t.create_status_update_record( &mut instance, @@ -1401,6 +1459,7 @@ mod tests { info: None, document: None, summary: None, + uid: None, }, 1640178619, true, @@ -1410,9 +1469,9 @@ mod tests { assert_eq!( t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0)) .await?, - r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":3}, -{"payload":{"foo2":"bar2"},"serial":2,"max_serial":3}, -{"payload":true,"serial":3,"max_serial":3}]"# + r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":4}, +{"payload":{"foo2":"bar2"},"serial":3,"max_serial":4}, +{"payload":true,"serial":4,"max_serial":4}]"# ); t.send_webxdc_status_update( @@ -1423,8 +1482,8 @@ mod tests { .await?; assert_eq!( t.get_webxdc_status_updates(instance.id, update_id2).await?, - r#"[{"payload":true,"serial":3,"max_serial":4}, -{"payload":1,"serial":4,"max_serial":4}]"# + r#"[{"payload":true,"serial":4,"max_serial":5}, +{"payload":1,"serial":5,"max_serial":5}]"# ); Ok(()) @@ -1654,6 +1713,8 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_render_webxdc_status_update_object_range() -> Result<()> { + use regex::Regex; + let t = TestContext::new_alice().await; let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?; let instance = send_webxdc_instance(&t, chat_id).await?; @@ -1672,7 +1733,13 @@ mod tests { ) .await? .unwrap(); - assert_eq!(json, "{\"updates\":[{\"payload\":2},\n{\"payload\":3}]}"); + let json = Regex::new(r#""uid":"[^"]*""#) + .unwrap() + .replace_all(&json, "XXX"); + assert_eq!( + json, + "{\"updates\":[{\"payload\":2,XXX},\n{\"payload\":3,XXX}]}" + ); assert_eq!( t.sql