webxdc update events

This commit is contained in:
Sebastian Klähn
2022-09-12 09:03:36 +02:00
committed by Septias
parent 0b53c35523
commit 8d62e5defb
10 changed files with 387 additions and 5 deletions

View File

@@ -318,4 +318,12 @@ pub enum EventType {
WebxdcInstanceDeleted {
msg_id: MsgId,
},
WebxdcBusyUpdating {
msg_id: MsgId,
},
WebxdcUpToDate {
msg_id: MsgId,
},
}

View File

@@ -377,7 +377,7 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
if !duration_until_can_send.is_zero() {
info!(
ctx,
"smtp got rate limited, waiting for {} until can send again",
"smtp got rate limited, delaying next try by {}",
duration_to_str(duration_until_can_send)
);
tokio::time::timeout(duration_until_can_send, async {

View File

@@ -21,6 +21,7 @@ use crate::oauth2::get_oauth2_access_token;
use crate::provider::Socket;
use crate::socks::Socks5Config;
use crate::sql;
use crate::webxdc::get_busy_webxdc_instances;
use crate::{context::Context, scheduler::connectivity::ConnectivityStore};
/// SMTP write and read timeout in seconds.
@@ -498,7 +499,15 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
let ratelimited = if context.ratelimit.read().await.can_send() {
// add status updates and sync messages to end of sending queue
let update_needed = get_busy_webxdc_instances(&context.sql).await?;
context.flush_status_updates().await?;
let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?;
for msg_id in update_needed.difference(&update_needed_after_sending) {
context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id })
}
context.send_sync_msg().await?;
false
} else {

View File

@@ -7,6 +7,7 @@ use std::path::PathBuf;
use std::time::Duration;
use anyhow::{bail, Context as _, Result};
use rusqlite::types::FromSql;
use rusqlite::{config::DbConfig, Connection, OpenFlags};
use tokio::sync::RwLock;
@@ -363,6 +364,25 @@ impl Sql {
})
}
/// Returns unique values of a `column` in `table`
pub async fn distinct<T: FromSql + Default>(
&self,
table: &str,
column: &str,
) -> Result<Vec<T>> {
let conn = self.get_conn().await?;
let rows: Result<Vec<T>> = tokio::task::block_in_place(move || {
let mut stmt = conn.prepare(&format!("SELECT DISTINCT {column} FROM {table}"))?;
let rows = stmt
.query([])?
.mapped(|r| r.get(0))
.map(|a| a.unwrap_or_default())
.collect();
Ok(rows)
});
rows
}
/// Prepares and executes the statement and maps a function over the resulting rows.
/// Then executes the second function over the returned iterator and returns the
/// result of that function.

View File

@@ -1,5 +1,6 @@
//! # Handle webxdc messages.
use std::collections::HashSet;
use std::convert::TryFrom;
use std::path::Path;
@@ -20,6 +21,7 @@ use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::scheduler::InterruptInfo;
use crate::sql::Sql;
use crate::tools::{create_smeared_timestamp, get_abs_path};
use crate::{chat, EventType};
@@ -379,6 +381,10 @@ impl Context {
)
.await?;
self.emit_event(EventType::WebxdcBusyUpdating {
msg_id: instance.id,
});
if send_now {
self.sql.insert(
"INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?)
@@ -392,7 +398,6 @@ impl Context {
}
/// Pops one record of queued webxdc status updates.
/// This function exists to make the sqlite statement testable.
async fn pop_smtp_status_update(
&self,
) -> Result<Option<(MsgId, StatusUpdateSerial, StatusUpdateSerial, String)>> {
@@ -416,12 +421,15 @@ impl Context {
}
/// Attempts to send queued webxdc status updates.
pub(crate) async fn flush_status_updates(&self) -> Result<()> {
///
/// Returns true if there are more status updates to send, but rate limiter does not
/// allow to send them. Returns false if there are no more status updates to send.
pub(crate) async fn flush_status_updates(&self) -> Result<bool> {
loop {
let (instance_id, first_serial, last_serial, descr) =
match self.pop_smtp_status_update().await? {
Some(res) => res,
None => return Ok(()),
None => return Ok(false),
};
if let Some(json) = self
@@ -743,6 +751,15 @@ impl Message {
}
}
/// Returns a hashset of all webxdc instaces which still have updates to send
pub(crate) async fn get_busy_webxdc_instances(sql: &Sql) -> Result<HashSet<MsgId>> {
Ok(sql
.distinct("smtp_status_updates", "msg_id")
.await?
.into_iter()
.collect())
}
#[cfg(test)]
mod tests {
use crate::chat::{