mirror of
https://github.com/chatmail/core.git
synced 2026-05-16 21:36:30 +03:00
handle webxdc updates for not downloaded instances (#3487)
* save webxdc-updates for not yet downloaded messages, that are probably webxdc instances then * test webxdc updates received while instance is not yet downloaded * keep msg_id on downloading messages keeping msg_id on downloading messages has the advantage that webxdc updates and other references to the msg_id can be processed as usual. if a message expands to multiple msg_id, the last one is kept, however, this does not affect webxdc at all. (alternatives may be to update `msgs_status_updates` but that seems more complicated and even less elegant, another alternative would be to use different keys (eg. `rfc274_mid`), but that also seems not to be much easier and would waste space as well. also both alternatives would need adaption for other foreign keys) * update CHANGELOG * do not emit WebxdcStatusUpdate event in case the message is not yet downloaded * move DELETE/UPDATE to an transaction * make merge_msg_id() a little less confusing * use some webxdc-update-param from placeholder (the placeholder may be updated, the just downloaded messages is not) * more precise function name * test not directly downloading status updates * test not directly downloading mdn
This commit is contained in:
@@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
- replace musl libc name resolution errors with a better message #3485
|
- replace musl libc name resolution errors with a better message #3485
|
||||||
|
- handle updates for not yet downloaded webxdc instances #3487
|
||||||
|
|
||||||
|
|
||||||
## 1.88.0
|
## 1.88.0
|
||||||
|
|||||||
155
src/download.rs
155
src/download.rs
@@ -11,7 +11,7 @@ use crate::imap::{Imap, ImapActionResult};
|
|||||||
use crate::job::{self, Action, Job, Status};
|
use crate::job::{self, Action, Job, Status};
|
||||||
use crate::message::{Message, MsgId, Viewtype};
|
use crate::message::{Message, MsgId, Viewtype};
|
||||||
use crate::mimeparser::{MimeMessage, Part};
|
use crate::mimeparser::{MimeMessage, Part};
|
||||||
use crate::param::Params;
|
use crate::param::{Param, Params};
|
||||||
use crate::tools::time;
|
use crate::tools::time;
|
||||||
use crate::{job_try, stock_str, EventType};
|
use crate::{job_try, stock_str, EventType};
|
||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
@@ -69,6 +69,42 @@ impl Context {
|
|||||||
Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
|
Ok(Some(max(MIN_DOWNLOAD_LIMIT, download_limit as u32)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merges the two messages to `placeholder_msg_id`;
|
||||||
|
// `full_msg_id` is no longer used afterwards.
|
||||||
|
pub(crate) async fn merge_messages(
|
||||||
|
&self,
|
||||||
|
full_msg_id: MsgId,
|
||||||
|
placeholder_msg_id: MsgId,
|
||||||
|
) -> Result<()> {
|
||||||
|
let placeholder = Message::load_from_db(self, placeholder_msg_id).await?;
|
||||||
|
self.sql
|
||||||
|
.transaction(move |transaction| {
|
||||||
|
transaction
|
||||||
|
.execute("DELETE FROM msgs WHERE id=?;", paramsv![placeholder_msg_id])?;
|
||||||
|
transaction.execute(
|
||||||
|
"UPDATE msgs SET id=? WHERE id=?",
|
||||||
|
paramsv![placeholder_msg_id, full_msg_id],
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let mut full = Message::load_from_db(self, placeholder_msg_id).await?;
|
||||||
|
|
||||||
|
for key in [
|
||||||
|
Param::WebxdcSummary,
|
||||||
|
Param::WebxdcSummaryTimestamp,
|
||||||
|
Param::WebxdcDocument,
|
||||||
|
Param::WebxdcDocumentTimestamp,
|
||||||
|
] {
|
||||||
|
if let Some(value) = placeholder.param.get(key) {
|
||||||
|
full.param.set(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
full.update_param(self).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MsgId {
|
impl MsgId {
|
||||||
@@ -256,7 +292,7 @@ impl MimeMessage {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use num_traits::FromPrimitive;
|
use num_traits::FromPrimitive;
|
||||||
|
|
||||||
use crate::chat::send_msg;
|
use crate::chat::{get_chat_msgs, send_msg};
|
||||||
use crate::ephemeral::Timer;
|
use crate::ephemeral::Timer;
|
||||||
use crate::message::Viewtype;
|
use crate::message::Viewtype;
|
||||||
use crate::receive_imf::receive_imf_inner;
|
use crate::receive_imf::receive_imf_inner;
|
||||||
@@ -410,4 +446,119 @@ mod tests {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_status_update_expands_to_nothing() -> Result<()> {
|
||||||
|
let alice = TestContext::new_alice().await;
|
||||||
|
let bob = TestContext::new_bob().await;
|
||||||
|
let chat_id = alice.create_chat(&bob).await.id;
|
||||||
|
|
||||||
|
let file = alice.get_blobdir().join("minimal.xdc");
|
||||||
|
tokio::fs::write(&file, include_bytes!("../test-data/webxdc/minimal.xdc")).await?;
|
||||||
|
let mut instance = Message::new(Viewtype::File);
|
||||||
|
instance.set_file(file.to_str().unwrap(), None);
|
||||||
|
let _sent1 = alice.send_msg(chat_id, &mut instance).await;
|
||||||
|
|
||||||
|
alice
|
||||||
|
.send_webxdc_status_update(instance.id, r#"{"payload":7}"#, "d")
|
||||||
|
.await?;
|
||||||
|
alice.flush_status_updates().await?;
|
||||||
|
let sent2 = alice.pop_sent_msg().await;
|
||||||
|
let sent2_rfc742_mid = Message::load_from_db(&alice, sent2.sender_msg_id)
|
||||||
|
.await?
|
||||||
|
.rfc724_mid;
|
||||||
|
|
||||||
|
// not downloading the status update results in an placeholder
|
||||||
|
receive_imf_inner(
|
||||||
|
&bob,
|
||||||
|
&sent2_rfc742_mid,
|
||||||
|
sent2.payload().as_bytes(),
|
||||||
|
false,
|
||||||
|
Some(sent2.payload().len() as u32),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let msg = bob.get_last_msg().await;
|
||||||
|
let chat_id = msg.chat_id;
|
||||||
|
assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 1);
|
||||||
|
assert_eq!(msg.download_state(), DownloadState::Available);
|
||||||
|
|
||||||
|
// downloading the status update afterwards expands to nothing and moves the placeholder to trash-chat
|
||||||
|
// (usually status updates are too small for not being downloaded directly)
|
||||||
|
receive_imf_inner(
|
||||||
|
&bob,
|
||||||
|
&sent2_rfc742_mid,
|
||||||
|
sent2.payload().as_bytes(),
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 0);
|
||||||
|
assert!(Message::load_from_db(&bob, msg.id)
|
||||||
|
.await?
|
||||||
|
.chat_id
|
||||||
|
.is_trash());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_mdn_expands_to_nothing() -> Result<()> {
|
||||||
|
let bob = TestContext::new_bob().await;
|
||||||
|
let raw = b"Subject: Message opened\n\
|
||||||
|
Date: Mon, 10 Jan 2020 00:00:00 +0000\n\
|
||||||
|
Chat-Version: 1.0\n\
|
||||||
|
Message-ID: <bar@example.org>\n\
|
||||||
|
To: Alice <alice@example.org>\n\
|
||||||
|
From: Bob <bob@example.org>\n\
|
||||||
|
Content-Type: multipart/report; report-type=disposition-notification;\n\t\
|
||||||
|
boundary=\"kJBbU58X1xeWNHgBtTbMk80M5qnV4N\"\n\
|
||||||
|
\n\
|
||||||
|
\n\
|
||||||
|
--kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
|
||||||
|
Content-Type: text/plain; charset=utf-8\n\
|
||||||
|
\n\
|
||||||
|
bla\n\
|
||||||
|
\n\
|
||||||
|
\n\
|
||||||
|
--kJBbU58X1xeWNHgBtTbMk80M5qnV4N\n\
|
||||||
|
Content-Type: message/disposition-notification\n\
|
||||||
|
\n\
|
||||||
|
Reporting-UA: Delta Chat 1.88.0\n\
|
||||||
|
Original-Recipient: rfc822;bob@example.org\n\
|
||||||
|
Final-Recipient: rfc822;bob@example.org\n\
|
||||||
|
Original-Message-ID: <foo@example.org>\n\
|
||||||
|
Disposition: manual-action/MDN-sent-automatically; displayed\n\
|
||||||
|
\n\
|
||||||
|
\n\
|
||||||
|
--kJBbU58X1xeWNHgBtTbMk80M5qnV4N--\n\
|
||||||
|
";
|
||||||
|
|
||||||
|
// not downloading the mdn results in an placeholder
|
||||||
|
receive_imf_inner(
|
||||||
|
&bob,
|
||||||
|
"bar@example.org",
|
||||||
|
raw,
|
||||||
|
false,
|
||||||
|
Some(raw.len() as u32),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let msg = bob.get_last_msg().await;
|
||||||
|
let chat_id = msg.chat_id;
|
||||||
|
assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 1);
|
||||||
|
assert_eq!(msg.download_state(), DownloadState::Available);
|
||||||
|
|
||||||
|
// downloading the mdn afterwards expands to nothing and deletes the placeholder directly
|
||||||
|
// (usually mdn are too small for not being downloaded directly)
|
||||||
|
receive_imf_inner(&bob, "bar@example.org", raw, false, None, false).await?;
|
||||||
|
assert_eq!(get_chat_msgs(&bob, chat_id, 0).await?.len(), 0);
|
||||||
|
assert!(Message::load_from_db(&bob, msg.id)
|
||||||
|
.await?
|
||||||
|
.chat_id
|
||||||
|
.is_trash());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -130,15 +130,14 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
context,
|
context,
|
||||||
"Message already partly in DB, replacing by full message."
|
"Message already partly in DB, replacing by full message."
|
||||||
);
|
);
|
||||||
old_msg_id.delete_from_db(context).await?;
|
Some(old_msg_id)
|
||||||
true
|
|
||||||
} else {
|
} else {
|
||||||
// the message was probably moved around.
|
// the message was probably moved around.
|
||||||
info!(context, "Message already in DB, doing nothing.");
|
info!(context, "Message already in DB, doing nothing.");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
false
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
// the function returns the number of created messages in the database
|
// the function returns the number of created messages in the database
|
||||||
@@ -189,8 +188,9 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
sent_timestamp,
|
sent_timestamp,
|
||||||
rcvd_timestamp,
|
rcvd_timestamp,
|
||||||
from_id,
|
from_id,
|
||||||
seen || replace_partial_download,
|
seen || replace_partial_download.is_some(),
|
||||||
is_partial_download,
|
is_partial_download,
|
||||||
|
replace_partial_download,
|
||||||
fetching_existing_messages,
|
fetching_existing_messages,
|
||||||
prevent_rename,
|
prevent_rename,
|
||||||
)
|
)
|
||||||
@@ -322,7 +322,7 @@ pub(crate) async fn receive_imf_inner(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if replace_partial_download {
|
if replace_partial_download.is_some() {
|
||||||
context.emit_msgs_changed(chat_id, MsgId::new(0));
|
context.emit_msgs_changed(chat_id, MsgId::new(0));
|
||||||
} else if !chat_id.is_trash() {
|
} else if !chat_id.is_trash() {
|
||||||
let fresh = received_msg.state == MessageState::InFresh;
|
let fresh = received_msg.state == MessageState::InFresh;
|
||||||
@@ -401,6 +401,7 @@ async fn add_parts(
|
|||||||
from_id: ContactId,
|
from_id: ContactId,
|
||||||
seen: bool,
|
seen: bool,
|
||||||
is_partial_download: Option<u32>,
|
is_partial_download: Option<u32>,
|
||||||
|
replace_msg_id: Option<MsgId>,
|
||||||
fetching_existing_messages: bool,
|
fetching_existing_messages: bool,
|
||||||
prevent_rename: bool,
|
prevent_rename: bool,
|
||||||
) -> Result<ReceivedMsg> {
|
) -> Result<ReceivedMsg> {
|
||||||
@@ -1145,6 +1146,17 @@ INSERT INTO msgs
|
|||||||
}
|
}
|
||||||
drop(conn);
|
drop(conn);
|
||||||
|
|
||||||
|
if let Some(replace_msg_id) = replace_msg_id {
|
||||||
|
if let Some(created_msg_id) = created_db_entries.pop() {
|
||||||
|
context
|
||||||
|
.merge_messages(created_msg_id, replace_msg_id)
|
||||||
|
.await?;
|
||||||
|
created_db_entries.push(replace_msg_id);
|
||||||
|
} else {
|
||||||
|
replace_msg_id.delete_from_db(context).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
chat_id.unarchive_if_not_muted(context).await?;
|
chat_id.unarchive_if_not_muted(context).await?;
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ use tokio::io::AsyncReadExt;
|
|||||||
use crate::chat::Chat;
|
use crate::chat::Chat;
|
||||||
use crate::contact::ContactId;
|
use crate::contact::ContactId;
|
||||||
use crate::context::Context;
|
use crate::context::Context;
|
||||||
|
use crate::download::DownloadState;
|
||||||
use crate::message::{Message, MessageState, MsgId, Viewtype};
|
use crate::message::{Message, MessageState, MsgId, Viewtype};
|
||||||
use crate::mimeparser::SystemMessage;
|
use crate::mimeparser::SystemMessage;
|
||||||
use crate::param::Param;
|
use crate::param::Param;
|
||||||
@@ -330,10 +331,12 @@ impl Context {
|
|||||||
|
|
||||||
let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?);
|
let status_update_serial = StatusUpdateSerial(u32::try_from(rowid)?);
|
||||||
|
|
||||||
self.emit_event(EventType::WebxdcStatusUpdate {
|
if instance.viewtype == Viewtype::Webxdc {
|
||||||
msg_id: instance.id,
|
self.emit_event(EventType::WebxdcStatusUpdate {
|
||||||
status_update_serial,
|
msg_id: instance.id,
|
||||||
});
|
status_update_serial,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
Ok(status_update_serial)
|
Ok(status_update_serial)
|
||||||
}
|
}
|
||||||
@@ -475,6 +478,8 @@ impl Context {
|
|||||||
} else if let Some(parent) = msg.parent(self).await? {
|
} else if let Some(parent) = msg.parent(self).await? {
|
||||||
if parent.viewtype == Viewtype::Webxdc {
|
if parent.viewtype == Viewtype::Webxdc {
|
||||||
(msg.timestamp_sort, parent, true)
|
(msg.timestamp_sort, parent, true)
|
||||||
|
} else if parent.download_state() != DownloadState::Done {
|
||||||
|
(msg.timestamp_sort, parent, false)
|
||||||
} else {
|
} else {
|
||||||
bail!("receive_status_update: message is not the child of a webxdc message.")
|
bail!("receive_status_update: message is not the child of a webxdc message.")
|
||||||
}
|
}
|
||||||
@@ -730,7 +735,7 @@ mod tests {
|
|||||||
use crate::chatlist::Chatlist;
|
use crate::chatlist::Chatlist;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::contact::Contact;
|
use crate::contact::Contact;
|
||||||
use crate::receive_imf::receive_imf;
|
use crate::receive_imf::{receive_imf, receive_imf_inner};
|
||||||
use crate::test_utils::TestContext;
|
use crate::test_utils::TestContext;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -1038,6 +1043,71 @@ mod tests {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn test_webxdc_update_for_not_downloaded_instance() -> Result<()> {
|
||||||
|
// Alice sends a larger instance and an update
|
||||||
|
let alice = TestContext::new_alice().await;
|
||||||
|
let bob = TestContext::new_bob().await;
|
||||||
|
let chat = alice.create_chat(&bob).await;
|
||||||
|
bob.set_config(Config::DownloadLimit, Some("40000")).await?;
|
||||||
|
let mut alice_instance = create_webxdc_instance(
|
||||||
|
&alice,
|
||||||
|
"chess.xdc",
|
||||||
|
include_bytes!("../test-data/webxdc/chess.xdc"),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let sent1 = alice.send_msg(chat.id, &mut alice_instance).await;
|
||||||
|
let alice_instance = Message::load_from_db(&alice, sent1.sender_msg_id).await?;
|
||||||
|
alice
|
||||||
|
.send_webxdc_status_update(
|
||||||
|
alice_instance.id,
|
||||||
|
r#"{"payload": 7, "summary":"sum", "document":"doc"}"#,
|
||||||
|
"bla",
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
alice.flush_status_updates().await?;
|
||||||
|
let sent2 = alice.pop_sent_msg().await;
|
||||||
|
|
||||||
|
// Bob does not download instance but already receives update
|
||||||
|
receive_imf_inner(
|
||||||
|
&bob,
|
||||||
|
&alice_instance.rfc724_mid,
|
||||||
|
sent1.payload().as_bytes(),
|
||||||
|
false,
|
||||||
|
Some(70790),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let bob_instance = bob.get_last_msg().await;
|
||||||
|
bob_instance.chat_id.accept(&bob).await?;
|
||||||
|
bob.recv_msg(&sent2).await;
|
||||||
|
assert_eq!(bob_instance.download_state, DownloadState::Available);
|
||||||
|
|
||||||
|
// Bob downloads instance, updates should be assigned correctly
|
||||||
|
receive_imf_inner(
|
||||||
|
&bob,
|
||||||
|
&alice_instance.rfc724_mid,
|
||||||
|
sent1.payload().as_bytes(),
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let bob_instance = bob.get_last_msg().await;
|
||||||
|
assert_eq!(bob_instance.viewtype, Viewtype::Webxdc);
|
||||||
|
assert_eq!(bob_instance.download_state, DownloadState::Done);
|
||||||
|
assert_eq!(
|
||||||
|
bob.get_webxdc_status_updates(bob_instance.id, StatusUpdateSerial(0))
|
||||||
|
.await?,
|
||||||
|
r#"[{"payload":7,"document":"doc","summary":"sum","serial":1,"max_serial":1}]"#
|
||||||
|
);
|
||||||
|
let info = bob_instance.get_webxdc_info(&bob).await?;
|
||||||
|
assert_eq!(info.document, "doc");
|
||||||
|
assert_eq!(info.summary, "sum");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn test_delete_webxdc_instance() -> Result<()> {
|
async fn test_delete_webxdc_instance() -> Result<()> {
|
||||||
let t = TestContext::new_alice().await;
|
let t = TestContext::new_alice().await;
|
||||||
|
|||||||
Reference in New Issue
Block a user