limit rate of webxdc updates (#3417)

* more flexible render_webxdc_status_update_object()

* delay webxdc updates when ratelimit is reached

* inject updates messages to the SMTP loop as needed

this avoids starting several tasks
and allows sending updates out after a restart of the app.

* use mutex to prevent race conditions in status updates

* check ratelimiter only before the sending loop; it won't change until messages are actually sent out

* fix typo

* prefer standard type declaration over turbofish syntax

* use UNIQUE and ON CONFLICT for query smtp_status_updates

* combine DELETE+SELECT to one atomic statement

* as all operations on smtp_status_updates are now atomic, a mutex is no longer needed

* test DELETE+RETURNING statement

* simplify calls to can_send()

* comment about ratelimit boolean in send_smtp_messages()
This commit is contained in:
bjoern
2022-06-26 22:03:14 +02:00
committed by GitHub
parent f23fa1c9d3
commit 84cabbcb7e
4 changed files with 234 additions and 60 deletions

View File

@@ -9,6 +9,7 @@
- format message lines starting with `>` as quotes #3434
- node: remove `split2` dependency #3418
- node: add git installation info to readme #3418
- limit the rate of webxdc update sending #3417
### Fixes
- set a default error if NDN does not provide an error

View File

@@ -496,14 +496,22 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<bool> {
}
}
/// Tries to send all messages currently in `smtp` and `smtp_mdns` tables.
/// Tries to send all messages currently in `smtp`, `smtp_status_updates` and `smtp_mdns` tables.
///
/// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent
/// does not block other messages in the queue from being sent.
///
/// Returns true if sending was ratelimited, false otherwise. Errors are propagated to the caller.
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<bool> {
context.send_sync_msg().await?; // Add sync message to the end of the queue if needed.
let mut ratelimited = if context.ratelimit.read().await.can_send() {
// add status updates and sync messages to end of sending queue
context.flush_status_updates().await?;
context.send_sync_msg().await?;
false
} else {
true
};
let rowids = context
.sql
.query_map(
@@ -526,9 +534,15 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp)
.context("failed to send message")?;
}
let ratelimited = send_mdns(context, connection)
.await
.context("failed to send MDNs")?;
// although by slow sending, ratelimit may have been expired meanwhile,
// do not attempt to send MDNs if ratelimited happend before on status-updates/sync:
// instead, let the caller recall this function so that more important status-updates/sync are sent out.
if !ratelimited {
ratelimited = send_mdns(context, connection)
.await
.context("failed to send MDNs")?;
}
Ok(ratelimited)
}

View File

