From 5a79815a13ee19e2ae06cd84c19e88056895b7e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kl=C3=A4hn?= Date: Sun, 11 Sep 2022 20:41:45 +0200 Subject: [PATCH] method to get busy webxdcs --- src/smtp.rs | 6 +++--- src/sql.rs | 20 ++++++++++++++++++++ src/webxdc.rs | 10 +++++++--- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/smtp.rs b/src/smtp.rs index ef1437c7e..55bb61700 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -501,10 +501,10 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) let ratelimited = if context.ratelimit.read().await.can_send() { // add status updates and sync messages to end of sending queue - let update_needed = get_busy_webxdc_instances(); + let update_needed = get_busy_webxdc_instances(&context.sql).await?; context.flush_status_updates().await?; - let update_needed_after_sending = get_busy_webxdc_instances(); - + let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?; + for msg_id in update_needed.difference(&update_needed_after_sending) { context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id }) } diff --git a/src/sql.rs b/src/sql.rs index 0a56ada1f..08d85922e 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::time::Duration; use anyhow::{bail, Context as _, Result}; +use rusqlite::types::FromSql; use rusqlite::{config::DbConfig, Connection, OpenFlags}; use tokio::sync::RwLock; @@ -363,6 +364,25 @@ impl Sql { }) } + /// Returns unique values of a `column` in `table` + pub async fn distinct( + &self, + table: &str, + column: &str, + ) -> Result> { + let conn = self.get_conn().await?; + let rows: Result> = tokio::task::block_in_place(move || { + let mut stmt = conn.prepare(&format!("SELECT DISTINCT {column} FROM {table}"))?; + let rows = stmt + .query([])? + .mapped(|r| r.get(0)) + .map(|a| a.unwrap_or_default()) + .collect(); + Ok(rows) + }); + rows + } + /// Prepares and executes the statement and maps a function over the resulting rows. /// Then executes the second function over the returned iterator and returns the /// result of that function. diff --git a/src/webxdc.rs b/src/webxdc.rs index 6777c51f6..afc93c67f 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -21,6 +21,7 @@ use crate::mimeparser::SystemMessage; use crate::param::Param; use crate::param::Params; use crate::scheduler::InterruptInfo; +use crate::sql::Sql; use crate::tools::{create_smeared_timestamp, get_abs_path}; use crate::{chat, EventType}; @@ -749,9 +750,12 @@ impl Message { } /// Returns a hashset of all webxdc instaces which still have updates to send -pub(crate) fn get_busy_webxdc_instances() -> HashSet { - // TODO: add sql statement to find that out - unimplemented!() +pub(crate) async fn get_busy_webxdc_instances(sql: &Sql) -> Result> { + Ok(sql + .distinct("smtp_status_updates", "msg_id") + .await? + .into_iter() + .collect()) } #[cfg(test)]