mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 01:16:31 +03:00
Emit per-message events for expired messages
Instead of emitting single MsgsChanged event with zero chat and msg IDs, emit one event per message. Also emit WebxdcInstanceDeleted event if expired message contains a webxdc.
This commit is contained in:
@@ -6,6 +6,8 @@
|
|||||||
|
|
||||||
### Changes
|
### Changes
|
||||||
- truncate incoming messages by lines instead of just length #3480
|
- truncate incoming messages by lines instead of just length #3480
|
||||||
|
- emit separate `DC_EVENT_MSGS_CHANGED` for each expired message,
|
||||||
|
and `DC_EVENT_WEBXDC_INSTANCE_DELETED` when a message contains a webxdc #3605
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
|
|
||||||
|
|||||||
134
src/ephemeral.rs
134
src/ephemeral.rs
@@ -332,35 +332,35 @@ pub(crate) async fn start_ephemeral_timers_msgids(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deletes messages which are expired according to
|
/// Selects messages which are expired according to
|
||||||
/// `delete_device_after` setting or `ephemeral_timestamp` column.
|
/// `delete_device_after` setting or `ephemeral_timestamp` column.
|
||||||
///
|
///
|
||||||
/// Returns true if any message is deleted, so caller can emit
|
/// For each message a row ID, chat id and viewtype is returned.
|
||||||
/// MsgsChanged event. If nothing has been deleted, returns
|
async fn select_expired_messages(
|
||||||
/// false. This function does not emit the MsgsChanged event itself,
|
context: &Context,
|
||||||
/// because it is also called when chatlist is reloaded, and emitting
|
now: i64,
|
||||||
/// MsgsChanged there will cause infinite reload loop.
|
) -> Result<Vec<(MsgId, ChatId, Viewtype)>> {
|
||||||
pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> {
|
let mut rows = context
|
||||||
let mut updated = context
|
|
||||||
.sql
|
.sql
|
||||||
.execute(
|
.query_map(
|
||||||
// If you change which information is removed here, also change MsgId::trash() and
|
|
||||||
// which information receive_imf::add_parts() still adds to the db if the chat_id is TRASH
|
|
||||||
r#"
|
r#"
|
||||||
UPDATE msgs
|
SELECT id, chat_id, type
|
||||||
SET
|
FROM msgs
|
||||||
chat_id=?, txt='', subject='', txt_raw='',
|
|
||||||
mime_headers='', from_id=0, to_id=0, param=''
|
|
||||||
WHERE
|
WHERE
|
||||||
ephemeral_timestamp != 0
|
ephemeral_timestamp != 0
|
||||||
AND ephemeral_timestamp <= ?
|
AND ephemeral_timestamp <= ?
|
||||||
AND chat_id != ?
|
AND chat_id != ?
|
||||||
"#,
|
"#,
|
||||||
paramsv![DC_CHAT_ID_TRASH, now, DC_CHAT_ID_TRASH],
|
paramsv![now, DC_CHAT_ID_TRASH],
|
||||||
|
|row| {
|
||||||
|
let id: MsgId = row.get("id")?;
|
||||||
|
let chat_id: ChatId = row.get("chat_id")?;
|
||||||
|
let viewtype: Viewtype = row.get("type")?;
|
||||||
|
Ok((id, chat_id, viewtype))
|
||||||
|
},
|
||||||
|
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
.context("update failed")?
|
|
||||||
> 0;
|
|
||||||
|
|
||||||
if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
|
if let Some(delete_device_after) = context.get_config_delete_device_after().await? {
|
||||||
let self_chat_id = ChatId::lookup_by_contact(context, ContactId::SELF)
|
let self_chat_id = ChatId::lookup_by_contact(context, ContactId::SELF)
|
||||||
@@ -372,36 +372,81 @@ WHERE
|
|||||||
|
|
||||||
let threshold_timestamp = now.saturating_sub(delete_device_after);
|
let threshold_timestamp = now.saturating_sub(delete_device_after);
|
||||||
|
|
||||||
// Delete expired messages
|
let rows_expired = context
|
||||||
//
|
|
||||||
// Only update the rows that have to be updated, to avoid emitting
|
|
||||||
// unnecessary "chat modified" events.
|
|
||||||
let rows_modified = context
|
|
||||||
.sql
|
.sql
|
||||||
.execute(
|
.query_map(
|
||||||
"UPDATE msgs \
|
r#"
|
||||||
SET chat_id = ?, txt = '', subject='', txt_raw='', \
|
SELECT id, chat_id, type
|
||||||
mime_headers='', from_id=0, to_id=0, param='' \
|
FROM msgs
|
||||||
WHERE timestamp < ? \
|
WHERE
|
||||||
AND chat_id > ? \
|
timestamp < ?
|
||||||
AND chat_id != ? \
|
AND chat_id > ?
|
||||||
AND chat_id != ?",
|
AND chat_id != ?
|
||||||
|
AND chat_id != ?
|
||||||
|
"#,
|
||||||
paramsv![
|
paramsv![
|
||||||
DC_CHAT_ID_TRASH,
|
|
||||||
threshold_timestamp,
|
threshold_timestamp,
|
||||||
DC_CHAT_ID_LAST_SPECIAL,
|
DC_CHAT_ID_LAST_SPECIAL,
|
||||||
self_chat_id,
|
self_chat_id,
|
||||||
device_chat_id
|
device_chat_id
|
||||||
],
|
],
|
||||||
|
|row| {
|
||||||
|
let id: MsgId = row.get("id")?;
|
||||||
|
let chat_id: ChatId = row.get("chat_id")?;
|
||||||
|
let viewtype: Viewtype = row.get("type")?;
|
||||||
|
Ok((id, chat_id, viewtype))
|
||||||
|
},
|
||||||
|
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
.context("deleted update failed")?;
|
|
||||||
|
|
||||||
updated |= rows_modified > 0;
|
rows.extend(rows_expired);
|
||||||
}
|
}
|
||||||
|
|
||||||
if updated {
|
Ok(rows)
|
||||||
context.emit_msgs_changed_without_ids();
|
}
|
||||||
|
|
||||||
|
/// Deletes messages which are expired according to
|
||||||
|
/// `delete_device_after` setting or `ephemeral_timestamp` column.
|
||||||
|
///
|
||||||
|
/// Emits relevant `MsgsChanged` and `WebxdcInstanceDeleted` events
|
||||||
|
/// if messages are deleted.
|
||||||
|
pub(crate) async fn delete_expired_messages(context: &Context, now: i64) -> Result<()> {
|
||||||
|
let rows = select_expired_messages(context, now).await?;
|
||||||
|
|
||||||
|
if !rows.is_empty() {
|
||||||
|
context
|
||||||
|
.sql
|
||||||
|
.execute(
|
||||||
|
// If you change which information is removed here, also change MsgId::trash() and
|
||||||
|
// which information receive_imf::add_parts() still adds to the db if the chat_id is TRASH
|
||||||
|
&format!(
|
||||||
|
r#"
|
||||||
|
UPDATE msgs
|
||||||
|
SET
|
||||||
|
chat_id=?, txt='', subject='', txt_raw='',
|
||||||
|
mime_headers='', from_id=0, to_id=0, param=''
|
||||||
|
WHERE id IN ({})
|
||||||
|
"#,
|
||||||
|
sql::repeat_vars(rows.len())
|
||||||
|
),
|
||||||
|
rusqlite::params_from_iter(
|
||||||
|
std::iter::once(&DC_CHAT_ID_TRASH as &dyn crate::ToSql).chain(
|
||||||
|
rows.iter()
|
||||||
|
.map(|(msg_id, _chat_id, _viewtype)| msg_id as &dyn crate::ToSql),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.context("update failed")?;
|
||||||
|
|
||||||
|
for (msg_id, chat_id, viewtype) in rows {
|
||||||
|
context.emit_msgs_changed(chat_id, msg_id);
|
||||||
|
|
||||||
|
if viewtype == Viewtype::Webxdc {
|
||||||
|
context.emit_event(EventType::WebxdcInstanceDeleted { msg_id });
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -956,6 +1001,19 @@ mod tests {
|
|||||||
|
|
||||||
assert!(next_expiration < deleted_at);
|
assert!(next_expiration < deleted_at);
|
||||||
delete_expired_messages(t, deleted_at).await?;
|
delete_expired_messages(t, deleted_at).await?;
|
||||||
|
t.evtracker
|
||||||
|
.get_matching(|evt| {
|
||||||
|
if let EventType::MsgsChanged {
|
||||||
|
msg_id: event_msg_id,
|
||||||
|
..
|
||||||
|
} = evt
|
||||||
|
{
|
||||||
|
*event_msg_id == msg_id
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
let loaded = Message::load_from_db(t, msg_id).await?;
|
let loaded = Message::load_from_db(t, msg_id).await?;
|
||||||
assert_eq!(loaded.text.unwrap(), "");
|
assert_eq!(loaded.text.unwrap(), "");
|
||||||
|
|||||||
Reference in New Issue
Block a user