mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
18 Commits
8518d3fc3a
...
webxdc_upd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
122e66746a | ||
|
|
afd7f90791 | ||
|
|
36cb8f3baf | ||
|
|
10390ae3c6 | ||
|
|
eab7f8ea15 | ||
|
|
32cbde0754 | ||
|
|
7aad25ea50 | ||
|
|
119f3ec9f2 | ||
|
|
fd486ec36c | ||
|
|
e9f77ff753 | ||
|
|
81415ce20e | ||
|
|
35f22d6c23 | ||
|
|
662f0b9a1c | ||
|
|
44508eb392 | ||
|
|
f72f922054 | ||
|
|
a895456dac | ||
|
|
de84a19135 | ||
|
|
8d62e5defb |
@@ -8,7 +8,7 @@
|
||||
### API-Changes
|
||||
- jsonrpc: add verified-by information to `Contact`-Object
|
||||
- Remove `attach_selfavatar` config #3951
|
||||
|
||||
- `DC_EVENT_WEBXDC_UPDATE_STATE_CHANGED` is emitted when webxdc update state changes #3320
|
||||
|
||||
## 1.106.0
|
||||
|
||||
|
||||
@@ -1082,6 +1082,17 @@ int dc_send_webxdc_status_update (dc_context_t* context, uint32_t msg_id, const
|
||||
*/
|
||||
char* dc_get_webxdc_status_updates (dc_context_t* context, uint32_t msg_id, uint32_t serial);
|
||||
|
||||
|
||||
/**
|
||||
* Checks if a webxdc has pending updates to be sent out.
|
||||
|
||||
* @param context The context object.
|
||||
* @param msg_id The ID of the message with the webxdc instance.
|
||||
* @return 1 = webxdc is updating , 0 = webxdc is not updating
|
||||
*/
|
||||
int dc_is_webxdc_updating (dc_context_t* context, uint32_t msg_id);
|
||||
|
||||
|
||||
/**
|
||||
* Save a draft for a chat in the database.
|
||||
*
|
||||
@@ -5872,10 +5883,17 @@ void dc_event_unref(dc_event_t* event);
|
||||
*
|
||||
* @param data1 (int) msg_id
|
||||
*/
|
||||
|
||||
#define DC_EVENT_WEBXDC_INSTANCE_DELETED 2121
|
||||
|
||||
|
||||
/**
|
||||
* Webxdc changed it's update sending state
|
||||
*
|
||||
* @param data1 (int) msg_id
|
||||
* @param data1 (int) is_sending
|
||||
*/
|
||||
#define DC_EVENT_WEBXDC_UPDATE_STATE_CHANGED 2122
|
||||
|
||||
/**
|
||||
* @}
|
||||
*/
|
||||
|
||||
@@ -521,6 +521,7 @@ pub unsafe extern "C" fn dc_event_get_id(event: *mut dc_event_t) -> libc::c_int
|
||||
EventType::SelfavatarChanged => 2110,
|
||||
EventType::WebxdcStatusUpdate { .. } => 2120,
|
||||
EventType::WebxdcInstanceDeleted { .. } => 2121,
|
||||
EventType::WebxdcUpdateStateChanged { .. } => 2122,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -570,6 +571,7 @@ pub unsafe extern "C" fn dc_event_get_data1_int(event: *mut dc_event_t) -> libc:
|
||||
}
|
||||
EventType::WebxdcStatusUpdate { msg_id, .. } => msg_id.to_u32() as libc::c_int,
|
||||
EventType::WebxdcInstanceDeleted { msg_id, .. } => msg_id.to_u32() as libc::c_int,
|
||||
EventType::WebxdcUpdateStateChanged { msg_id, .. } => msg_id.to_u32() as libc::c_int,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,6 +620,10 @@ pub unsafe extern "C" fn dc_event_get_data2_int(event: *mut dc_event_t) -> libc:
|
||||
status_update_serial,
|
||||
..
|
||||
} => status_update_serial.to_u32() as libc::c_int,
|
||||
EventType::WebxdcUpdateStateChanged {
|
||||
has_pending_updates: is_send,
|
||||
..
|
||||
} => *is_send as libc::c_int,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -662,6 +668,7 @@ pub unsafe extern "C" fn dc_event_get_data2_str(event: *mut dc_event_t) -> *mut
|
||||
| EventType::SelfavatarChanged
|
||||
| EventType::WebxdcStatusUpdate { .. }
|
||||
| EventType::WebxdcInstanceDeleted { .. }
|
||||
| EventType::WebxdcUpdateStateChanged { .. }
|
||||
| EventType::ChatEphemeralTimerModified { .. } => ptr::null_mut(),
|
||||
EventType::ConfigureProgress { comment, .. } => {
|
||||
if let Some(comment) = comment {
|
||||
@@ -3285,6 +3292,24 @@ pub unsafe extern "C" fn dc_msg_get_webxdc_info(msg: *mut dc_msg_t) -> *mut libc
|
||||
})
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_is_webxdc_updating(
|
||||
context: *mut dc_context_t,
|
||||
msg_id: u32,
|
||||
) -> libc::c_int {
|
||||
if context.is_null() {
|
||||
eprintln!("ignoring careless call to dc_is_webxdc_updating()");
|
||||
return 0;
|
||||
}
|
||||
let ctx = &*context;
|
||||
block_on(async move {
|
||||
webxdc::get_busy_webxdc_instances(ctx)
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.contains(&MsgId::new(msg_id)) as libc::c_int
|
||||
})
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn dc_msg_get_filemime(msg: *mut dc_msg_t) -> *mut libc::c_char {
|
||||
if msg.is_null() {
|
||||
|
||||
@@ -281,6 +281,7 @@ pub enum JSONRPCEventType {
|
||||
WebxdcInstanceDeleted {
|
||||
msg_id: u32,
|
||||
},
|
||||
WebxdcUpdateStateChanged,
|
||||
}
|
||||
|
||||
impl From<EventType> for JSONRPCEventType {
|
||||
@@ -381,6 +382,7 @@ impl From<EventType> for JSONRPCEventType {
|
||||
EventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
|
||||
msg_id: msg_id.to_u32(),
|
||||
},
|
||||
EventType::WebxdcUpdateStateChanged { .. } => WebxdcUpdateStateChanged,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
3
deltachat-jsonrpc/typescript/generated/events.ts
Normal file
3
deltachat-jsonrpc/typescript/generated/events.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
// AUTO-GENERATED by typescript-type-def
|
||||
|
||||
export type EventTypeName=("Info"|"SmtpConnected"|"ImapConnected"|"SmtpMessageSent"|"ImapMessageDeleted"|"ImapMessageMoved"|"NewBlobFile"|"DeletedBlobFile"|"Warning"|"Error"|"ErrorSelfNotInGroup"|"MsgsChanged"|"IncomingMsg"|"MsgsNoticed"|"MsgDelivered"|"MsgFailed"|"MsgRead"|"ChatModified"|"ChatEphemeralTimerModified"|"ContactsChanged"|"LocationChanged"|"ConfigureProgress"|"ImexProgress"|"ImexFileWritten"|"SecurejoinInviterProgress"|"SecurejoinJoinerProgress"|"ConnectivityChanged"|"SelfavatarChanged"|"WebxdcStatusUpdate"|"WebxdcInstanceDeleted"|"WebxdcUpdateStateChanged");
|
||||
@@ -316,8 +316,14 @@ pub enum EventType {
|
||||
status_update_serial: StatusUpdateSerial,
|
||||
},
|
||||
|
||||
/// Inform that a message containing a webxdc instance has been deleted
|
||||
/// Informs that a message containing a webxdc instance has been deleted
|
||||
WebxdcInstanceDeleted {
|
||||
msg_id: MsgId,
|
||||
},
|
||||
|
||||
/// Informs that the webxdc changed its update sending state
|
||||
WebxdcUpdateStateChanged {
|
||||
msg_id: MsgId,
|
||||
has_pending_updates: bool,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -388,7 +388,7 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
|
||||
if !duration_until_can_send.is_zero() {
|
||||
info!(
|
||||
ctx,
|
||||
"smtp got rate limited, waiting for {} until can send again",
|
||||
"smtp got rate limited, delaying next try by {}",
|
||||
duration_to_str(duration_until_can_send)
|
||||
);
|
||||
tokio::time::timeout(duration_until_can_send, async {
|
||||
|
||||
20
src/sql.rs
20
src/sql.rs
@@ -9,6 +9,7 @@ use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use rusqlite::types::FromSql;
|
||||
use rusqlite::{config::DbConfig, Connection, OpenFlags};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
@@ -365,6 +366,25 @@ impl Sql {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns unique values of a `column` in `table`
|
||||
pub async fn distinct<T: FromSql + Default>(
|
||||
&self,
|
||||
table: &str,
|
||||
column: &str,
|
||||
) -> Result<Vec<T>> {
|
||||
let conn = self.get_conn().await?;
|
||||
let rows: Result<Vec<T>> = tokio::task::block_in_place(move || {
|
||||
let mut stmt = conn.prepare(&format!("SELECT DISTINCT {} FROM {}", column, table))?;
|
||||
let rows = stmt
|
||||
.query([])?
|
||||
.mapped(|r| r.get(0))
|
||||
.map(|a| a.unwrap_or_default())
|
||||
.collect();
|
||||
Ok(rows)
|
||||
});
|
||||
rows
|
||||
}
|
||||
|
||||
/// Prepares and executes the statement and maps a function over the resulting rows.
|
||||
/// Then executes the second function over the returned iterator and returns the
|
||||
/// result of that function.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! # Handle webxdc messages.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::convert::TryFrom;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -411,13 +412,18 @@ impl Context {
|
||||
DO UPDATE SET last_serial=excluded.last_serial, descr=excluded.descr",
|
||||
paramsv![instance.id, status_update_serial, status_update_serial, descr],
|
||||
).await?;
|
||||
|
||||
self.emit_event(EventType::WebxdcUpdateStateChanged {
|
||||
msg_id: instance.id,
|
||||
has_pending_updates: true,
|
||||
});
|
||||
|
||||
self.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Pops one record of queued webxdc status updates.
|
||||
/// This function exists to make the sqlite statement testable.
|
||||
async fn pop_smtp_status_update(
|
||||
&self,
|
||||
) -> Result<Option<(MsgId, StatusUpdateSerial, StatusUpdateSerial, String)>> {
|
||||
@@ -442,13 +448,11 @@ impl Context {
|
||||
|
||||
/// Attempts to send queued webxdc status updates.
|
||||
pub(crate) async fn flush_status_updates(&self) -> Result<()> {
|
||||
loop {
|
||||
let (instance_id, first_serial, last_serial, descr) =
|
||||
match self.pop_smtp_status_update().await? {
|
||||
Some(res) => res,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let update_needed = get_busy_webxdc_instances(self).await?;
|
||||
|
||||
while let Some((instance_id, first_serial, last_serial, descr)) =
|
||||
self.pop_smtp_status_update().await?
|
||||
{
|
||||
if let Some(json) = self
|
||||
.render_webxdc_status_update_object(instance_id, Some((first_serial, last_serial)))
|
||||
.await?
|
||||
@@ -470,6 +474,14 @@ impl Context {
|
||||
chat::send_msg(self, instance.chat_id, &mut status_update).await?;
|
||||
}
|
||||
}
|
||||
let update_needed_after_sending = get_busy_webxdc_instances(self).await?;
|
||||
for msg_id in update_needed.difference(&update_needed_after_sending) {
|
||||
self.emit_event(EventType::WebxdcUpdateStateChanged {
|
||||
msg_id: *msg_id,
|
||||
has_pending_updates: false,
|
||||
})
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn build_status_update_part(&self, json: &str) -> PartBuilder {
|
||||
@@ -768,6 +780,16 @@ impl Message {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a hashset of all webxdc instaces which still have updates to send
|
||||
pub async fn get_busy_webxdc_instances(ctx: &Context) -> Result<HashSet<MsgId>> {
|
||||
Ok(ctx
|
||||
.sql
|
||||
.distinct("smtp_status_updates", "msg_id")
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::chat::{
|
||||
@@ -2416,4 +2438,46 @@ sth_for_the = "future""#
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_webxdc_update_events() -> Result<()> {
|
||||
let alice = TestContext::new_alice().await;
|
||||
let chat_id = create_group_chat(&alice, ProtectionStatus::Unprotected, "foo").await?;
|
||||
let instance = send_webxdc_instance(&alice, chat_id).await?;
|
||||
alice
|
||||
.send_webxdc_status_update(
|
||||
instance.id,
|
||||
r#"{"payload":7,"info": "i","summary":"s"}"#,
|
||||
"",
|
||||
)
|
||||
.await?;
|
||||
alice
|
||||
.evtracker
|
||||
.get_matching(|evt| {
|
||||
matches!(
|
||||
evt,
|
||||
EventType::WebxdcUpdateStateChanged {
|
||||
has_pending_updates: true,
|
||||
..
|
||||
}
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
alice.flush_status_updates().await?;
|
||||
|
||||
alice
|
||||
.evtracker
|
||||
.get_matching(|evt| {
|
||||
matches!(
|
||||
evt,
|
||||
EventType::WebxdcUpdateStateChanged {
|
||||
has_pending_updates: false,
|
||||
..
|
||||
}
|
||||
)
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user