From 84cabbcb7e7e7c7cb31f18bf317ad315ff788e17 Mon Sep 17 00:00:00 2001 From: bjoern Date: Sun, 26 Jun 2022 22:03:14 +0200 Subject: [PATCH] limit rate of webxdc updates (#3417) * more flexible render_webxdc_status_update_object() * delay webxdc updates when ratelimit is reached * inject updates messages to the SMTP loop as needed this avoids starting several tasks and allows sending updates out after a restart of the app. * use mutex to prevent race conditions in status updates * check ratelimiter only before the sending loop; it won't change until messages are actually sent out * fix typo * prefer standard type declaration over turbofish syntax * use UNIQUE and ON CONFLICT for query smtp_status_updates * combine DELETE+SELECT to one atomic statement * as all operations on smtp_status_updates are now atomic, a mutex is no longer needed * test DELETE+RETURNING statement * simplify calls to can_send() * comment about ratelimit boolean in send_smtp_messages() --- CHANGELOG.md | 1 + src/smtp.rs | 24 +++- src/sql/migrations.rs | 13 +++ src/webxdc.rs | 256 +++++++++++++++++++++++++++++++++--------- 4 files changed, 234 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53dd0d888..6d804d44e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - format message lines starting with `>` as quotes #3434 - node: remove `split2` dependency #3418 - node: add git installation info to readme #3418 +- limit the rate of webxdc update sending #3417 ### Fixes - set a default error if NDN does not provide an error diff --git a/src/smtp.rs b/src/smtp.rs index 9ab34e96c..8c2bd2adf 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -496,14 +496,22 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result { } } -/// Tries to send all messages currently in `smtp` and `smtp_mdns` tables. +/// Tries to send all messages currently in `smtp`, `smtp_status_updates` and `smtp_mdns` tables. /// /// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent /// does not block other messages in the queue from being sent. /// /// Returns true if sending was ratelimited, false otherwise. Errors are propagated to the caller. pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result { - context.send_sync_msg().await?; // Add sync message to the end of the queue if needed. + let mut ratelimited = if context.ratelimit.read().await.can_send() { + // add status updates and sync messages to end of sending queue + context.flush_status_updates().await?; + context.send_sync_msg().await?; + false + } else { + true + }; + let rowids = context .sql .query_map( @@ -526,9 +534,15 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) .context("failed to send message")?; } - let ratelimited = send_mdns(context, connection) - .await - .context("failed to send MDNs")?; + // although by slow sending, ratelimit may have been expired meanwhile, + // do not attempt to send MDNs if ratelimited happend before on status-updates/sync: + // instead, let the caller recall this function so that more important status-updates/sync are sent out. + if !ratelimited { + ratelimited = send_mdns(context, connection) + .await + .context("failed to send MDNs")?; + } + Ok(ratelimited) } diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 8cc29322d..e2e5dc354 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -637,6 +637,19 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid); ) .await?; } + if dbversion < 91 { + info!(context, "[migration] v91"); + sql.execute_migration( + r#"CREATE TABLE smtp_status_updates ( + msg_id INTEGER NOT NULL UNIQUE, -- msg_id of the webxdc instance with pending updates + first_serial INTEGER NOT NULL, -- id in msgs_status_updates + last_serial INTEGER NOT NULL, -- id in msgs_status_updates + descr TEXT NOT NULL -- text to send along with the updates + );"#, + 91, + ) + .await?; + } Ok(( recalc_fingerprints, diff --git a/src/webxdc.rs b/src/webxdc.rs index 6505eb86e..0267b7e23 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -8,6 +8,7 @@ use crate::message::{Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; use crate::param::Param; use crate::param::Params; +use crate::scheduler::InterruptInfo; use crate::{chat, EventType}; use anyhow::{bail, ensure, format_err, Result}; use async_std::path::PathBuf; @@ -332,15 +333,13 @@ impl Context { /// /// If the instance is a draft, /// the status update is sent once the instance is actually sent. - /// - /// If an update is sent immediately, the message-id of the update-message is returned, - /// this update-message is visible in chats, however, the id may be useful. + /// Otherwise, the update is sent as soon as possible. pub async fn send_webxdc_status_update( &self, instance_msg_id: MsgId, update_str: &str, descr: &str, - ) -> Result> { + ) -> Result<()> { let mut instance = Message::load_from_db(self, instance_msg_id).await?; if instance.viewtype != Viewtype::Webxdc { bail!("send_webxdc_status_update: is no webxdc message"); @@ -365,35 +364,73 @@ impl Context { .await?; if send_now { - // send update now - // (also send updates on MessagesState::Failed, maybe only one member cannot receive) - let mut status_update = Message { - chat_id: instance.chat_id, - viewtype: Viewtype::Text, - text: Some(descr.to_string()), - hidden: true, - ..Default::default() - }; - status_update - .param - .set_cmd(SystemMessage::WebxdcStatusUpdate); - status_update.param.set( - Param::Arg, - self.render_webxdc_status_update_object( - instance_msg_id, - Some(status_update_serial), - ) + self.sql.insert( + "INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?) + ON CONFLICT(msg_id) + DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr", + paramsv![instance.id, status_update_serial, status_update_serial, descr], + ).await?; + self.interrupt_smtp(InterruptInfo::new(false)).await; + } + Ok(()) + } + + /// Pops one record of queued webxdc status updates. + /// This function exists to make the sqlite statement testable. + async fn pop_smtp_status_update( + &self, + ) -> Result> { + let res = self + .sql + .query_row_optional( + "DELETE FROM smtp_status_updates + WHERE msg_id IN (SELECT msg_id FROM smtp_status_updates LIMIT 1) + RETURNING msg_id, first_serial, last_serial, descr", + paramsv![], + |row| { + let instance_id: MsgId = row.get(0)?; + let first_serial: StatusUpdateSerial = row.get(1)?; + let last_serial: StatusUpdateSerial = row.get(2)?; + let descr: String = row.get(3)?; + Ok((instance_id, first_serial, last_serial, descr)) + }, + ) + .await?; + Ok(res) + } + + /// Attempts to send queued webxdc status updates. + /// + /// Returns true if there are more status updates to send, but rate limiter does not + /// allow to send them. Returns false if there are no more status updates to send. + pub(crate) async fn flush_status_updates(&self) -> Result { + loop { + let (instance_id, first_serial, last_serial, descr) = + match self.pop_smtp_status_update().await? { + Some(res) => res, + None => return Ok(false), + }; + + if let Some(json) = self + .render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial))) .await? - .ok_or_else(|| format_err!("Status object expected."))?, - ); - status_update.set_quote(self, Some(&instance)).await?; - status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed - let status_update_msg_id = + { + let instance = Message::load_from_db(self, instance_id).await?; + let mut status_update = Message { + chat_id: instance.chat_id, + viewtype: Viewtype::Text, + text: Some(descr.to_string()), + hidden: true, + ..Default::default() + }; + status_update + .param + .set_cmd(SystemMessage::WebxdcStatusUpdate); + status_update.param.set(Param::Arg, json); + status_update.set_quote(self, Some(&instance)).await?; + status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed chat::send_msg(self, instance.chat_id, &mut status_update).await?; - Ok(Some(status_update_msg_id)) - } else { - // send update once the instance is actually send - Ok(None) + } } } @@ -510,20 +547,19 @@ impl Context { /// /// Example: `{"updates": [{"payload":"any update data"}, /// {"payload":"another update data"}]}` - /// If `status_update_serial` is set, exactly that update is rendered, otherwise all updates are rendered. pub(crate) async fn render_webxdc_status_update_object( &self, instance_msg_id: MsgId, - status_update_serial: Option, + range: Option<(StatusUpdateSerial, StatusUpdateSerial)>, ) -> Result> { let json = self .sql .query_map( - "SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND (1=? OR id=?) ORDER BY id", + "SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND id>=? AND id<=? ORDER BY id", paramsv![ instance_msg_id, - if status_update_serial.is_some() { 0 } else { 1 }, - status_update_serial.unwrap_or(StatusUpdateSerial(0)) + range.map(|r|r.0).unwrap_or(StatusUpdateSerial(0)), + range.map(|r|r.1).unwrap_or(StatusUpdateSerial(u32::MAX)), ], |row| row.get::<_, String>(0), |rows| { @@ -1215,17 +1251,17 @@ mod tests { assert_eq!(alice_instance.viewtype, Viewtype::Webxdc); assert!(!sent1.payload().contains("report-type=status-update")); - let status_update_msg_id = alice + alice .send_webxdc_status_update( alice_instance.id, r#"{"payload" : {"foo":"bar"}}"#, "descr text", ) - .await? - .unwrap(); + .await?; + alice.flush_status_updates().await?; expect_status_update_event(&alice, alice_instance.id).await?; let sent2 = &alice.pop_sent_msg().await; - let alice_update = Message::load_from_db(&alice, status_update_msg_id).await?; + let alice_update = Message::load_from_db(&alice, sent2.sender_msg_id).await?; assert!(alice_update.hidden); assert_eq!(alice_update.viewtype, Viewtype::Text); assert_eq!(alice_update.get_filename(), None); @@ -1251,8 +1287,7 @@ mod tests { r#"{"payload":{"snipp":"snapp"}}"#, "bla text", ) - .await? - .unwrap(); + .await?; assert_eq!( alice .get_webxdc_status_updates(alice_instance.id, StatusUpdateSerial(0)) @@ -1329,6 +1364,104 @@ mod tests { Ok(()) } + #[async_std::test] + async fn test_render_webxdc_status_update_object_range() -> Result<()> { + let t = TestContext::new_alice().await; + let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?; + let instance = send_webxdc_instance(&t, chat_id).await?; + t.send_webxdc_status_update(instance.id, r#"{"payload": 1}"#, "d") + .await?; + t.send_webxdc_status_update(instance.id, r#"{"payload": 2}"#, "d") + .await?; + t.send_webxdc_status_update(instance.id, r#"{"payload": 3}"#, "d") + .await?; + t.send_webxdc_status_update(instance.id, r#"{"payload": 4}"#, "d") + .await?; + let json = t + .render_webxdc_status_update_object( + instance.id, + Some((StatusUpdateSerial(2), StatusUpdateSerial(3))), + ) + .await? + .unwrap(); + assert_eq!(json, "{\"updates\":[{\"payload\":2},\n{\"payload\":3}]}"); + + assert_eq!( + t.sql + .count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],) + .await?, + 1 + ); + t.flush_status_updates().await?; + assert_eq!( + t.sql + .count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],) + .await?, + 0 + ); + Ok(()) + } + + #[async_std::test] + async fn test_pop_status_update() -> Result<()> { + let t = TestContext::new_alice().await; + let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?; + let instance1 = send_webxdc_instance(&t, chat_id).await?; + let instance2 = send_webxdc_instance(&t, chat_id).await?; + let instance3 = send_webxdc_instance(&t, chat_id).await?; + assert!(t.pop_smtp_status_update().await?.is_none()); + + t.send_webxdc_status_update(instance1.id, r#"{"payload": "1a"}"#, "descr1a") + .await?; + t.send_webxdc_status_update(instance2.id, r#"{"payload": "2a"}"#, "descr2a") + .await?; + t.send_webxdc_status_update(instance2.id, r#"{"payload": "2b"}"#, "descr2b") + .await?; + t.send_webxdc_status_update(instance3.id, r#"{"payload": "3a"}"#, "descr3a") + .await?; + t.send_webxdc_status_update(instance3.id, r#"{"payload": "3b"}"#, "descr3b") + .await?; + t.send_webxdc_status_update(instance3.id, r#"{"payload": "3c"}"#, "descr3c") + .await?; + assert_eq!( + t.sql + .count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],) + .await?, + 3 + ); + + // order of pop_status_update() is not defined, therefore the more complicated test + let mut instances_checked = 0; + for i in 0..3 { + let (instance, min_ser, max_ser, descr) = t.pop_smtp_status_update().await?.unwrap(); + if instance == instance1.id { + assert_eq!(min_ser, max_ser); + assert_eq!(descr, "descr1a"); + instances_checked += 1; + } else if instance == instance2.id { + assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 1); + assert_eq!(descr, "descr2b"); + instances_checked += 1; + } else if instance == instance3.id { + assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 2); + assert_eq!(descr, "descr3c"); + instances_checked += 1; + } else { + bail!("unexpected instance"); + } + assert_eq!( + t.sql + .count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],) + .await?, + 2 - i + ); + } + assert_eq!(instances_checked, 3); + assert!(t.pop_smtp_status_update().await?.is_none()); + + Ok(()) + } + #[async_std::test] async fn test_draft_and_send_webxdc_status_update() -> Result<()> { let alice = TestContext::new_alice().await; @@ -1348,15 +1481,22 @@ mod tests { .await?; let mut alice_instance = alice_chat_id.get_draft(&alice).await?.unwrap(); - let status_update_msg_id = alice + alice .send_webxdc_status_update(alice_instance.id, r#"{"payload": {"foo":"bar"}}"#, "descr") .await?; - assert_eq!(status_update_msg_id, None); + alice.flush_status_updates().await?; expect_status_update_event(&alice, alice_instance.id).await?; - let status_update_msg_id = alice + alice .send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#, "descr") .await?; - assert_eq!(status_update_msg_id, None); + alice.flush_status_updates().await?; + assert_eq!( + alice + .sql + .count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],) + .await?, + 0 + ); assert!(!alice.get_last_msg().await.is_info()); // 'info: "i"' message not added in draft mode // send webxdc instance, @@ -1665,6 +1805,7 @@ sth_for_the = "future""# "descr", ) .await?; + alice.flush_status_updates().await?; let sent_update1 = &alice.pop_sent_msg().await; let info = Message::load_from_db(&alice, alice_instance.id) .await? @@ -1679,6 +1820,7 @@ sth_for_the = "future""# "descr", ) .await?; + alice.flush_status_updates().await?; let sent_update2 = &alice.pop_sent_msg().await; let info = Message::load_from_db(&alice, alice_instance.id) .await? @@ -1730,6 +1872,7 @@ sth_for_the = "future""# "descr", ) .await?; + alice.flush_status_updates().await?; let sent_update1 = &alice.pop_sent_msg().await; let info = Message::load_from_db(&alice, alice_instance.id) .await? @@ -1769,6 +1912,7 @@ sth_for_the = "future""# "descr text", ) .await?; + alice.flush_status_updates().await?; let sent2 = &alice.pop_sent_msg().await; assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 2); let info_msg = alice.get_last_msg().await; @@ -1854,11 +1998,13 @@ sth_for_the = "future""# alice .send_webxdc_status_update(alice_instance.id, r#"{"info":"i1", "payload":1}"#, "d") .await?; + alice.flush_status_updates().await?; let sent2 = &alice.pop_sent_msg().await; assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 2); alice .send_webxdc_status_update(alice_instance.id, r#"{"info":"i2", "payload":2}"#, "d") .await?; + alice.flush_status_updates().await?; let sent3 = &alice.pop_sent_msg().await; assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 2); let info_msg = alice.get_last_msg().await; @@ -1916,12 +2062,12 @@ sth_for_the = "future""# alice_chat_id.accept(&alice).await?; let alice_instance = send_webxdc_instance(&alice, alice_chat_id).await?; let sent1 = &alice.pop_sent_msg().await; - let update_msg_id = alice + alice .send_webxdc_status_update(alice_instance.id, r#"{"payload":42}"#, "descr") - .await? - .unwrap(); - let update_msg = Message::load_from_db(&alice, update_msg_id).await?; + .await?; + alice.flush_status_updates().await?; let sent2 = &alice.pop_sent_msg().await; + let update_msg = Message::load_from_db(&alice, sent2.sender_msg_id).await?; assert!(alice_instance.get_showpadlock()); assert!(update_msg.get_showpadlock()); @@ -1937,11 +2083,11 @@ sth_for_the = "future""# Contact::create(&bob, "", "claire@example.org").await?, ) .await?; - let update_msg_id = bob - .send_webxdc_status_update(bob_instance.id, r#"{"payload":43}"#, "descr") - .await? - .unwrap(); - let update_msg = Message::load_from_db(&bob, update_msg_id).await?; + bob.send_webxdc_status_update(bob_instance.id, r#"{"payload":43}"#, "descr") + .await?; + bob.flush_status_updates().await?; + let sent3 = bob.pop_sent_msg().await; + let update_msg = Message::load_from_db(&bob, sent3.sender_msg_id).await?; assert!(!update_msg.get_showpadlock()); Ok(())