remove getAllUpdates() and add a typical replicatio API for the update call (#3081)

* (r10s, adb, hpk) remove getAllUpdates() and add a typical replica-API that works with increasing serials.  Streamline docs a bit.

* adapt ffi to new api

* documentation: updates serials may have gaps

* get_webxdc_status_updates() return updates larger than a given serial

* remove status_update_id from status-update-event; it is not needed (ui should update from the last known serial) and easily gets confused with last_serial

* unify wording to 'StatusUpdateSerial'

* remove legacy payload format, all known webxdc should be adapted

* add serial and max_serial to status updates

* avoid races when getting max_serial by avoiding two SQL calls

* update changelog

Co-authored-by: B. Petersen <r10s@b44t.com>
This commit is contained in:
holger krekel
2022-03-04 20:22:48 +01:00
committed by GitHub
parent ef841b1aa3
commit 63688a2f95
6 changed files with 219 additions and 179 deletions

View File

@@ -10,6 +10,7 @@ use crate::param::Param;
use crate::{chat, EventType};
use anyhow::{bail, ensure, format_err, Result};
use async_std::path::PathBuf;
use deltachat_derive::FromSql;
use lettre_email::mime::{self};
use lettre_email::PartBuilder;
use serde::{Deserialize, Serialize};
@@ -56,14 +57,26 @@ pub struct WebxdcInfo {
/// Status Update ID.
#[derive(
Debug, Copy, Clone, Default, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
Debug,
Copy,
Clone,
Default,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
Serialize,
Deserialize,
FromSql,
FromPrimitive,
)]
pub struct StatusUpdateId(u32);
pub struct StatusUpdateSerial(u32);
impl StatusUpdateId {
impl StatusUpdateSerial {
/// Create a new [MsgId].
pub fn new(id: u32) -> StatusUpdateId {
StatusUpdateId(id)
pub fn new(id: u32) -> StatusUpdateSerial {
StatusUpdateSerial(id)
}
/// Gets StatusUpdateId as untyped integer.
@@ -73,7 +86,7 @@ impl StatusUpdateId {
}
}
impl rusqlite::types::ToSql for StatusUpdateId {
impl rusqlite::types::ToSql for StatusUpdateSerial {
fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput> {
let val = rusqlite::types::Value::Integer(i64::from(self.0));
let out = rusqlite::types::ToSqlOutput::Owned(val);
@@ -99,6 +112,16 @@ pub(crate) struct StatusUpdateItem {
summary: Option<String>,
}
/// Update items as passed to the UIs.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct StatusUpdateItemAndSerial {
#[serde(flatten)]
item: StatusUpdateItem,
serial: StatusUpdateSerial,
max_serial: StatusUpdateSerial,
}
impl Context {
/// check if a file is an acceptable webxdc for sending or receiving.
pub(crate) async fn is_webxdc_file(&self, filename: &str, buf: &[u8]) -> Result<bool> {
@@ -157,7 +180,7 @@ impl Context {
instance: &mut Message,
update_str: &str,
timestamp: i64,
) -> Result<StatusUpdateId> {
) -> Result<StatusUpdateSerial> {
let update_str = update_str.trim();
if update_str.is_empty() {
bail!("create_status_update_record: empty update.");
@@ -176,13 +199,7 @@ impl Context {
_ => item,
}
} else {
// TODO: this fallback (legacy `PAYLOAD`) should be deleted soon, together with the test below
let payload: Value = serde_json::from_str(update_str)?; // checks if input data are valid json
StatusUpdateItem {
payload,
info: None,
summary: None,
}
bail!("create_status_update_record: no valid update item.");
}
};
@@ -219,14 +236,10 @@ impl Context {
paramsv![instance.id, serde_json::to_string(&status_update_item)?],
)
.await?;
let status_update_id = StatusUpdateId(u32::try_from(rowid)?);
self.emit_event(EventType::WebxdcStatusUpdate {
msg_id: instance.id,
status_update_id,
});
self.emit_event(EventType::WebxdcStatusUpdate(instance.id));
Ok(status_update_id)
Ok(StatusUpdateSerial(u32::try_from(rowid)?))
}
/// Sends a status update for an webxdc instance.
@@ -250,7 +263,7 @@ impl Context {
let chat = Chat::load_from_db(self, instance.chat_id).await?;
ensure!(chat.can_send(self).await?, "cannot send to {}", chat.id);
let status_update_id = self
let status_update_serial = self
.create_status_update_record(
&mut instance,
update_str,
@@ -279,7 +292,7 @@ impl Context {
Param::Arg,
self.render_webxdc_status_update_object(
instance_msg_id,
Some(status_update_id),
Some(status_update_serial),
)
.await?
.ok_or_else(|| format_err!("Status object expected."))?,
@@ -338,24 +351,79 @@ impl Context {
Ok(())
}
/// Returns status updates as an JSON-array.
/// Returns status updates as an JSON-array, ready to be consumed by a webxdc.
///
/// Example: `[{"payload":"any update data"},{"payload":"another update data"}]`
/// The updates may be filtered by a given status_update_id;
/// if no updates are available, an empty JSON-array is returned.
/// Example: `[{"serial":1, "max_serial":3, "payload":"any update data"},
/// {"serial":3, "max_serial":3, "payload":"another update data"}]`
/// Updates with serials larger than `last_known_serial` are returned.
/// If no last serial is known, set `last_known_serial` to 0.
/// If no updates are available, an empty JSON-array is returned.
pub async fn get_webxdc_status_updates(
&self,
instance_msg_id: MsgId,
status_update_id: Option<StatusUpdateId>,
last_known_serial: StatusUpdateSerial,
) -> Result<String> {
let json = self
.sql
.query_map(
"SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND (1=? OR id=?)",
"SELECT update_item, id FROM msgs_status_updates WHERE msg_id=? AND id>? ORDER BY id",
paramsv![instance_msg_id, last_known_serial],
|row| {
let update_item_str = row.get::<_, String>(0)?;
let serial = row.get::<_, StatusUpdateSerial>(1)?;
Ok((update_item_str, serial))
},
|rows| {
let mut rows_copy : Vec<(String, StatusUpdateSerial)> = Vec::new(); // `rows_copy` needed as `rows` cannot be iterated twice.
let mut max_serial = StatusUpdateSerial(0);
for row in rows {
let row = row?;
if row.1 > max_serial {
max_serial = row.1;
}
rows_copy.push(row);
}
let mut json = String::default();
for row in rows_copy {
let (update_item_str, serial) = row;
let update_item = StatusUpdateItemAndSerial
{
item: serde_json::from_str(&*update_item_str)?,
serial,
max_serial,
};
if !json.is_empty() {
json.push_str(",\n");
}
json.push_str(&*serde_json::to_string(&update_item)?);
}
Ok(json)
},
)
.await?;
Ok(format!("[{}]", json))
}
/// Renders JSON-object for status updates as used on the wire.
///
/// Example: `{"updates": [{"payload":"any update data"},
/// {"payload":"another update data"}]}`
/// If `status_update_serial` is set, exactly that update is rendered, otherwise all updates are rendered.
pub(crate) async fn render_webxdc_status_update_object(
&self,
instance_msg_id: MsgId,
status_update_serial: Option<StatusUpdateSerial>,
) -> Result<Option<String>> {
let json = self
.sql
.query_map(
"SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND (1=? OR id=?) ORDER BY id",
paramsv![
instance_msg_id,
if status_update_id.is_some() { 0 } else { 1 },
status_update_id.unwrap_or(StatusUpdateId(0))
if status_update_serial.is_some() { 0 } else { 1 },
status_update_serial.unwrap_or(StatusUpdateSerial(0))
],
|row| row.get::<_, String>(0),
|rows| {
@@ -371,22 +439,10 @@ impl Context {
},
)
.await?;
Ok(format!("[{}]", json))
}
/// Render JSON-object for status updates as used on the wire.
pub(crate) async fn render_webxdc_status_update_object(
&self,
instance_msg_id: MsgId,
status_update_id: Option<StatusUpdateId>,
) -> Result<Option<String>> {
let updates_array = self
.get_webxdc_status_updates(instance_msg_id, status_update_id)
.await?;
if updates_array == "[]" {
if json.is_empty() {
Ok(None)
} else {
Ok(Some(format!(r#"{{"updates":{}}}"#, updates_array)))
Ok(Some(format!(r#"{{"updates":[{}]}}"#, json)))
}
}
}
@@ -635,8 +691,9 @@ mod tests {
.await?;
assert!(!instance.is_forwarded());
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":42,"info":"foo","summary":"bar"}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":42,"info":"foo","summary":"bar","serial":1,"max_serial":1}]"#
);
assert_eq!(chat_id.get_msg_cnt(&t).await?, 2); // instance and info
let info = Message::load_from_db(&t, instance.id)
@@ -649,7 +706,11 @@ mod tests {
forward_msgs(&t, &[instance.get_id()], chat_id).await?;
let instance2 = t.get_last_msg_in(chat_id).await;
assert!(instance2.is_forwarded());
assert_eq!(t.get_webxdc_status_updates(instance2.id, None).await?, "[]");
assert_eq!(
t.get_webxdc_status_updates(instance2.id, StatusUpdateSerial(0))
.await?,
"[]"
);
assert_eq!(chat_id.get_msg_cnt(&t).await?, 3); // two instances, only one info
let info = Message::load_from_db(&t, instance2.id)
.await?
@@ -712,7 +773,8 @@ mod tests {
.await
.is_err());
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, None).await?,
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
"[]"
);
@@ -723,8 +785,9 @@ mod tests {
.await
.is_ok());
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, None).await?,
r#"[{"payload":42}]"#
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":42,"serial":1,"max_serial":1}]"#
);
Ok(())
@@ -746,14 +809,16 @@ mod tests {
t.send_webxdc_status_update(instance.id, r#"{"payload": 42}"#, "descr")
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":42}]"#.to_string()
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":42,"serial":1,"max_serial":1}]"#.to_string()
);
// set_draft(None) deletes the message without the need to simulate network
chat_id.set_draft(&t, None).await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
"[]".to_string()
);
assert_eq!(
@@ -772,9 +837,13 @@ mod tests {
let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "foo").await?;
let mut instance = send_webxdc_instance(&t, chat_id).await?;
assert_eq!(t.get_webxdc_status_updates(instance.id, None).await?, "[]");
assert_eq!(
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
"[]"
);
let id = t
let update_id1 = t
.create_status_update_record(
&mut instance,
"\n\n{\"payload\": {\"foo\":\"bar\"}}\n",
@@ -782,8 +851,9 @@ mod tests {
)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, Some(id)).await?,
r#"[{"payload":{"foo":"bar"}}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
assert!(t
@@ -795,15 +865,12 @@ mod tests {
.await
.is_err());
assert_eq!(
t.get_webxdc_status_updates(instance.id, Some(id)).await?,
r#"[{"payload":{"foo":"bar"}}]"#
);
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
let id = t
let update_id2 = t
.create_status_update_record(
&mut instance,
r#"{"payload" : { "foo2":"bar2"}}"#,
@@ -811,19 +878,20 @@ mod tests {
)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, Some(id)).await?,
r#"[{"payload":{"foo2":"bar2"}}]"#
t.get_webxdc_status_updates(instance.id, update_id1).await?,
r#"[{"payload":{"foo2":"bar2"},"serial":2,"max_serial":2}]"#
);
t.create_status_update_record(&mut instance, r#"{"payload":true}"#, 1640178619)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}},
{"payload":{"foo2":"bar2"}},
{"payload":true}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":3},
{"payload":{"foo2":"bar2"},"serial":2,"max_serial":3},
{"payload":true,"serial":3,"max_serial":3}]"#
);
let id = t
let _update_id3 = t
.create_status_update_record(
&mut instance,
r#"{"payload" : 1, "sender": "that is not used"}"#,
@@ -831,17 +899,9 @@ mod tests {
)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, Some(id)).await?,
r#"[{"payload":1}]"#
);
// TODO: legacy `PAYLOAD` support should be deleted soon
let id = t
.create_status_update_record(&mut instance, r#"{"foo" : 1}"#, 1640178619)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, Some(id)).await?,
r#"[{"payload":{"foo":1}}]"#
t.get_webxdc_status_updates(instance.id, update_id2).await?,
r#"[{"payload":true,"serial":3,"max_serial":4},
{"payload":1,"serial":4,"max_serial":4}]"#
);
Ok(())
@@ -873,8 +933,9 @@ mod tests {
t.receive_status_update(instance.id, r#"{"updates":[{"payload":{"foo":"bar"}}]}"#)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
t.receive_status_update(
@@ -883,10 +944,11 @@ mod tests {
)
.await?;
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}},
{"payload":42},
{"payload":23}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":3},
{"payload":42,"serial":2,"max_serial":3},
{"payload":23,"serial":3,"max_serial":3}]"#
);
t.receive_status_update(
@@ -895,11 +957,12 @@ mod tests {
)
.await?; // ignore members that may be added in the future
assert_eq!(
t.get_webxdc_status_updates(instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}},
{"payload":42},
{"payload":23},
{"payload":"ok"}]"#
t.get_webxdc_status_updates(instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":4},
{"payload":42,"serial":2,"max_serial":4},
{"payload":23,"serial":3,"max_serial":4},
{"payload":"ok","serial":4,"max_serial":4}]"#
);
Ok(())
@@ -911,15 +974,7 @@ mod tests {
.get_matching(|evt| matches!(evt, EventType::WebxdcStatusUpdate { .. }))
.await;
match event {
EventType::WebxdcStatusUpdate {
msg_id,
status_update_id,
} => {
assert_eq!(
t.get_webxdc_status_updates(msg_id, Some(status_update_id))
.await?,
r#"[{"payload":{"foo":"bar"}}]"#
);
EventType::WebxdcStatusUpdate(msg_id) => {
assert_eq!(msg_id, instance_id);
}
_ => unreachable!(),
@@ -964,9 +1019,9 @@ mod tests {
assert!(sent2.payload().contains("descr text"));
assert_eq!(
alice
.get_webxdc_status_updates(alice_instance.id, None)
.get_webxdc_status_updates(alice_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"}}]"#
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
alice
@@ -979,10 +1034,10 @@ mod tests {
.unwrap();
assert_eq!(
alice
.get_webxdc_status_updates(alice_instance.id, None)
.get_webxdc_status_updates(alice_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"}},
{"payload":{"snipp":"snapp"}}]"#
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":2},
{"payload":{"snipp":"snapp"},"serial":2,"max_serial":2}]"#
);
// Bob receives all messages
@@ -998,8 +1053,9 @@ mod tests {
assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1);
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}}]"#
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
);
// Alice has a second device and also receives messages there
@@ -1091,9 +1147,10 @@ mod tests {
assert!(sent1.payload().contains("status-update.json"));
assert!(sent1.payload().contains(r#""payload":{"foo":"bar"}"#));
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, None).await?,
r#"[{"payload":{"foo":"bar"}},
{"payload":42}]"# // 'info: "i"' ignored as sent in draft mode
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":2},
{"payload":42,"serial":2,"max_serial":2}]"# // 'info: "i"' ignored as sent in draft mode
);
Ok(())
@@ -1411,9 +1468,9 @@ sth_for_the = "future""#
assert!(info_msg.quoted_message(&alice).await?.is_none());
assert_eq!(
alice
.get_webxdc_status_updates(alice_instance.id, None)
.get_webxdc_status_updates(alice_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":"sth. else","info":"this appears in-chat"}]"#
r#"[{"payload":"sth. else","info":"this appears in-chat","serial":1,"max_serial":1}]"#
);
// Bob receives all messages
@@ -1431,8 +1488,9 @@ sth_for_the = "future""#
assert_eq!(info_msg.parent(&bob).await?.unwrap().id, bob_instance.id);
assert!(info_msg.quoted_message(&bob).await?.is_none());
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, None).await?,
r#"[{"payload":"sth. else","info":"this appears in-chat"}]"#
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":"sth. else","info":"this appears in-chat","serial":1,"max_serial":1}]"#
);
// Alice has a second device and also receives the info message there
@@ -1455,9 +1513,9 @@ sth_for_the = "future""#
assert!(info_msg.quoted_message(&alice2).await?.is_none());
assert_eq!(
alice2
.get_webxdc_status_updates(alice2_instance.id, None)
.get_webxdc_status_updates(alice2_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":"sth. else","info":"this appears in-chat"}]"#
r#"[{"payload":"sth. else","info":"this appears in-chat","serial":1,"max_serial":1}]"#
);
Ok(())