mirror of
https://github.com/chatmail/core.git
synced 2026-05-04 05:46:29 +03:00
add test
This commit is contained in:
10
src/smtp.rs
10
src/smtp.rs
@@ -21,7 +21,7 @@ use crate::oauth2::get_oauth2_access_token;
|
||||
use crate::provider::Socket;
|
||||
use crate::socks::Socks5Config;
|
||||
use crate::sql;
|
||||
use crate::webxdc::get_busy_webxdc_instances;
|
||||
|
||||
use crate::{context::Context, scheduler::connectivity::ConnectivityStore};
|
||||
|
||||
/// SMTP write and read timeout in seconds.
|
||||
@@ -499,15 +499,7 @@ async fn send_mdns(context: &Context, connection: &mut Smtp) -> Result<()> {
|
||||
pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) -> Result<()> {
|
||||
let ratelimited = if context.ratelimit.read().await.can_send() {
|
||||
// add status updates and sync messages to end of sending queue
|
||||
|
||||
let update_needed = get_busy_webxdc_instances(&context.sql).await?;
|
||||
context.flush_status_updates().await?;
|
||||
let update_needed_after_sending = get_busy_webxdc_instances(&context.sql).await?;
|
||||
|
||||
for msg_id in update_needed.difference(&update_needed_after_sending) {
|
||||
context.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id })
|
||||
}
|
||||
|
||||
context.send_sync_msg().await?;
|
||||
false
|
||||
} else {
|
||||
|
||||
@@ -425,11 +425,13 @@ impl Context {
|
||||
/// Returns true if there are more status updates to send, but rate limiter does not
|
||||
/// allow to send them. Returns false if there are no more status updates to send.
|
||||
pub(crate) async fn flush_status_updates(&self) -> Result<bool> {
|
||||
let update_needed = get_busy_webxdc_instances(&self.sql).await?;
|
||||
|
||||
loop {
|
||||
let (instance_id, first_serial, last_serial, descr) =
|
||||
match self.pop_smtp_status_update().await? {
|
||||
Some(res) => res,
|
||||
None => return Ok(false),
|
||||
None => break,
|
||||
};
|
||||
|
||||
if let Some(json) = self
|
||||
@@ -453,6 +455,11 @@ impl Context {
|
||||
chat::send_msg(self, instance.chat_id, &mut status_update).await?;
|
||||
}
|
||||
}
|
||||
let update_needed_after_sending = get_busy_webxdc_instances(&self.sql).await?;
|
||||
for msg_id in update_needed.difference(&update_needed_after_sending) {
|
||||
self.emit_event(EventType::WebxdcUpToDate { msg_id: *msg_id })
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub(crate) fn build_status_update_part(&self, json: &str) -> PartBuilder {
|
||||
@@ -2408,4 +2415,31 @@ sth_for_the = "future""#
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_webxdc_update_events() -> Result<()> {
|
||||
let alice = TestContext::new_alice().await;
|
||||
let chat_id = create_group_chat(&alice, ProtectionStatus::Unprotected, "foo").await?;
|
||||
let instance = send_webxdc_instance(&alice, chat_id).await?;
|
||||
alice
|
||||
.send_webxdc_status_update(
|
||||
instance.id,
|
||||
r#"{"payload":7,"info": "i","summary":"s"}"#,
|
||||
"",
|
||||
)
|
||||
.await?;
|
||||
alice
|
||||
.evtracker
|
||||
.get_matching(|evt| matches!(evt, EventType::WebxdcBusyUpdating { .. }))
|
||||
.await;
|
||||
|
||||
alice.flush_status_updates().await?;
|
||||
|
||||
println!("received event");
|
||||
alice
|
||||
.evtracker
|
||||
.get_matching(|evt| matches!(evt, EventType::WebxdcUpToDate { .. }))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user