@@ -637,6 +637,19 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
)
.await?;
}
if dbversion < 91 {
info!(context, "[migration] v91");
sql.execute_migration(
r#"CREATE TABLE smtp_status_updates (
msg_id INTEGER NOT NULL UNIQUE, -- msg_id of the webxdc instance with pending updates
first_serial INTEGER NOT NULL, -- id in msgs_status_updates
last_serial INTEGER NOT NULL, -- id in msgs_status_updates
descr TEXT NOT NULL -- text to send along with the updates
);"#,
91,
)
.await?;
}
Ok((
recalc_fingerprints,

View File

@@ -8,6 +8,7 @@ use crate::message::{Message, MessageState, MsgId, Viewtype};
use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::scheduler::InterruptInfo;
use crate::{chat, EventType};
use anyhow::{bail, ensure, format_err, Result};
use async_std::path::PathBuf;
@@ -332,15 +333,13 @@ impl Context {
///
/// If the instance is a draft,
/// the status update is sent once the instance is actually sent.
///
/// If an update is sent immediately, the message-id of the update-message is returned,
/// this update-message is visible in chats, however, the id may be useful.
/// Otherwise, the update is sent as soon as possible.
pub async fn send_webxdc_status_update(
&self,
instance_msg_id: MsgId,
update_str: &str,
descr: &str,
) -> Result<Option<MsgId>> {
) -> Result<()> {
let mut instance = Message::load_from_db(self, instance_msg_id).await?;
if instance.viewtype != Viewtype::Webxdc {
bail!("send_webxdc_status_update: is no webxdc message");
@@ -365,35 +364,73 @@ impl Context {
.await?;
if send_now {
// send update now
// (also send updates on MessagesState::Failed, maybe only one member cannot receive)
let mut status_update = Message {
chat_id: instance.chat_id,
viewtype: Viewtype::Text,
text: Some(descr.to_string()),
hidden: true,
..Default::default()
};
status_update
.param
.set_cmd(SystemMessage::WebxdcStatusUpdate);
status_update.param.set(
Param::Arg,
self.render_webxdc_status_update_object(
instance_msg_id,
Some(status_update_serial),
)
self.sql.insert(
"INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?)
ON CONFLICT(msg_id)
DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
paramsv![instance.id, status_update_serial, status_update_serial, descr],
).await?;
self.interrupt_smtp(InterruptInfo::new(false)).await;
}
Ok(())
}
/// Pops one record of queued webxdc status updates.
/// This function exists to make the sqlite statement testable.
async fn pop_smtp_status_update(
&self,
) -> Result<Option<(MsgId, StatusUpdateSerial, StatusUpdateSerial, String)>> {
let res = self
.sql
.query_row_optional(
"DELETE FROM smtp_status_updates
WHERE msg_id IN (SELECT msg_id FROM smtp_status_updates LIMIT 1)
RETURNING msg_id, first_serial, last_serial, descr",
paramsv![],
|row| {
let instance_id: MsgId = row.get(0)?;
let first_serial: StatusUpdateSerial = row.get(1)?;
let last_serial: StatusUpdateSerial = row.get(2)?;
let descr: String = row.get(3)?;
Ok((instance_id, first_serial, last_serial, descr))
},
)
.await?;
Ok(res)
}
/// Attempts to send queued webxdc status updates.
///
/// Returns true if there are more status updates to send, but rate limiter does not
/// allow to send them. Returns false if there are no more status updates to send.
pub(crate) async fn flush_status_updates(&self) -> Result<bool> {
loop {
let (instance_id, first_serial, last_serial, descr) =
match self.pop_smtp_status_update().await? {
Some(res) => res,
None => return Ok(false),
};
if let Some(json) = self
.render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial)))
.await?
.ok_or_else(|| format_err!("Status object expected."))?,
);
status_update.set_quote(self, Some(&instance)).await?;
status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed
let status_update_msg_id =
{
let instance = Message::load_from_db(self, instance_id).await?;
let mut status_update = Message {
chat_id: instance.chat_id,
viewtype: Viewtype::Text,
text: Some(descr.to_string()),
hidden: true,
..Default::default()
};
status_update
.param
.set_cmd(SystemMessage::WebxdcStatusUpdate);
status_update.param.set(Param::Arg, json);
status_update.set_quote(self, Some(&instance)).await?;
status_update.param.remove(Param::GuaranteeE2ee); // may be set by set_quote(), if #2985 is done, this line can be removed
chat::send_msg(self, instance.chat_id, &mut status_update).await?;
Ok(Some(status_update_msg_id))
} else {
// send update once the instance is actually send
Ok(None)
}
}
}
@@ -510,20 +547,19 @@ impl Context {
///
/// Example: `{"updates": [{"payload":"any update data"},
/// {"payload":"another update data"}]}`
/// If `status_update_serial` is set, exactly that update is rendered, otherwise all updates are rendered.
pub(crate) async fn render_webxdc_status_update_object(
&self,
instance_msg_id: MsgId,
status_update_serial: Option<StatusUpdateSerial>,
range: Option<(StatusUpdateSerial, StatusUpdateSerial)>,
) -> Result<Option<String>> {
let json = self
.sql
.query_map(
"SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND (1=? OR id=?) ORDER BY id",
"SELECT update_item FROM msgs_status_updates WHERE msg_id=? AND id>=? AND id<=? ORDER BY id",
paramsv![
instance_msg_id,
if status_update_serial.is_some() { 0 } else { 1 },
status_update_serial.unwrap_or(StatusUpdateSerial(0))
range.map(|r|r.0).unwrap_or(StatusUpdateSerial(0)),
range.map(|r|r.1).unwrap_or(StatusUpdateSerial(u32::MAX)),
],
|row| row.get::<_, String>(0),
|rows| {
@@ -1215,17 +1251,17 @@ mod tests {
assert_eq!(alice_instance.viewtype, Viewtype::Webxdc);
assert!(!sent1.payload().contains("report-type=status-update"));
let status_update_msg_id = alice
alice
.send_webxdc_status_update(
alice_instance.id,
r#"{"payload" : {"foo":"bar"}}"#,
"descr text",
)
.await?
.unwrap();
.await?;
alice.flush_status_updates().await?;
expect_status_update_event(&alice, alice_instance.id).await?;
let sent2 = &alice.pop_sent_msg().await;
let alice_update = Message::load_from_db(&alice, status_update_msg_id).await?;
let alice_update = Message::load_from_db(&alice, sent2.sender_msg_id).await?;
assert!(alice_update.hidden);
assert_eq!(alice_update.viewtype, Viewtype::Text);
assert_eq!(alice_update.get_filename(), None);
@@ -1251,8 +1287,7 @@ mod tests {
r#"{"payload":{"snipp":"snapp"}}"#,
"bla text",
)
.await?
.unwrap();
.await?;
assert_eq!(
alice
.get_webxdc_status_updates(alice_instance.id, StatusUpdateSerial(0))
@@ -1329,6 +1364,104 @@ mod tests {
Ok(())
}
#[async_std::test]
async fn test_render_webxdc_status_update_object_range() -> Result<()> {
let t = TestContext::new_alice().await;
let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?;
let instance = send_webxdc_instance(&t, chat_id).await?;
t.send_webxdc_status_update(instance.id, r#"{"payload": 1}"#, "d")
.await?;
t.send_webxdc_status_update(instance.id, r#"{"payload": 2}"#, "d")
.await?;
t.send_webxdc_status_update(instance.id, r#"{"payload": 3}"#, "d")
.await?;
t.send_webxdc_status_update(instance.id, r#"{"payload": 4}"#, "d")
.await?;
let json = t
.render_webxdc_status_update_object(
instance.id,
Some((StatusUpdateSerial(2), StatusUpdateSerial(3))),
)
.await?
.unwrap();
assert_eq!(json, "{\"updates\":[{\"payload\":2},\n{\"payload\":3}]}");
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],)
.await?,
1
);
t.flush_status_updates().await?;
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],)
.await?,
0
);
Ok(())
}
#[async_std::test]
async fn test_pop_status_update() -> Result<()> {
let t = TestContext::new_alice().await;
let chat_id = create_group_chat(&t, ProtectionStatus::Unprotected, "a chat").await?;
let instance1 = send_webxdc_instance(&t, chat_id).await?;
let instance2 = send_webxdc_instance(&t, chat_id).await?;
let instance3 = send_webxdc_instance(&t, chat_id).await?;
assert!(t.pop_smtp_status_update().await?.is_none());
t.send_webxdc_status_update(instance1.id, r#"{"payload": "1a"}"#, "descr1a")
.await?;
t.send_webxdc_status_update(instance2.id, r#"{"payload": "2a"}"#, "descr2a")
.await?;
t.send_webxdc_status_update(instance2.id, r#"{"payload": "2b"}"#, "descr2b")
.await?;
t.send_webxdc_status_update(instance3.id, r#"{"payload": "3a"}"#, "descr3a")
.await?;
t.send_webxdc_status_update(instance3.id, r#"{"payload": "3b"}"#, "descr3b")
.await?;
t.send_webxdc_status_update(instance3.id, r#"{"payload": "3c"}"#, "descr3c")
.await?;
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],)
.await?,
3
);
// order of pop_status_update() is not defined, therefore the more complicated test
let mut instances_checked = 0;
for i in 0..3 {
let (instance, min_ser, max_ser, descr) = t.pop_smtp_status_update().await?.unwrap();
if instance == instance1.id {
assert_eq!(min_ser, max_ser);
assert_eq!(descr, "descr1a");
instances_checked += 1;
} else if instance == instance2.id {
assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 1);
assert_eq!(descr, "descr2b");
instances_checked += 1;
} else if instance == instance3.id {
assert_eq!(min_ser.to_u32(), max_ser.to_u32() - 2);
assert_eq!(descr, "descr3c");
instances_checked += 1;
} else {
bail!("unexpected instance");
}
assert_eq!(
t.sql
.count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],)
.await?,
2 - i
);
}
assert_eq!(instances_checked, 3);
assert!(t.pop_smtp_status_update().await?.is_none());
Ok(())
}
#[async_std::test]
async fn test_draft_and_send_webxdc_status_update() -> Result<()> {
let alice = TestContext::new_alice().await;
@@ -1348,15 +1481,22 @@ mod tests {
.await?;
let mut alice_instance = alice_chat_id.get_draft(&alice).await?.unwrap();
let status_update_msg_id = alice
alice
.send_webxdc_status_update(alice_instance.id, r#"{"payload": {"foo":"bar"}}"#, "descr")
.await?;
assert_eq!(status_update_msg_id, None);
alice.flush_status_updates().await?;
expect_status_update_event(&alice, alice_instance.id).await?;
let status_update_msg_id = alice
alice
.send_webxdc_status_update(alice_instance.id, r#"{"payload":42, "info":"i"}"#, "descr")
.await?;
assert_eq!(status_update_msg_id, None);
alice.flush_status_updates().await?;
assert_eq!(
alice
.sql
.count("SELECT COUNT(*) FROM smtp_status_updates", paramsv![],)
.await?,
0
);
assert!(!alice.get_last_msg().await.is_info()); // 'info: "i"' message not added in draft mode
// send webxdc instance,
@@ -1665,6 +1805,7 @@ sth_for_the = "future""#
"descr",
)
.await?;
alice.flush_status_updates().await?;
let sent_update1 = &alice.pop_sent_msg().await;
let info = Message::load_from_db(&alice, alice_instance.id)
.await?
@@ -1679,6 +1820,7 @@ sth_for_the = "future""#
"descr",
)
.await?;
alice.flush_status_updates().await?;
let sent_update2 = &alice.pop_sent_msg().await;
let info = Message::load_from_db(&alice, alice_instance.id)
.await?
@@ -1730,6 +1872,7 @@ sth_for_the = "future""#
"descr",
)
.await?;
alice.flush_status_updates().await?;
let sent_update1 = &alice.pop_sent_msg().await;
let info = Message::load_from_db(&alice, alice_instance.id)
.await?
@@ -1769,6 +1912,7 @@ sth_for_the = "future""#
"descr text",
)
.await?;
alice.flush_status_updates().await?;
let sent2 = &alice.pop_sent_msg().await;
assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 2);
let info_msg = alice.get_last_msg().await;
@@ -1854,11 +1998,13 @@ sth_for_the = "future""#
alice
.send_webxdc_status_update(alice_instance.id, r#"{"info":"i1", "payload":1}"#, "d")
.await?;
alice.flush_status_updates().await?;
let sent2 = &alice.pop_sent_msg().await;
assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 2);
alice
.send_webxdc_status_update(alice_instance.id, r#"{"info":"i2", "payload":2}"#, "d")
.await?;
alice.flush_status_updates().await?;
let sent3 = &alice.pop_sent_msg().await;
assert_eq!(alice_chat.id.get_msg_cnt(&alice).await?, 2);
let info_msg = alice.get_last_msg().await;
@@ -1916,12 +2062,12 @@ sth_for_the = "future""#
alice_chat_id.accept(&alice).await?;
let alice_instance = send_webxdc_instance(&alice, alice_chat_id).await?;
let sent1 = &alice.pop_sent_msg().await;
let update_msg_id = alice
alice
.send_webxdc_status_update(alice_instance.id, r#"{"payload":42}"#, "descr")
.await?
.unwrap();
let update_msg = Message::load_from_db(&alice, update_msg_id).await?;
.await?;
alice.flush_status_updates().await?;
let sent2 = &alice.pop_sent_msg().await;
let update_msg = Message::load_from_db(&alice, sent2.sender_msg_id).await?;
assert!(alice_instance.get_showpadlock());
assert!(update_msg.get_showpadlock());
@@ -1937,11 +2083,11 @@ sth_for_the = "future""#
Contact::create(&bob, "", "claire@example.org").await?,
)
.await?;
let update_msg_id = bob
.send_webxdc_status_update(bob_instance.id, r#"{"payload":43}"#, "descr")
.await?
.unwrap();
let update_msg = Message::load_from_db(&bob, update_msg_id).await?;
bob.send_webxdc_status_update(bob_instance.id, r#"{"payload":43}"#, "descr")
.await?;
bob.flush_status_updates().await?;
let sent3 = bob.pop_sent_msg().await;
let update_msg = Message::load_from_db(&bob, sent3.sender_msg_id).await?;
assert!(!update_msg.get_showpadlock());
Ok(())