mirror of
https://github.com/chatmail/core.git
synced 2026-05-19 23:06:32 +03:00
fix: Keep webxdc instance for delete_device_after period after a status update (#5365)
If `delete_device_after` is configured, that period should be counted for webxdc instances from the last status update, otherwise nothing prevents from deleting them. Use `msgs.timestamp_rcvd` to store the last status update timestamp, it anyway isn't used for anything except displaying a detailed message info. Also, as `ephemeral::select_expired_messages()` now also checks `timestamp_rcvd`, we have an improvement that a message is guaranteed not to be deleted for the `delete_device_after` period since its receipt. Before only the sort timestamp was checked which is derived from the "sent" timestamp.
This commit is contained in:
@@ -64,6 +64,7 @@ pub async fn debug_logging_loop(context: &Context, events: Receiver<DebugEventLo
|
|||||||
document: None,
|
document: None,
|
||||||
uid: None,
|
uid: None,
|
||||||
},
|
},
|
||||||
|
time,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -392,7 +392,8 @@ WHERE
|
|||||||
SELECT id, chat_id, type
|
SELECT id, chat_id, type
|
||||||
FROM msgs
|
FROM msgs
|
||||||
WHERE
|
WHERE
|
||||||
timestamp < ?
|
timestamp < ?1
|
||||||
|
AND timestamp_rcvd < ?1
|
||||||
AND chat_id > ?
|
AND chat_id > ?
|
||||||
AND chat_id != ?
|
AND chat_id != ?
|
||||||
AND chat_id != ?
|
AND chat_id != ?
|
||||||
@@ -490,7 +491,7 @@ async fn next_delete_device_after_timestamp(context: &Context) -> Result<Option<
|
|||||||
.sql
|
.sql
|
||||||
.query_get_value(
|
.query_get_value(
|
||||||
r#"
|
r#"
|
||||||
SELECT min(timestamp)
|
SELECT min(max(timestamp, timestamp_rcvd))
|
||||||
FROM msgs
|
FROM msgs
|
||||||
WHERE chat_id > ?
|
WHERE chat_id > ?
|
||||||
AND chat_id != ?
|
AND chat_id != ?
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
|||||||
|
|
||||||
use deltachat_derive::FromSql;
|
use deltachat_derive::FromSql;
|
||||||
use lettre_email::PartBuilder;
|
use lettre_email::PartBuilder;
|
||||||
|
use rusqlite::OptionalExtension;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
@@ -291,7 +292,7 @@ impl Context {
|
|||||||
from_id: ContactId,
|
from_id: ContactId,
|
||||||
) -> Result<Option<StatusUpdateSerial>> {
|
) -> Result<Option<StatusUpdateSerial>> {
|
||||||
let Some(status_update_serial) = self
|
let Some(status_update_serial) = self
|
||||||
.write_status_update_inner(&instance.id, &status_update_item)
|
.write_status_update_inner(&instance.id, &status_update_item, timestamp)
|
||||||
.await?
|
.await?
|
||||||
else {
|
else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@@ -372,27 +373,30 @@ impl Context {
|
|||||||
&self,
|
&self,
|
||||||
instance_id: &MsgId,
|
instance_id: &MsgId,
|
||||||
status_update_item: &StatusUpdateItem,
|
status_update_item: &StatusUpdateItem,
|
||||||
|
timestamp: i64,
|
||||||
) -> Result<Option<StatusUpdateSerial>> {
|
) -> Result<Option<StatusUpdateSerial>> {
|
||||||
let _lock = self.sql.write_lock().await;
|
|
||||||
let uid = status_update_item.uid.as_deref();
|
let uid = status_update_item.uid.as_deref();
|
||||||
let Some(rowid) = self
|
let status_update_item = serde_json::to_string(&status_update_item)?;
|
||||||
.sql
|
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||||
.query_row_optional(
|
t.execute(
|
||||||
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
|
"UPDATE msgs SET timestamp_rcvd=? WHERE id=?",
|
||||||
ON CONFLICT (uid) DO NOTHING
|
(timestamp, instance_id),
|
||||||
RETURNING id",
|
)?;
|
||||||
(
|
let rowid = t
|
||||||
instance_id,
|
.query_row(
|
||||||
serde_json::to_string(&status_update_item)?,
|
"INSERT INTO msgs_status_updates (msg_id, update_item, uid) VALUES(?, ?, ?)
|
||||||
uid,
|
ON CONFLICT (uid) DO NOTHING
|
||||||
),
|
RETURNING id",
|
||||||
|row| {
|
(instance_id, status_update_item, uid),
|
||||||
let id: u32 = row.get(0)?;
|
|row| {
|
||||||
Ok(id)
|
let id: u32 = row.get(0)?;
|
||||||
},
|
Ok(id)
|
||||||
)
|
},
|
||||||
.await?
|
)
|
||||||
else {
|
.optional()?;
|
||||||
|
Ok(rowid)
|
||||||
|
};
|
||||||
|
let Some(rowid) = self.sql.transaction(trans_fn).await? else {
|
||||||
let uid = uid.unwrap_or("-");
|
let uid = uid.unwrap_or("-");
|
||||||
info!(self, "Ignoring duplicate status update with uid={uid}");
|
info!(self, "Ignoring duplicate status update with uid={uid}");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@@ -850,6 +854,8 @@ impl Message {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -860,8 +866,10 @@ mod tests {
|
|||||||
use crate::chatlist::Chatlist;
|
use crate::chatlist::Chatlist;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::contact::Contact;
|
use crate::contact::Contact;
|
||||||
|
use crate::ephemeral;
|
||||||
use crate::receive_imf::{receive_imf, receive_imf_from_inbox};
|
use crate::receive_imf::{receive_imf, receive_imf_from_inbox};
|
||||||
use crate::test_utils::TestContext;
|
use crate::test_utils::TestContext;
|
||||||
|
use crate::tools::{self, SystemTime};
|
||||||
use crate::{message, sql};
|
use crate::{message, sql};
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
@@ -2650,4 +2658,43 @@ sth_for_the = "future""#
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_status_update_vs_delete_device_after() -> Result<()> {
|
||||||
|
let alice = &TestContext::new_alice().await;
|
||||||
|
let bob = &TestContext::new_bob().await;
|
||||||
|
bob.set_config(Config::DeleteDeviceAfter, Some("3600"))
|
||||||
|
.await?;
|
||||||
|
let alice_chat = alice.create_chat(bob).await;
|
||||||
|
let alice_instance = send_webxdc_instance(alice, alice_chat.id).await?;
|
||||||
|
let bob_instance = bob.recv_msg(&alice.pop_sent_msg().await).await;
|
||||||
|
|
||||||
|
SystemTime::shift(Duration::from_secs(1800));
|
||||||
|
let mut update = Message {
|
||||||
|
chat_id: alice_chat.id,
|
||||||
|
viewtype: Viewtype::Text,
|
||||||
|
text: "I'm an update".to_string(),
|
||||||
|
hidden: true,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
update.param.set_cmd(SystemMessage::WebxdcStatusUpdate);
|
||||||
|
update
|
||||||
|
.param
|
||||||
|
.set(Param::Arg, r#"{"updates":[{"payload":{"foo":"bar"}}]}"#);
|
||||||
|
update.set_quote(alice, Some(&alice_instance)).await?;
|
||||||
|
let sent_msg = alice.send_msg(alice_chat.id, &mut update).await;
|
||||||
|
bob.recv_msg(&sent_msg).await;
|
||||||
|
assert_eq!(
|
||||||
|
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
|
||||||
|
.await?,
|
||||||
|
r#"[{"payload":{"foo":"bar"},"serial":1,"max_serial":1}]"#
|
||||||
|
);
|
||||||
|
|
||||||
|
SystemTime::shift(Duration::from_secs(2700));
|
||||||
|
ephemeral::delete_expired_messages(bob, tools::time()).await?;
|
||||||
|
let bob_instance = Message::load_from_db(bob, bob_instance.id).await?;
|
||||||
|
assert_eq!(bob_instance.chat_id.is_trash(), false);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user