method to get busy webxdcs

This commit is contained in:
Sebastian Klähn
2022-09-11 20:41:45 +02:00
parent 6877f16b63
commit 5a79815a13
3 changed files with 30 additions and 6 deletions

View File

@@ -501,9 +501,9 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp)
let ratelimited = if context.ratelimit.read().await.can_send() { let ratelimited = if context.ratelimit.read().await.can_send() {
// add status updates and sync messages to end of sending queue // add status updates and sync messages to end of sending queue
let update_needed = get_busy_webxdc_instances(); let update_needed = get_busy_webxdc_instances(&context.sql).await?;
context.flush_status_updates().await?; context.flush_status_updates().await?;
let update_needed_after_sending = get_busy_webxdc_instances(); let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?;
for msg_id in update_needed.difference(&update_needed_after_sending) { for msg_id in update_needed.difference(&update_needed_after_sending) {
context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id }) context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id })

View File

@@ -7,6 +7,7 @@ use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use rusqlite::types::FromSql;
use rusqlite::{config::DbConfig, Connection, OpenFlags}; use rusqlite::{config::DbConfig, Connection, OpenFlags};
use tokio::sync::RwLock; use tokio::sync::RwLock;
@@ -363,6 +364,25 @@ impl Sql {
}) })
} }
/// Returns unique values of a `column` in `table`
pub async fn distinct<T: FromSql + Default>(
&self,
table: &str,
column: &str,
) -> Result<Vec<T>> {
let conn = self.get_conn().await?;
let rows: Result<Vec<T>> = tokio::task::block_in_place(move || {
let mut stmt = conn.prepare(&format!("SELECT DISTINCT {column} FROM {table}"))?;
let rows = stmt
.query([])?
.mapped(|r| r.get(0))
.map(|a| a.unwrap_or_default())
.collect();
Ok(rows)
});
rows
}
/// Prepares and executes the statement and maps a function over the resulting rows. /// Prepares and executes the statement and maps a function over the resulting rows.
/// Then executes the second function over the returned iterator and returns the /// Then executes the second function over the returned iterator and returns the
/// result of that function. /// result of that function.

View File

@@ -21,6 +21,7 @@ use crate::mimeparser::SystemMessage;
use crate::param::Param; use crate::param::Param;
use crate::param::Params; use crate::param::Params;
use crate::scheduler::InterruptInfo; use crate::scheduler::InterruptInfo;
use crate::sql::Sql;
use crate::tools::{create_smeared_timestamp, get_abs_path}; use crate::tools::{create_smeared_timestamp, get_abs_path};
use crate::{chat, EventType}; use crate::{chat, EventType};
@@ -749,9 +750,12 @@ impl Message {
} }
/// Returns a hashset of all webxdc instaces which still have updates to send /// Returns a hashset of all webxdc instaces which still have updates to send
pub(crate) fn get_busy_webxdc_instances() -> HashSet<MsgId> { pub(crate) async fn get_busy_webxdc_instances(sql: &Sql) -> Result<HashSet<MsgId>> {
// TODO: add sql statement to find that out Ok(sql
unimplemented!() .distinct("smtp_status_updates", "msg_id")
.await?
.into_iter()
.collect())
} }
#[cfg(test)] #[cfg(test)]