From 91b345abfeab17e9daf530f9ca10e95c7aa6e4b4 Mon Sep 17 00:00:00 2001 From: bjoern Date: Sat, 9 Jul 2022 18:26:12 +0200 Subject: [PATCH] handle webxdc updates for not downloaded instances (#3487) * save webxdc-updates for not yet downloaded messages, that are probably webxdc instances then * test webxdc updates received while instance is not yet downloaded * keep msg_id on downloading messages keeping msg_id on downloading messages has the advantage that webxdc updates and other references to the msg_id can be processed as usual. if a message expands to multiple msg_id, the last one is kept, however, this does not affect webxdc at all. (alternatives may be to update `msgs_status_updates` but that seems more complicated and even less elegant, another alternative would be to use different keys (eg. `rfc274_mid`), but that also seems not to be much easier and would waste space as well. also both alternatives would need adaption for other foreign keys) * update CHANGELOG * do not emit WebxdcStatusUpdate event in case the message is not yet downloaded * move DELETE/UPDATE to an transaction * make merge_msg_id() a little less confusing * use some webxdc-update-param from placeholder (the placeholder may be updated, the just downloaded messages is not) * more precise function name * test not directly downloading status updates * test not directly downloading mdn --- CHANGELOG.md | 1 + src/download.rs | 155 ++++++++++++++++++++++++++++++++++++++++++++- src/receive_imf.rs | 22 +++++-- src/webxdc.rs | 80 +++++++++++++++++++++-- 4 files changed, 246 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 601ac1850..34909fcb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Fixes - replace musl libc name resolution errors with a better message #3485 +- handle updates for not yet downloaded webxdc instances #3487 ## 1.88.0 diff --git a/src/download.rs b/src/download.rs index ea2b6fefc..e64c26d4e 100644 --- a/src/download.rs +++ b/src/download.rs @@ -11,7 +11,7 @@ use crate::imap::{Imap, ImapActionResult}; use crate::job::{self, Action, Job, Status}; use crate::message::{Message, MsgId, Viewtype}; use crate::mimeparser::{MimeMessage, Part}; -use crate::param::Params; +use crate::param::{Param, Params}; use crate::tools::time; use crate::{job_try, stock_str, EventType}; use std::cmp::max; @@ -69,6 +69,42 @@ impl Context { Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32))) } } + + // Merges the two messages to `placeholder_msg_id`; + // `full_msg_id` is no longer used afterwards. + pub(crate) async fn merge_messages( + &self, + full_msg_id: MsgId, + placeholder_msg_id: MsgId, + ) -> Result<()> { + let placeholder = Message::load_from_db(self, placeholder_msg_id).await?; + self.sql + .transaction(move |transaction| { + transaction + .execute("DELETE FROM msgs WHERE id=?;", paramsv![placeholder_msg_id])?; + transaction.execute( + "UPDATE msgs SET id=? WHERE id=?", + paramsv![placeholder_msg_id, full_msg_id], + )?; + Ok(()) + }) + .await?; + let mut full = Message::load_from_db(self, placeholder_msg_id).await?; + + for key in [ + Param::WebxdcSummary, + Param::WebxdcSummaryTimestamp, + Param::WebxdcDocument, + Param::WebxdcDocumentTimestamp, + ] { + if let Some(value) = placeholder.param.get(key) { + full.param.set(key, value); + } + } + full.update_param(self).await?; + + Ok(()) + } } impl MsgId { @@ -256,7 +292,7 @@ impl MimeMessage { mod tests { use num_traits::FromPrimitive; - use crate::chat::send_msg; + use crate::chat::{get_chat_msgs, send_msg}; use crate::ephemeral::Timer; use crate::message::Viewtype; use crate::receive_imf::receive_imf_inner; @@ -410,4 +446,119 @@ mod tests { Ok(()) } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_status_update_expands_to_nothing() -> Result<()> { + let alice = TestContext::new_alice().await; + let bob = TestContext::new_bob().await; + let chat_id = alice.create_chat(&bob).await.id; + + let file = alice.get_blobdir().join("minimal.xdc"); + tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?; + let mut instance = Message::new(Viewtype::File); + instance.set_file(file.to_str().unwrap(), None); + let _sent1 = alice.send_msg(chat_id, &mut instance).await; + + alice + .send_webxdc_status_update(instance.id, r#"{"payload":7}"#, "d") + .await?; + alice.flush_status_updates().await?; + let sent2 = alice.pop_sent_msg().await; + let sent2_rfc742_mid = Message::load_from_db(&alice, sent2.sender_msg_id) + .await? + .rfc724_mid; + + // not downloading the status update results in an placeholder + receive_imf_inner( + &bob, + &sent2_rfc742_mid, + sent2.payload().as_bytes(), + false, + Some(sent2.payload().len() as u32), + false, + ) + .await?; + let msg = bob.get_last_msg().await; + let chat_id = msg.chat_id; + assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 1); + assert_eq!(msg.download_state(), DownloadState::Available); + + // downloading the status update afterwards expands to nothing and moves the placeholder to trash-chat + // (usually status updates are too small for not being downloaded directly) + receive_imf_inner( + &bob, + &sent2_rfc742_mid, + sent2.payload().as_bytes(), + false, + None, + false, + ) + .await?; + assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 0); + assert!(Message::load_from_db(&bob, msg.id) + .await? + .chat_id + .is_trash()); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_mdn_expands_to_nothing() -> Result<()> { + let bob = TestContext::new_bob().await; + let raw = b"Subject: Message opened\n\ + Date: Mon, 10 Jan 2020 00:00:00 +0000\n\ + Chat-Version: 1.0\n\ + Message-ID: \n\ + To: Alice \n\ + From: Bob \n\ + Content-Type: multipart/report; report-type=disposition-notification;\n\t\ + boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\ + \n\ + \n\ + --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\ + Content-Type: text/plain; charset=utf-8\n\ + \n\ + bla\n\ + \n\ + \n\ + --kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\ + Content-Type: message/disposition-notification\n\ + \n\ + Reporting-UA: Delta Chat 1.88.0\n\ + Original-Recipient: rfc822;bob@example.org\n\ + Final-Recipient: rfc822;bob@example.org\n\ + Original-Message-ID: \n\ + Disposition: manual-action/MDN-sent-automatically; displayed\n\ + \n\ + \n\ + --kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\ + "; + + // not downloading the mdn results in an placeholder + receive_imf_inner( + &bob, + "bar@example.org", + raw, + false, + Some(raw.len() as u32), + false, + ) + .await?; + let msg = bob.get_last_msg().await; + let chat_id = msg.chat_id; + assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 1); + assert_eq!(msg.download_state(), DownloadState::Available); + + // downloading the mdn afterwards expands to nothing and deletes the placeholder directly + // (usually mdn are too small for not being downloaded directly) + receive_imf_inner(&bob, "bar@example.org", raw, false, None, false).await?; + assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 0); + assert!(Message::load_from_db(&bob, msg.id) + .await? + .chat_id + .is_trash()); + + Ok(()) + } } diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 563e7b9d7..4c4691e8f 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -130,15 +130,14 @@ pub(crate) async fn receive_imf_inner( context, "Message already partly in DB, replacing by full message." ); - old_msg_id.delete_from_db(context).await?; - true + Some(old_msg_id) } else { // the message was probably moved around. info!(context, "Message already in DB, doing nothing."); return Ok(None); } } else { - false + None }; // the function returns the number of created messages in the database @@ -189,8 +188,9 @@ pub(crate) async fn receive_imf_inner( sent_timestamp, rcvd_timestamp, from_id, - seen || replace_partial_download, + seen || replace_partial_download.is_some(), is_partial_download, + replace_partial_download, fetching_existing_messages, prevent_rename, ) @@ -322,7 +322,7 @@ pub(crate) async fn receive_imf_inner( } } - if replace_partial_download { + if replace_partial_download.is_some() { context.emit_msgs_changed(chat_id, MsgId::new(0)); } else if !chat_id.is_trash() { let fresh = received_msg.state == MessageState::InFresh; @@ -401,6 +401,7 @@ async fn add_parts( from_id: ContactId, seen: bool, is_partial_download: Option, + replace_msg_id: Option, fetching_existing_messages: bool, prevent_rename: bool, ) -> Result { @@ -1145,6 +1146,17 @@ INSERT INTO msgs } drop(conn); + if let Some(replace_msg_id) = replace_msg_id { + if let Some(created_msg_id) = created_db_entries.pop() { + context + .merge_messages(created_msg_id, replace_msg_id) + .await?; + created_db_entries.push(replace_msg_id); + } else { + replace_msg_id.delete_from_db(context).await?; + } + } + chat_id.unarchive_if_not_muted(context).await?; info!( diff --git a/src/webxdc.rs b/src/webxdc.rs index ce63549a9..178438856 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -14,6 +14,7 @@ use tokio::io::AsyncReadExt; use crate::chat::Chat; use crate::contact::ContactId; use crate::context::Context; +use crate::download::DownloadState; use crate::message::{Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::SystemMessage; use crate::param::Param; @@ -330,10 +331,12 @@ impl Context { let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?); - self.emit_event(EventType::WebxdcStatusUpdate { - msg_id: instance.id, - status_update_serial, - }); + if instance.viewtype == Viewtype::Webxdc { + self.emit_event(EventType::WebxdcStatusUpdate { + msg_id: instance.id, + status_update_serial, + }); + } Ok(status_update_serial) } @@ -475,6 +478,8 @@ impl Context { } else if let Some(parent) = msg.parent(self).await? { if parent.viewtype == Viewtype::Webxdc { (msg.timestamp_sort, parent, true) + } else if parent.download_state() != DownloadState::Done { + (msg.timestamp_sort, parent, false) } else { bail!("receive_status_update: message is not the child of a webxdc message.") } @@ -730,7 +735,7 @@ mod tests { use crate::chatlist::Chatlist; use crate::config::Config; use crate::contact::Contact; - use crate::receive_imf::receive_imf; + use crate::receive_imf::{receive_imf, receive_imf_inner}; use crate::test_utils::TestContext; use super::*; @@ -1038,6 +1043,71 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_webxdc_update_for_not_downloaded_instance() -> Result<()> { + // Alice sends a larger instance and an update + let alice = TestContext::new_alice().await; + let bob = TestContext::new_bob().await; + let chat = alice.create_chat(&bob).await; + bob.set_config(Config::DownloadLimit, Some("40000")).await?; + let mut alice_instance = create_webxdc_instance( + &alice, + "chess.xdc", + include_bytes!("../test-data/webxdc/chess.xdc"), + ) + .await?; + let sent1 = alice.send_msg(chat.id, &mut alice_instance).await; + let alice_instance = Message::load_from_db(&alice, sent1.sender_msg_id).await?; + alice + .send_webxdc_status_update( + alice_instance.id, + r#"{"payload": 7, "summary":"sum", "document":"doc"}"#, + "bla", + ) + .await?; + alice.flush_status_updates().await?; + let sent2 = alice.pop_sent_msg().await; + + // Bob does not download instance but already receives update + receive_imf_inner( + &bob, + &alice_instance.rfc724_mid, + sent1.payload().as_bytes(), + false, + Some(70790), + false, + ) + .await?; + let bob_instance = bob.get_last_msg().await; + bob_instance.chat_id.accept(&bob).await?; + bob.recv_msg(&sent2).await; + assert_eq!(bob_instance.download_state, DownloadState::Available); + + // Bob downloads instance, updates should be assigned correctly + receive_imf_inner( + &bob, + &alice_instance.rfc724_mid, + sent1.payload().as_bytes(), + false, + None, + false, + ) + .await?; + let bob_instance = bob.get_last_msg().await; + assert_eq!(bob_instance.viewtype, Viewtype::Webxdc); + assert_eq!(bob_instance.download_state, DownloadState::Done); + assert_eq!( + bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0)) + .await?, + r#"[{"payload":7,"document":"doc","summary":"sum","serial":1,"max_serial":1}]"# + ); + let info = bob_instance.get_webxdc_info(&bob).await?; + assert_eq!(info.document, "doc"); + assert_eq!(info.summary, "sum"); + + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_delete_webxdc_instance() -> Result<()> { let t = TestContext::new_alice().await;