diff --git a/src/smtp.rs b/src/smtp.rs index 5b3aafd9a..de20f90e1 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -21,7 +21,7 @@ use crate::oauth2::get_oauth2_access_token; use crate::provider::Socket; use crate::socks::Socks5Config; use crate::sql; -use crate::webxdc::get_busy_webxdc_instances; + use crate::{context::Context, scheduler::connectivity::ConnectivityStore}; /// SMTP write and read timeout in seconds. @@ -499,15 +499,7 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> { pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> { let ratelimited = if context.ratelimit.read().await.can_send() { // add status updates and sync messages to end of sending queue - - let update_needed = get_busy_webxdc_instances(&context.sql).await?; context.flush_status_updates().await?; - let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?; - - for msg_id in update_needed.difference(&update_needed_after_sending) { - context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id }) - } - context.send_sync_msg().await?; false } else { diff --git a/src/webxdc.rs b/src/webxdc.rs index 8182e1974..f69083c54 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -425,11 +425,13 @@ impl Context { /// Returns true if there are more status updates to send, but rate limiter does not /// allow to send them. Returns false if there are no more status updates to send. pub(crate) async fn flush_status_updates(&self) -> Result { + let update_needed = get_busy_webxdc_instances(&self.sql).await?; + loop { let (instance_id, first_serial, last_serial, descr) = match self.pop_smtp_status_update().await? { Some(res) => res, - None => return Ok(false), + None => break, }; if let Some(json) = self @@ -453,6 +455,11 @@ impl Context { chat::send_msg(self, instance.chat_id, &mut status_update).await?; } } + let update_needed_after_sending = get_busy_webxdc_instances(&self.sql).await?; + for msg_id in update_needed.difference(&update_needed_after_sending) { + self.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id }) + } + Ok(false) } pub(crate) fn build_status_update_part(&self, json: &str) -> PartBuilder { @@ -2408,4 +2415,31 @@ sth_for_the = "future""# .await; Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_webxdc_update_events() -> Result<()> { + let alice = TestContext::new_alice().await; + let chat_id = create_group_chat(&alice, ProtectionStatus::Unprotected, "foo").await?; + let instance = send_webxdc_instance(&alice, chat_id).await?; + alice + .send_webxdc_status_update( + instance.id, + r#"{"payload":7,"info": "i","summary":"s"}"#, + "", + ) + .await?; + alice + .evtracker + .get_matching(|evt| matches!(evt, EventType::WebxdcBusyUpdating { .. })) + .await; + + alice.flush_status_updates().await?; + + println!("received event"); + alice + .evtracker + .get_matching(|evt| matches!(evt, EventType::WebxdcUpToDate { .. })) + .await; + Ok(()) + } }