diff --git a/CHANGELOG.md b/CHANGELOG.md index cf38ef76d..db495e48c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -285,6 +285,8 @@ this affects `dc_get_chat_contacts()`, `dc_get_contacts()` and `dc_get_blocked_contacts()` #3562 - add `internet_access` flag to `dc_msg_get_webxdc_info()` #3516 - `DC_EVENT_WEBXDC_INSTANCE_DELETED` is emitted when a message containing a webxdc gets deleted #3592 +- `DC_EVENT_WEBXDC_BUSY_UPDATING` is emitted when a new update has to be sent by an webxdc #3320 +- `DC_EVENT_WEBXDC_UP_TO_DATE` is emitted when a webxdc has sent all updates #3320 ### Fixes - do not emit notifications for blocked chats #3557 diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 95e4cf45f..b45bfcc4e 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -5837,10 +5837,24 @@ void dc_event_unref(dc_event_t* event); * * @param data1 (int) msg_id */ - #define DC_EVENT_WEBXDC_INSTANCE_DELETED 2121 +/** + * Webxdc has some updates that need to be sent + * + * @param data1 (int) msg_id + */ +#define DC_EVENT_WEBXDC_BUSY_UPDATING 2122 + + +/** + * Webxdc has finished sending updates + * + * @param data1 (int) msg_id + */ +#define DC_EVENT_WEBXDC_UP_TO_DATE 2123 + /** * @} */ diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index f07e4d53b..6891bea5f 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -521,6 +521,8 @@ pub unsafe extern "C" fn dc_event_get_id(event: *mut dc_event_t) -> libc::c_int EventType::SelfavatarChanged => 2110, EventType::WebxdcStatusUpdate { .. } => 2120, EventType::WebxdcInstanceDeleted { .. } => 2121, + EventType::WebxdcBusyUpdating { .. } => 2022, + EventType::WebxdcUpToDate { .. } => 2023, } } @@ -570,6 +572,8 @@ pub unsafe extern "C" fn dc_event_get_data1_int(event: *mut dc_event_t) -> libc: } EventType::WebxdcStatusUpdate { msg_id, .. } => msg_id.to_u32() as libc::c_int, EventType::WebxdcInstanceDeleted { msg_id, .. } => msg_id.to_u32() as libc::c_int, + EventType::WebxdcBusyUpdating { msg_id } => msg_id.to_u32() as libc::c_int, + EventType::WebxdcUpToDate { msg_id } => msg_id.to_u32() as libc::c_int, } } @@ -603,6 +607,8 @@ pub unsafe extern "C" fn dc_event_get_data2_int(event: *mut dc_event_t) -> libc: | EventType::ConnectivityChanged | EventType::WebxdcInstanceDeleted { .. } | EventType::IncomingMsgBunch { .. } + | EventType::WebxdcBusyUpdating { .. } + | EventType::WebxdcUpToDate { .. } | EventType::SelfavatarChanged => 0, EventType::ChatModified(_) => 0, EventType::MsgsChanged { msg_id, .. } @@ -662,6 +668,8 @@ pub unsafe extern "C" fn dc_event_get_data2_str(event: *mut dc_event_t) -> *mut | EventType::SelfavatarChanged | EventType::WebxdcStatusUpdate { .. } | EventType::WebxdcInstanceDeleted { .. } + | EventType::WebxdcBusyUpdating { .. } + | EventType::WebxdcUpToDate { .. } | EventType::ChatEphemeralTimerModified { .. } => ptr::null_mut(), EventType::ConfigureProgress { comment, .. } => { if let Some(comment) = comment { diff --git a/deltachat-jsonrpc/src/api/events.rs b/deltachat-jsonrpc/src/api/events.rs index 70dadb52e..7d2836ddd 100644 --- a/deltachat-jsonrpc/src/api/events.rs +++ b/deltachat-jsonrpc/src/api/events.rs @@ -281,6 +281,8 @@ pub enum JSONRPCEventType { WebxdcInstanceDeleted { msg_id: u32, }, + WebxdcBusyUpdating, + WebxdcUpToDate, } impl From for JSONRPCEventType { @@ -381,6 +383,8 @@ impl From for JSONRPCEventType { EventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted { msg_id: msg_id.to_u32(), }, + EventType::WebxdcBusyUpdating { .. } => WebxdcBusyUpdating, + EventType::WebxdcUpToDate { .. } => WebxdcUpToDate, } } } diff --git a/diff b/diff new file mode 100644 index 000000000..61151b38d --- /dev/null +++ b/diff @@ -0,0 +1,300 @@ +diff --git a/CHANGELOG.md b/CHANGELOG.md +index ea6287f5..a0eaf04c 100644 +--- a/CHANGELOG.md ++++ b/CHANGELOG.md +@@ -49,6 +49,8 @@ ### Changes + this affects `dc_get_chat_contacts()`, `dc_get_contacts()` and `dc_get_blocked_contacts()` #3562 + - add `internet_access` flag to `dc_msg_get_webxdc_info()` #3516 + - `DC_EVENT_WEBXDC_INSTANCE_DELETED` is emitted when a message containing a webxdc gets deleted #3105 ++- `DC_EVENT_WEBXDC_BUSY_UPDATING` is emitted when a new update has to be sent by an webxdc #3320 ++- `DC_EVENT_WEBXDC_UP_TO_DATE` is emitted when a webxdc has sent all updates #3320 + + ### Fixes + - do not emit notifications for blocked chats #3557 +diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h +index 28d912cc..6df330b1 100644 +--- a/deltachat-ffi/deltachat.h ++++ b/deltachat-ffi/deltachat.h +@@ -5731,10 +5731,24 @@ void dc_event_unref(dc_event_t* event); + * + * @param data1 (int) msg_id + */ +- + #define DC_EVENT_WEBXDC_INSTANCE_DELETED 2121 + + ++/** ++ * Webxdc has some updates that need to be sent ++ * ++ * @param data1 (int) msg_id ++ */ ++#define DC_EVENT_WEBXDC_BUSY_UPDATING 2122 ++ ++ ++/** ++ * Webxdc has finished sending updates ++ * ++ * @param data1 (int) msg_id ++ */ ++#define DC_EVENT_WEBXDC_UP_TO_DATE 2123 ++ + /** + * @} + */ +diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs +index 2f872d6e..2e7fd40d 100644 +--- a/deltachat-ffi/src/lib.rs ++++ b/deltachat-ffi/src/lib.rs +@@ -505,6 +505,8 @@ fn render_info( + EventType::SelfavatarChanged => 2110, + EventType::WebxdcStatusUpdate { .. } => 2120, + EventType::WebxdcInstanceDeleted { .. } => 2121, ++ EventType::WebxdcBusyUpdating { .. } => 2022, ++ EventType::WebxdcUpToDate { .. } => 2023, + } + } + +@@ -552,6 +554,8 @@ fn render_info( + } + EventType::WebxdcStatusUpdate { msg_id, .. } => msg_id.to_u32() as libc::c_int, + EventType::WebxdcInstanceDeleted { msg_id, .. } => msg_id.to_u32() as libc::c_int, ++ EventType::WebxdcBusyUpdating { msg_id } => msg_id.to_u32() as libc::c_int, ++ EventType::WebxdcUpToDate { msg_id } => msg_id.to_u32() as libc::c_int, + } + } + +@@ -584,6 +588,8 @@ fn render_info( + | EventType::MsgsNoticed(_) + | EventType::ConnectivityChanged + | EventType::WebxdcInstanceDeleted { .. } ++ | EventType::WebxdcBusyUpdating { .. } ++ | EventType::WebxdcUpToDate { .. } + | EventType::SelfavatarChanged => 0, + EventType::ChatModified(_) => 0, + EventType::MsgsChanged { msg_id, .. } +@@ -641,6 +647,8 @@ fn render_info( + | EventType::SelfavatarChanged + | EventType::WebxdcStatusUpdate { .. } + | EventType::WebxdcInstanceDeleted { .. } ++ | EventType::WebxdcBusyUpdating { .. } ++ | EventType::WebxdcUpToDate { .. } + | EventType::ChatEphemeralTimerModified { .. } => ptr::null_mut(), + EventType::ConfigureProgress { comment, .. } => { + if let Some(comment) = comment { +diff --git a/deltachat-jsonrpc/src/api/events.rs b/deltachat-jsonrpc/src/api/events.rs +index 61e1162c..a4d052c2 100644 +--- a/deltachat-jsonrpc/src/api/events.rs ++++ b/deltachat-jsonrpc/src/api/events.rs +@@ -61,6 +61,8 @@ pub fn event_to_json_rpc_notification(event: Event) -> Value { + status_update_serial, + } => (json!(msg_id), json!(status_update_serial)), + EventType::WebxdcInstanceDeleted { msg_id } => (json!(msg_id), Value::Null), ++ EventType::WebxdcBusyUpdating { msg_id } => (json!(msg_id), Value::Null), ++ EventType::WebxdcUpToDate { msg_id } => (json!(msg_id), Value::Null), + }; + + let id: EventTypeName = event.typ.into(); +@@ -103,7 +105,9 @@ pub enum EventTypeName { + ConnectivityChanged, + SelfavatarChanged, + WebxdcStatusUpdate, +- WebXdInstanceDeleted, ++ WebxdcInstanceDeleted, ++ WebxdcBusyUpdating, ++ WebxdcUpToDate, + } + + impl From for EventTypeName { +@@ -139,7 +143,9 @@ fn from(event: EventType) -> Self { + EventType::ConnectivityChanged => ConnectivityChanged, + EventType::SelfavatarChanged => SelfavatarChanged, + EventType::WebxdcStatusUpdate { .. } => WebxdcStatusUpdate, +- EventType::WebxdcInstanceDeleted { .. } => WebXdInstanceDeleted, ++ EventType::WebxdcInstanceDeleted { .. } => WebxdcInstanceDeleted, ++ EventType::WebxdcBusyUpdating { .. } => WebxdcBusyUpdating, ++ EventType::WebxdcUpToDate { .. } => WebxdcUpToDate, + } + } + } +diff --git a/deltachat-jsonrpc/typescript/generated/types.ts b/deltachat-jsonrpc/typescript/generated/types.ts +index 23c90f0a..7ccb9e2d 100644 +--- a/deltachat-jsonrpc/typescript/generated/types.ts ++++ b/deltachat-jsonrpc/typescript/generated/types.ts +@@ -135,7 +135,5 @@ export type WebxdcMessageInfo={ + * True if full internet access should be granted to the app. + */ + "internetAccess":boolean;}; +-export type DownloadState=("Done"|"Available"|"Failure"|"InProgress"); +-export type Message={"id":U32;"chatId":U32;"fromId":U32;"quote":(MessageQuote|null);"parentId":(U32|null);"text":(string|null);"hasLocation":boolean;"hasHtml":boolean;"viewType":Viewtype;"state":U32;"timestamp":I64;"sortTimestamp":I64;"receivedTimestamp":I64;"hasDeviatingTimestamp":boolean;"subject":string;"showPadlock":boolean;"isSetupmessage":boolean;"isInfo":boolean;"isForwarded":boolean;"duration":I32;"dimensionsHeight":I32;"dimensionsWidth":I32;"videochatType":(U32|null);"videochatUrl":(string|null);"overrideSenderName":(string|null);"sender":Contact;"setupCodeBegin":(string|null);"file":(string|null);"fileMime":(string|null);"fileBytes":U64;"fileName":(string|null);"webxdcInfo":(WebxdcMessageInfo|null);"downloadState":DownloadState;}; +-export type F64=number; +-export type __AllTyps=[string,boolean,Record,U32,U32,null,(U32)[],U32,null,(U32|null),(Account)[],U32,Account,U32,string,(ProviderInfo|null),U32,boolean,U32,Record,U32,string,(string|null),null,U32,Record,null,U32,string,null,U32,string,Qr,U32,string,(string|null),U32,(string)[],Record,U32,null,U32,null,U32,(U32)[],U32,U32,Usize,U32,string,U32,U32,string,null,U32,(U32|null),(string|null),(U32|null),(ChatListEntry)[],U32,(ChatListEntry)[],Record,U32,U32,FullChat,U32,U32,BasicChat,U32,U32,null,U32,U32,null,U32,U32,null,U32,U32,string,U32,(U32|null),[string,string],U32,U32,null,U32,U32,U32,null,U32,U32,U32,null,U32,string,string,U32,U32,U32,null,U32,U32,(U32|null),U32,U32,MuteDuration,null,U32,U32,boolean,U32,(U32)[],null,U32,U32,U32,(U32)[],U32,U32,Message,U32,(U32)[],Record,U32,(U32)[],null,U32,U32,string,U32,U32,Contact,U32,string,(string|null),U32,U32,U32,U32,U32,U32,null,U32,U32,null,U32,(Contact)[],U32,U32,(string|null),(U32)[],U32,U32,(string|null),(Contact)[],U32,(U32)[],Record,U32,U32,string,U32,(U32|null),Viewtype,(Viewtype|null),(Viewtype|null),(U32)[],null,U32,U32,U32,string,U32,U32,string,string,null,U32,U32,U32,string,U32,U32,WebxdcMessageInfo,U32,(U32)[],U32,null,U32,U32,null,U32,U32,(Message|null),U32,string,U32,U32,U32,U32,(string|null),(string|null),([F64,F64]|null),(U32|null),[U32,Message],U32,U32,(string|null),(string|null),(U32|null),null]; ++export type Message={"id":U32;"chatId":U32;"fromId":U32;"quotedText":(string|null);"quotedMessageId":(U32|null);"text":(string|null);"hasLocation":boolean;"hasHtml":boolean;"viewType":Viewtype;"state":U32;"timestamp":I64;"sortTimestamp":I64;"receivedTimestamp":I64;"hasDeviatingTimestamp":boolean;"subject":string;"showPadlock":boolean;"isSetupmessage":boolean;"isInfo":boolean;"isForwarded":boolean;"duration":I32;"dimensionsHeight":I32;"dimensionsWidth":I32;"videochatType":(U32|null);"videochatUrl":(string|null);"overrideSenderName":(string|null);"sender":Contact;"setupCodeBegin":(string|null);"file":(string|null);"fileMime":(string|null);"fileBytes":U64;"fileName":(string|null);"webxdcInfo":(WebxdcMessageInfo|null);}; ++export type __AllTyps=[string,boolean,Record,U32,U32,null,(U32)[],U32,null,(U32|null),(Account)[],U32,Account,U32,string,(ProviderInfo|null),U32,boolean,U32,Record,U32,string,(string|null),null,U32,Record,null,U32,string,null,U32,string,Qr,U32,string,(string|null),U32,(string)[],Record,U32,null,U32,null,U32,(U32)[],U32,U32,Usize,U32,string,U32,U32,string,null,U32,(U32|null),(string|null),(U32|null),(ChatListEntry)[],U32,(ChatListEntry)[],Record,U32,U32,FullChat,U32,U32,null,U32,U32,null,U32,U32,null,U32,U32,string,U32,(U32|null),[string,string],U32,U32,null,U32,U32,U32,null,U32,U32,U32,null,U32,string,string,U32,U32,U32,U32,(U32)[],U32,U32,Message,U32,(U32)[],Record,U32,(U32)[],null,U32,U32,string,U32,U32,Contact,U32,string,(string|null),U32,U32,U32,U32,U32,U32,null,U32,U32,null,U32,(Contact)[],U32,U32,(string|null),(U32)[],U32,U32,(string|null),(Contact)[],U32,(U32)[],Record,U32,(U32|null),Viewtype,(Viewtype|null),(Viewtype|null),(U32)[],U32,U32,string,string,null,U32,U32,U32,string,U32,U32,WebxdcMessageInfo,U32,string,U32,U32]; +diff --git a/src/events.rs b/src/events.rs +index 29fb039b..3ed103b9 100644 +--- a/src/events.rs ++++ b/src/events.rs +@@ -307,4 +307,12 @@ pub enum EventType { + WebxdcInstanceDeleted { + msg_id: MsgId, + }, ++ ++ WebxdcBusyUpdating { ++ msg_id: MsgId, ++ }, ++ ++ WebxdcUpToDate { ++ msg_id: MsgId, ++ }, + } +diff --git a/src/scheduler.rs b/src/scheduler.rs +index 069e5dd8..5e5761a8 100644 +--- a/src/scheduler.rs ++++ b/src/scheduler.rs +@@ -332,7 +332,7 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect + if !duration_until_can_send.is_zero() { + info!( + ctx, +- "smtp got rate limited, waiting for {} until can send again", ++ "smtp got rate limited, delaying next try by {}", + duration_to_str(duration_until_can_send) + ); + tokio::time::timeout(duration_until_can_send, async { +diff --git a/src/smtp.rs b/src/smtp.rs +index a3affdfc..55bb6170 100644 +--- a/src/smtp.rs ++++ b/src/smtp.rs +@@ -22,6 +22,7 @@ + use crate::oauth2::get_oauth2_access_token; + use crate::provider::Socket; + use crate::sql; ++use crate::webxdc::get_busy_webxdc_instances; + use crate::{context::Context, scheduler::connectivity::ConnectivityStore}; + + /// SMTP write and read timeout in seconds. +@@ -499,7 +500,15 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> { + pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> { + let ratelimited = if context.ratelimit.read().await.can_send() { + // add status updates and sync messages to end of sending queue ++ ++ let update_needed = get_busy_webxdc_instances(&context.sql).await?; + context.flush_status_updates().await?; ++ let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?; ++ ++ for msg_id in update_needed.difference(&update_needed_after_sending) { ++ context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id }) ++ } ++ + context.send_sync_msg().await?; + false + } else { +diff --git a/src/sql.rs b/src/sql.rs +index 0a56ada1..08d85922 100644 +--- a/src/sql.rs ++++ b/src/sql.rs +@@ -7,6 +7,7 @@ + use std::time::Duration; + + use anyhow::{bail, Context as _, Result}; ++use rusqlite::types::FromSql; + use rusqlite::{config::DbConfig, Connection, OpenFlags}; + use tokio::sync::RwLock; + +@@ -363,6 +364,25 @@ pub async fn insert(&self, query: &str, params: impl rusqlite::Params) -> Result + }) + } + ++ /// Returns unique values of a `column` in `table` ++ pub async fn distinct( ++ &self, ++ table: &str, ++ column: &str, ++ ) -> Result> { ++ let conn = self.get_conn().await?; ++ let rows: Result> = tokio::task::block_in_place(move || { ++ let mut stmt = conn.prepare(&format!("SELECT DISTINCT {column} FROM {table}"))?; ++ let rows = stmt ++ .query([])? ++ .mapped(|r| r.get(0)) ++ .map(|a| a.unwrap_or_default()) ++ .collect(); ++ Ok(rows) ++ }); ++ rows ++ } ++ + /// Prepares and executes the statement and maps a function over the resulting rows. + /// Then executes the second function over the returned iterator and returns the + /// result of that function. +diff --git a/src/webxdc.rs b/src/webxdc.rs +index 3d21d0f4..afc93c67 100644 +--- a/src/webxdc.rs ++++ b/src/webxdc.rs +@@ -1,5 +1,6 @@ + //! # Handle webxdc messages. + ++use std::collections::HashSet; + use std::convert::TryFrom; + use std::path::Path; + +@@ -20,6 +21,7 @@ + use crate::param::Param; + use crate::param::Params; + use crate::scheduler::InterruptInfo; ++use crate::sql::Sql; + use crate::tools::{create_smeared_timestamp, get_abs_path}; + use crate::{chat, EventType}; + +@@ -377,6 +379,10 @@ pub async fn send_webxdc_status_update( + ) + .await?; + ++ self.emit_event(EventType::WebxdcBusyUpdating { ++ msg_id: instance.id, ++ }); ++ + if send_now { + self.sql.insert( + "INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?) +@@ -390,7 +396,6 @@ pub async fn send_webxdc_status_update( + } + + /// Pops one record of queued webxdc status updates. +- /// This function exists to make the sqlite statement testable. + async fn pop_smtp_status_update( + &self, + ) -> Result> { +@@ -414,12 +419,15 @@ async fn pop_smtp_status_update( + } + + /// Attempts to send queued webxdc status updates. +- pub(crate) async fn flush_status_updates(&self) -> Result<()> { ++ /// ++ /// Returns true if there are more status updates to send, but rate limiter does not ++ /// allow to send them. Returns false if there are no more status updates to send. ++ pub(crate) async fn flush_status_updates(&self) -> Result { + loop { + let (instance_id, first_serial, last_serial, descr) = + match self.pop_smtp_status_update().await? { + Some(res) => res, +- None => return Ok(()), ++ None => return Ok(false), + }; + + if let Some(json) = self +@@ -741,6 +749,15 @@ pub async fn get_webxdc_info(&self, context: &Context) -> Result { + } + } + ++/// Returns a hashset of all webxdc instaces which still have updates to send ++pub(crate) async fn get_busy_webxdc_instances(sql: &Sql) -> Result> { ++ Ok(sql ++ .distinct("smtp_status_updates", "msg_id") ++ .await? ++ .into_iter() ++ .collect()) ++} ++ + #[cfg(test)] + mod tests { + use crate::chat::{ diff --git a/src/events.rs b/src/events.rs index d0033c4ae..af3437d70 100644 --- a/src/events.rs +++ b/src/events.rs @@ -318,4 +318,12 @@ pub enum EventType { WebxdcInstanceDeleted { msg_id: MsgId, }, + + WebxdcBusyUpdating { + msg_id: MsgId, + }, + + WebxdcUpToDate { + msg_id: MsgId, + }, } diff --git a/src/scheduler.rs b/src/scheduler.rs index 81d5e7be8..7a3dcc5ff 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -377,7 +377,7 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect if !duration_until_can_send.is_zero() { info!( ctx, - "smtp got rate limited, waiting for {} until can send again", + "smtp got rate limited, delaying next try by {}", duration_to_str(duration_until_can_send) ); tokio::time::timeout(duration_until_can_send, async { diff --git a/src/smtp.rs b/src/smtp.rs index b3669e690..5b3aafd9a 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -21,6 +21,7 @@ use crate::oauth2::get_oauth2_access_token; use crate::provider::Socket; use crate::socks::Socks5Config; use crate::sql; +use crate::webxdc::get_busy_webxdc_instances; use crate::{context::Context, scheduler::connectivity::ConnectivityStore}; /// SMTP write and read timeout in seconds. @@ -498,7 +499,15 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> { pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> { let ratelimited = if context.ratelimit.read().await.can_send() { // add status updates and sync messages to end of sending queue + + let update_needed = get_busy_webxdc_instances(&context.sql).await?; context.flush_status_updates().await?; + let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?; + + for msg_id in update_needed.difference(&update_needed_after_sending) { + context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id }) + } + context.send_sync_msg().await?; false } else { diff --git a/src/sql.rs b/src/sql.rs index 4dacb5daf..770369e96 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::time::Duration; use anyhow::{bail, Context as _, Result}; +use rusqlite::types::FromSql; use rusqlite::{config::DbConfig, Connection, OpenFlags}; use tokio::sync::RwLock; @@ -363,6 +364,25 @@ impl Sql { }) } + /// Returns unique values of a `column` in `table` + pub async fn distinct( + &self, + table: &str, + column: &str, + ) -> Result> { + let conn = self.get_conn().await?; + let rows: Result> = tokio::task::block_in_place(move || { + let mut stmt = conn.prepare(&format!("SELECT DISTINCT {column} FROM {table}"))?; + let rows = stmt + .query([])? + .mapped(|r| r.get(0)) + .map(|a| a.unwrap_or_default()) + .collect(); + Ok(rows) + }); + rows + } + /// Prepares and executes the statement and maps a function over the resulting rows. /// Then executes the second function over the returned iterator and returns the /// result of that function. diff --git a/src/webxdc.rs b/src/webxdc.rs index a4f185900..8182e1974 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -1,5 +1,6 @@ //! # Handle webxdc messages. +use std::collections::HashSet; use std::convert::TryFrom; use std::path::Path; @@ -20,6 +21,7 @@ use crate::mimeparser::SystemMessage; use crate::param::Param; use crate::param::Params; use crate::scheduler::InterruptInfo; +use crate::sql::Sql; use crate::tools::{create_smeared_timestamp, get_abs_path}; use crate::{chat, EventType}; @@ -379,6 +381,10 @@ impl Context { ) .await?; + self.emit_event(EventType::WebxdcBusyUpdating { + msg_id: instance.id, + }); + if send_now { self.sql.insert( "INSERT INTO smtp_status_updates (msg_id, first_serial, last_serial, descr) VALUES(?, ?, ?, ?) @@ -392,7 +398,6 @@ impl Context { } /// Pops one record of queued webxdc status updates. - /// This function exists to make the sqlite statement testable. async fn pop_smtp_status_update( &self, ) -> Result> { @@ -416,12 +421,15 @@ impl Context { } /// Attempts to send queued webxdc status updates. - pub(crate) async fn flush_status_updates(&self) -> Result<()> { + /// + /// Returns true if there are more status updates to send, but rate limiter does not + /// allow to send them. Returns false if there are no more status updates to send. + pub(crate) async fn flush_status_updates(&self) -> Result { loop { let (instance_id, first_serial, last_serial, descr) = match self.pop_smtp_status_update().await? { Some(res) => res, - None => return Ok(()), + None => return Ok(false), }; if let Some(json) = self @@ -743,6 +751,15 @@ impl Message { } } +/// Returns a hashset of all webxdc instaces which still have updates to send +pub(crate) async fn get_busy_webxdc_instances(sql: &Sql) -> Result> { + Ok(sql + .distinct("smtp_status_updates", "msg_id") + .await? + .into_iter() + .collect()) +} + #[cfg(test)] mod tests { use crate::chat::{