feat: Limit the size of aggregated WebXDC update to 100 KiB (#4825)

Previously, sending of updates could be delayed by rate limits, and the delayed updates were later merged into
large messages. This is undesirable for apps that want to send large files over WebXDC updates because a
message with an aggregated update may be too large to actually send: it may hit the provider limit or
require multiple attempts on a flaky SMTP connection.

So, don't aggregate updates if the size of the aggregated update would exceed the limit of 100
KiB. This is a soft limit: it may be exceeded if a single update is larger than the limit, and it applies only
to the update JSON size, so the message with all its envelopes may still be larger. The limit may also be
exceeded when updates are sent together with the WebXDC instance on resending, because the instance
size isn't taken into account in order not to complicate the code. At least this is no worse than the
previous behaviour, when all updates were attached.
This commit is contained in:
iequidoo
2024-04-23 05:24:14 -03:00
committed by iequidoo
parent 0f26da4028
commit 9996c2db80
3 changed files with 245 additions and 67 deletions

View File

@@ -18,6 +18,7 @@
mod integration;
mod maps_integration;
use std::cmp::max;
use std::path::Path;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
@@ -122,6 +123,11 @@ impl StatusUpdateSerial {
StatusUpdateSerial(id)
}
/// Minimum value.
pub const MIN: Self = Self(1);
/// Maximum value.
///
/// This is one less than `u32::MAX`, so the successor of `MAX` (`MAX + 1`) still fits into `u32`.
pub const MAX: Self = Self(u32::MAX - 1);
/// Gets StatusUpdateSerial as untyped integer.
/// Avoid using this outside ffi.
pub fn to_u32(self) -> u32 {
@@ -196,6 +202,9 @@ fn find_zip_entry<'a>(
None
}
/// Status update JSON size soft limit, in bytes.
///
/// `100 << 10` is 102400 bytes, i.e. 100 KiB.
const STATUS_UPDATE_SIZE_MAX: usize = 100 << 10;
impl Context {
/// check if a file is an acceptable webxdc for sending or receiving.
pub(crate) async fn is_webxdc_file(&self, filename: &str, file: &[u8]) -> Result<bool> {
@@ -505,22 +514,19 @@ impl Context {
Ok(())
}
/// Pops one record of queued webxdc status updates.
/// This function exists to make the sqlite statement testable.
async fn pop_smtp_status_update(
/// Returns one record of the queued webxdc status updates.
async fn smtp_status_update_get(
&self,
) -> Result<Option<(MsgId, StatusUpdateSerial, StatusUpdateSerial, String)>> {
let _lock = self.sql.write_lock().await;
) -> Result<Option<(MsgId, i64, StatusUpdateSerial, String)>> {
let res = self
.sql
.query_row_optional(
"DELETE FROM smtp_status_updates
WHERE msg_id IN (SELECT msg_id FROM smtp_status_updates LIMIT 1)
RETURNING msg_id, first_serial, last_serial, descr",
"SELECT msg_id, first_serial, last_serial, descr \
FROM smtp_status_updates LIMIT 1",
(),
|row| {
let instance_id: MsgId = row.get(0)?;
let first_serial: StatusUpdateSerial = row.get(1)?;
let first_serial: i64 = row.get(1)?;
let last_serial: StatusUpdateSerial = row.get(2)?;
let descr: String = row.get(3)?;
Ok((instance_id, first_serial, last_serial, descr))
@@ -530,19 +536,50 @@ impl Context {
Ok(res)
}
/// Drops the serials below `first_new` from a queued record in `smtp_status_updates`.
///
/// The record is identified by `msg_id` and its current `first_serial` value `first`.
///
/// * If the record's `last_serial` is below `first_new`, the whole record is deleted.
/// * Otherwise, when nothing was deleted, the record's `first_serial` is set to `first_new`.
///
/// Returns an error if one of the SQL statements fails.
async fn smtp_status_update_pop_serials(
    &self,
    msg_id: MsgId,
    first: i64,
    first_new: StatusUpdateSerial,
) -> Result<()> {
    // First try to remove the record as a whole.
    let record_removed = self
        .sql
        .execute(
            "DELETE FROM smtp_status_updates \
             WHERE msg_id=? AND first_serial=? AND last_serial<?",
            (msg_id, first, first_new),
        )
        .await?
        > 0;

    // Nothing was deleted, so only move the start of the queued range forward.
    if !record_removed {
        self.sql
            .execute(
                "UPDATE smtp_status_updates SET first_serial=? \
                 WHERE msg_id=? AND first_serial=?",
                (first_new, msg_id, first),
            )
            .await?;
    }
    Ok(())
}
/// Attempts to send queued webxdc status updates.
pub(crate) async fn flush_status_updates(&self) -> Result<()> {
loop {
let (instance_id, first_serial, last_serial, descr) =
match self.pop_smtp_status_update().await? {
Some(res) => res,
None => return Ok(()),
};
if let Some(json) = self
.render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial)))
.await?
{
let (instance_id, first, last, descr) = match self.smtp_status_update_get().await? {
Some(res) => res,
None => return Ok(()),
};
let (json, first_new) = self
.render_webxdc_status_update_object(
instance_id,
StatusUpdateSerial(max(first, 1).try_into()?),
last,
Some(STATUS_UPDATE_SIZE_MAX),
)
.await?;
if let Some(json) = json {
let instance = Message::load_from_db(self, instance_id).await?;
let mut status_update = Message {
chat_id: instance.chat_id,
@@ -559,6 +596,8 @@ impl Context {
status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed
chat::send_msg(self, instance.chat_id, &mut status_update).await?;
}
self.smtp_status_update_pop_serials(instance_id, first, first_new)
.await?;
}
}
@@ -690,45 +729,59 @@ impl Context {
/// Renders JSON-object for status updates as used on the wire.
///
/// Example: `{"updates": [{"payload":"any update data"},
/// {"payload":"another update data"}]}`
/// Returns optional JSON and the first serial of updates not included due to a JSON size
/// limit. If all requested updates are included, returns the first not requested serial.
///
/// `range` is an optional range of status update serials to send.
/// If it is `None`, all updates are sent.
/// This is used when a message is resent using [`crate::chat::resend_msgs`].
/// Example JSON: `{"updates": [{"payload":"any update data"},
/// {"payload":"another update data"}]}`
///
/// * `(first, last)`: range of status update serials to send.
pub(crate) async fn render_webxdc_status_update_object(
&self,
instance_msg_id: MsgId,
range: Option<(StatusUpdateSerial, StatusUpdateSerial)>,
) -> Result<Option<String>> {
let json = self
first: StatusUpdateSerial,
last: StatusUpdateSerial,
size_max: Option<usize>,
) -> Result<(Option<String>, StatusUpdateSerial)> {
let (json, first_new) = self
.sql
.query_map(
"SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND id>=? AND id<=? ORDER BY id",
(
instance_msg_id,
range.map(|r|r.0).unwrap_or(StatusUpdateSerial(0)),
range.map(|r|r.1).unwrap_or(StatusUpdateSerial(u32::MAX)),
),
|row| row.get::<_, String>(0),
"SELECT id, update_item FROM msgs_status_updates \
WHERE msg_id=? AND id>=? AND id<=? ORDER BY id",
(instance_msg_id, first, last),
|row| {
let id: StatusUpdateSerial = row.get(0)?;
let update_item: String = row.get(1)?;
Ok((id, update_item))
},
|rows| {
let mut json = String::default();
for row in rows {
let update_item = row?;
let (id, update_item) = row?;
if !json.is_empty()
&& json.len() + update_item.len() >= size_max.unwrap_or(usize::MAX)
{
return Ok((json, id));
}
if !json.is_empty() {
json.push_str(",\n");
}
json.push_str(&update_item);
}
Ok(json)
Ok((
json,
// Too late to fail here if an overflow happens. It's still better to send
// the updates.
StatusUpdateSerial::new(last.to_u32().saturating_add(1)),
))
},
)
.await?;
if json.is_empty() {
Ok(None)
} else {
Ok(Some(format!(r#"{{"updates":[{json}]}}"#)))
}
let json = match json.is_empty() {
true => None,
false => Some(format!(r#"{{"updates":[{json}]}}"#)),
};
Ok((json, first_new))
}
}
@@ -1089,10 +1142,13 @@ mod tests {
assert_eq!(alice_grp.get_msg_cnt(&alice).await?, 3);
resend_msgs(&alice, &[alice_instance.id]).await?;
let sent1 = alice.pop_sent_msg().await;
alice.flush_status_updates().await?;
let sent2 = alice.pop_sent_msg().await;
// Bob received webxdc, legacy info-messages updates are received but not added to the chat
// Bob receives webxdc, legacy info-messages updates are received and added to the chat.
let bob = tcm.bob().await;
let bob_instance = bob.recv_msg(&sent1).await;
bob.recv_msg_trash(&sent2).await;
assert_eq!(bob_instance.viewtype, Viewtype::Webxdc);
assert!(!bob_instance.is_info());
assert_eq!(
@@ -1684,6 +1740,79 @@ mod tests {
Ok(())
}
/// Checks that a status update larger than `STATUS_UPDATE_SIZE_MAX` is not aggregated with the
/// updates queued after it.
///
/// Alice queues three updates, the first of which is oversized, and flushes them once. The
/// flush is expected to produce two messages: one carrying only the oversized update and one
/// carrying the two small updates. Bob then receives both and must see all three updates with
/// the right serials.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_send_big_webxdc_status_update() -> Result<()> {
let alice = TestContext::new_alice().await;
alice.set_config_bool(Config::BccSelf, true).await?;
let bob = TestContext::new_bob().await;
let alice_chat = alice.create_chat(&bob).await;
let alice_instance = send_webxdc_instance(&alice, alice_chat.id).await?;
let sent1 = &alice.pop_sent_msg().await;
assert_eq!(alice_instance.viewtype, Viewtype::Webxdc);
assert!(!sent1.payload().contains("report-type=status-update"));
// The payload string alone is `STATUS_UPDATE_SIZE_MAX` bytes of `a`, so this update exceeds
// the soft limit on its own.
// `update1_str` deliberately lacks the final `}`: it is appended when sending, while the
// open form is reused below to build the expected output with `serial`/`max_serial` added.
let update1_str = r#"{"payload":{"foo":""#.to_string()
+ &String::from_utf8(vec![b'a'; STATUS_UPDATE_SIZE_MAX])?
+ r#""}"#;
alice
.send_webxdc_status_update(alice_instance.id, &(update1_str.clone() + "}"), "descr1")
.await?;
alice
.send_webxdc_status_update(
alice_instance.id,
r#"{"payload" : {"foo":"bar2"}}"#,
"descr2",
)
.await?;
alice
.send_webxdc_status_update(
alice_instance.id,
r#"{"payload" : {"foo":"bar3"}}"#,
"descr3",
)
.await?;
// A single flush is expected to send all three queued updates, split into two messages.
alice.flush_status_updates().await?;
// There's the message stack, so we pop messages in the reverse order.
let sent3 = &alice.pop_sent_msg().await;
let alice_update = sent3.load_from_db().await;
assert_eq!(alice_update.text, "descr3".to_string());
let sent2 = &alice.pop_sent_msg().await;
let alice_update = sent2.load_from_db().await;
// NOTE(review): the earlier message is also expected to carry "descr3" -- presumably the
// queue keeps only the most recent description per instance; confirm against
// `send_webxdc_status_update`.
assert_eq!(alice_update.text, "descr3".to_string());
// Alice's chat is still expected to contain a single message.
assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 1);
// Bob receives the instance.
let bob_instance = bob.recv_msg(sent1).await;
let bob_chat_id = bob_instance.chat_id;
assert_eq!(bob_instance.rfc724_mid, alice_instance.rfc724_mid);
assert_eq!(bob_instance.viewtype, Viewtype::Webxdc);
assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1);
// Bob receives the status updates.
// The first status update message must contain only the oversized update (serial 1).
bob.recv_msg_trash(sent2).await;
expect_status_update_event(&bob, bob_instance.id).await?;
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
.await?,
"[".to_string() + &update1_str + r#","serial":1,"max_serial":1}]"#
);
// The second status update message carries the two small updates, so two events are awaited.
bob.recv_msg_trash(sent3).await;
for _ in 0..2 {
expect_status_update_event(&bob, bob_instance.id).await?;
}
// Only the updates after serial 1 are requested here.
assert_eq!(
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(1))
.await?,
r#"[{"payload":{"foo":"bar2"},"serial":2,"max_serial":3},
{"payload":{"foo":"bar3"},"serial":3,"max_serial":3}]"#
);
// Bob's chat is still expected to contain a single message.
assert_eq!(bob_chat_id.get_msg_cnt(&bob).await?, 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_render_webxdc_status_update_object() -> Result<()> {
let t = TestContext::new_alice().await;
@@ -1695,17 +1824,20 @@ mod tests {
)
.await?;
chat_id.set_draft(&t, Some(&mut instance)).await?;
assert!(t
.render_webxdc_status_update_object(instance.id, None)
.await?
.is_none());
let (first, last) = (StatusUpdateSerial(1), StatusUpdateSerial::MAX);
assert_eq!(
t.render_webxdc_status_update_object(instance.id, first, last, None)
.await?,
(None, StatusUpdateSerial(u32::MAX))
);
t.send_webxdc_status_update(instance.id, r#"{"payload": 1}"#, "bla")
.await?;
assert!(t
.render_webxdc_status_update_object(instance.id, None)
.await?
.is_some());
let (object, first_new) = t
.render_webxdc_status_update_object(instance.id, first, last, None)
.await?;
assert!(object.is_some());
assert_eq!(first_new, StatusUpdateSerial(u32::MAX));
Ok(())
}
@@ -1723,13 +1855,16 @@ mod tests {
.await?;
t.send_webxdc_status_update(instance.id, r#"{"payload": 4}"#, "d")
.await?;
let json = t
let (json, first_new) = t
.render_webxdc_status_update_object(
instance.id,
Some((StatusUpdateSerial(2), StatusUpdateSerial(3))),
StatusUpdateSerial(2),
StatusUpdateSerial(3),
None,
)
.await?
.unwrap();
.await?;
let json = json.unwrap();
assert_eq!(first_new, StatusUpdateSerial(4));
let json = Regex::new(r#""uid":"[^"]*""#)
.unwrap()
.replace_all(&json, "XXX");
@@ -1761,7 +1896,7 @@ mod tests {
let instance1 = send_webxdc_instance(&t, chat_id).await?;
let instance2 = send_webxdc_instance(&t, chat_id).await?;
let instance3 = send_webxdc_instance(&t, chat_id).await?;
assert!(t.pop_smtp_status_update().await?.is_none());
assert!(t.smtp_status_update_get().await?.is_none());
t.send_webxdc_status_update(instance1.id, r#"{"payload": "1a"}"#, "descr1a")
.await?;
@@ -1782,20 +1917,27 @@ mod tests {
3
);
// order of pop_status_update() is not defined, therefore the more complicated test
// order of smtp_status_update_get() is not defined, therefore the more complicated test
let mut instances_checked = 0;
for i in 0..3 {
let (instance, min_ser, max_ser, descr) = t.pop_smtp_status_update().await?.unwrap();
let (instance, min_ser, max_ser, descr) = t.smtp_status_update_get().await?.unwrap();
t.smtp_status_update_pop_serials(
instance,
min_ser,
StatusUpdateSerial::new(max_ser.to_u32().checked_add(1).unwrap()),
)
.await?;
let min_ser: u32 = min_ser.try_into()?;
if instance == instance1.id {
assert_eq!(min_ser, max_ser);
assert_eq!(min_ser, max_ser.to_u32());
assert_eq!(descr, "descr1a");
instances_checked += 1;
} else if instance == instance2.id {
assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 1);
assert_eq!(min_ser, max_ser.to_u32() - 1);
assert_eq!(descr, "descr2b");
instances_checked += 1;
} else if instance == instance3.id {
assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 2);
assert_eq!(min_ser, max_ser.to_u32() - 2);
assert_eq!(descr, "descr3c");
instances_checked += 1;
} else {
@@ -1809,7 +1951,7 @@ mod tests {
);
}
assert_eq!(instances_checked, 3);
assert!(t.pop_smtp_status_update().await?.is_none());
assert!(t.smtp_status_update_get().await?.is_none());
Ok(())
}
@@ -1836,12 +1978,11 @@ mod tests {
alice
.send_webxdc_status_update(alice_instance.id, r#"{"payload": {"foo":"bar"}}"#, "descr")
.await?;
alice.flush_status_updates().await?;
expect_status_update_event(&alice, alice_instance.id).await?;
alice
.send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#, "descr")
.await?;
alice.flush_status_updates().await?;
expect_status_update_event(&alice, alice_instance.id).await?;
assert_eq!(
alice
.sql