diff --git a/src/headerdef.rs b/src/headerdef.rs index ca282a81b..e48badcc4 100644 --- a/src/headerdef.rs +++ b/src/headerdef.rs @@ -77,6 +77,9 @@ pub enum HeaderDef { SecureJoinAuth, Sender, + /// [`Supersedes`](https://www.rfc-editor.org/rfc/rfc4021.html#section-2.1.46) header. + Supersedes, + /// Ephemeral message timer. EphemeralTimer, Received, diff --git a/src/message.rs b/src/message.rs index 0a299c3e9..0a2c42025 100644 --- a/src/message.rs +++ b/src/message.rs @@ -435,6 +435,18 @@ pub struct Message { /// `In-Reply-To` header value. pub(crate) in_reply_to: Option, + + /// `Supersedes` header value. + /// + /// It is only used for sending and not stored in the database. + /// + /// The header contains `Message-ID` of the original message + /// superseded by this one. + /// + /// The header is specified + /// in . + pub(crate) supersedes: Option, + pub(crate) is_dc_message: MessengerMessage, pub(crate) mime_modified: bool, pub(crate) chat_blocked: Blocked, @@ -530,6 +542,7 @@ impl Message { download_state: row.get("download_state")?, error: Some(row.get::<_, String>("error")?) .filter(|error| !error.is_empty()), + supersedes: None, // `Supersedes` header is only used for sending and not stored in the database. is_dc_message: row.get("msgrmsg")?, mime_modified: row.get("mime_modified")?, text, diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 724340348..6c0394311 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -549,6 +549,11 @@ impl<'a> MimeFactory<'a> { Loaded::Mdn { .. } => create_outgoing_rfc724_mid(None, &self.from_addr), }; let rfc724_mid_headervalue = render_rfc724_mid(&rfc724_mid); + if let Some(supersedes) = &self.msg.supersedes { + headers + .protected + .push(Header::new("Supersedes".into(), supersedes.to_string())); + } // Amazon's SMTP servers change the `Message-ID`, just as Outlook's SMTP servers do. // Outlook's servers add an `X-Microsoft-Original-Message-ID` header with the original `Message-ID`, @@ -1248,7 +1253,7 @@ impl<'a> MimeFactory<'a> { } else if command == SystemMessage::WebxdcStatusUpdate { let json = self.msg.param.get(Param::Arg).unwrap_or_default(); parts.push(context.build_status_update_part(json)); - } else if self.msg.viewtype == Viewtype::Webxdc { + } else if self.msg.viewtype == Viewtype::Webxdc && self.msg.supersedes.is_none() { if let Some(json) = context .render_webxdc_status_update_object(self.msg.id, None) .await? diff --git a/src/receive_imf.rs b/src/receive_imf.rs index b575529d1..a5bbc4aaa 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -62,6 +62,10 @@ pub struct ReceivedMsg { /// Whether IMAP messages should be immediately deleted. pub needs_delete_job: bool, + + /// Message-ID saved into the database. + /// For messages with `Supersedes` header this is the Message-ID of the original message. + pub rfc724_mid: String, } /// Emulates reception of a message from the network. @@ -134,6 +138,7 @@ pub(crate) async fn receive_imf_inner( sort_timestamp: 0, msg_ids, needs_delete_job: false, + rfc724_mid: rfc724_mid.to_string(), })); } Ok(mime_parser) => mime_parser, @@ -338,12 +343,12 @@ pub(crate) async fn receive_imf_inner( .sql .execute( "UPDATE imap SET target=? WHERE rfc724_mid=?", - (target, rfc724_mid), + (target, &received_msg.rfc724_mid), ) .await?; } else if !mime_parser.mdn_reports.is_empty() && mime_parser.has_chat_version() { // This is a Delta Chat MDN. Mark as read. - markseen_on_imap_table(context, rfc724_mid).await?; + markseen_on_imap_table(context, &received_msg.rfc724_mid).await?; } } @@ -1050,6 +1055,43 @@ async fn add_parts( .cloned() .unwrap_or_default(); + let rfc724_mid = if let Some(supersedes) = mime_parser.get_header(HeaderDef::Supersedes) { + supersedes.to_string() + } else { + rfc724_mid.to_string() + }; + + let supersedes_msg_id = match mime_parser.get_header(HeaderDef::Supersedes) { + Some(supersedes) => { + if let Some(msg_id) = rfc724_mid_exists(context, supersedes).await? { + if let Some(orig_from_id) = context + .sql + .query_row_optional("SELECT from_id FROM msgs WHERE id=?", (msg_id,), |row| { + let from_id: ContactId = row.get(0)?; + + Ok(from_id) + }) + .await? + { + if from_id == orig_from_id { + Some(msg_id) + } else { + None + } + } else { + None + } + } else { + None + } + } + None => None, + }; + if supersedes_msg_id.is_some() { + replace_msg_id = supersedes_msg_id; + info!(context, "Superseding {supersedes_msg_id:?}"); + } + // fine, so far. now, split the message into simple parts usable as "short messages" // and add them to the database (mails sent by other messenger clients should result // into only one message; mails sent by other clients may result in several messages @@ -1301,6 +1343,7 @@ RETURNING id sort_timestamp, msg_ids: created_db_entries, needs_delete_job, + rfc724_mid, }) } diff --git a/src/webxdc.rs b/src/webxdc.rs index 4697f025b..af3604a89 100644 --- a/src/webxdc.rs +++ b/src/webxdc.rs @@ -26,7 +26,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::io::AsyncReadExt; -use crate::chat::Chat; +use crate::blob::BlobObject; +use crate::chat::{self, create_send_msg_job, Chat}; use crate::constants::Chattype; use crate::contact::ContactId; use crate::context::Context; @@ -34,12 +35,12 @@ use crate::download::DownloadState; use crate::message::{Message, MessageState, MsgId, Viewtype}; use crate::mimefactory::wrapped_base64_encode; use crate::mimeparser::SystemMessage; -use crate::param::Param; -use crate::param::Params; +use crate::param::{Param, Params}; use crate::scheduler::InterruptInfo; -use crate::tools::strip_rtlo_characters; -use crate::tools::{create_smeared_timestamp, get_abs_path}; -use crate::{chat, EventType}; +use crate::tools::{ + create_outgoing_rfc724_mid, create_smeared_timestamp, get_abs_path, strip_rtlo_characters, +}; +use crate::EventType; /// The current API version. /// If `min_api` in manifest.toml is set to a larger value, @@ -845,6 +846,77 @@ impl Message { } } +/// Sends a replacement for an own WebXDC message. +pub async fn send_webxdc_replacement( + context: &Context, + msg_id: MsgId, + filename: &str, +) -> Result<()> { + let mut msg = Message::load_from_db(context, msg_id).await?; + + ensure!( + msg.from_id == ContactId::SELF, + "Can update WebXDC only in own messages" + ); + ensure!( + msg.get_viewtype() == Viewtype::Webxdc, + "Message {msg_id} is not a WebXDC instance" + ); + let state = msg.get_state(); + match state { + MessageState::OutFailed | MessageState::OutDelivered | MessageState::OutMdnRcvd => {} + MessageState::Undefined + | MessageState::InFresh + | MessageState::InNoticed + | MessageState::InSeen + | MessageState::OutPreparing + | MessageState::OutPending + | MessageState::OutDraft => bail!("Unexpected message state: {state}"), + } + + let chat = Chat::load_from_db(context, msg.chat_id).await?; + + let mut param = msg.param.clone(); + if !chat.is_protected() { + param.remove(Param::GuaranteeE2ee); + } + let blob = BlobObject::new_from_path(context, Path::new(filename)) + .await + .context("Failed to create webxdc replacement blob")?; + param.set(Param::File, blob.as_name()); + msg.param = param; + + // Generate new Message-ID. + context + .sql + .execute( + "UPDATE msgs + SET state=?, param=? + WHERE id=?", + (MessageState::OutPending, msg.param.to_string(), msg_id), + ) + .await?; + + msg.supersedes = Some(msg.rfc724_mid); + msg.rfc724_mid = { + let grpid = match chat.typ { + Chattype::Group => Some(chat.grpid.as_str()), + _ => None, + }; + let from = context.get_primary_self_addr().await?; + create_outgoing_rfc724_mid(grpid, &from) + }; + + if create_send_msg_job(context, &mut msg).await?.is_some() { + context + .scheduler + .interrupt_smtp(InterruptInfo::new(false)) + .await; + } + + Ok(()) +} + #[cfg(test)] mod tests { use serde_json::json; @@ -2623,4 +2695,174 @@ sth_for_the = "future""# Ok(()) } + + /// Tests sending webxdc and replacing it with a newer version. + /// + /// Updates should be preserved after upgrading. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_replace_webxdc() -> Result<()> { + let alice = TestContext::new_alice().await; + let bob = TestContext::new_bob().await; + + // Alice sends WebXDC instance. + let alice_chat = alice.create_chat(&bob).await; + let mut alice_instance = create_webxdc_instance( + &alice, + "minimal.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + ) + .await?; + alice_instance.set_text("user added text".to_string()); + send_msg(&alice, alice_chat.id, &mut alice_instance).await?; + let alice_instance = alice.get_last_msg().await; + assert_eq!(alice_instance.get_text(), "user added text"); + let original_rfc724_mid = alice_instance.rfc724_mid; + + // Bob receives that instance. + let alice_sent_instance = alice.pop_sent_msg().await; + let bob_received_instance = bob.recv_msg(&alice_sent_instance).await; + assert_eq!(bob_received_instance.get_text(), "user added text"); + + // Alice sends WebXDC update. + alice + .send_webxdc_status_update(alice_instance.id, r#"{"payload": 1}"#, "Alice update") + .await?; + alice.flush_status_updates().await?; + let alice_sent_update = alice.pop_sent_msg().await; + bob.recv_msg(&alice_sent_update).await; + assert_eq!( + bob.get_webxdc_status_updates(bob_received_instance.id, StatusUpdateSerial(0)) + .await?, + r#"[{"payload":1,"serial":1,"max_serial":1}]"# + ); + + // Alice sends WebXDC instance replacement. + send_webxdc_replacement( + &alice, + alice_instance.id, + "test-data/webxdc/with-minimal-manifest.xdc", + ) + .await + .context("Failed to send WebXDC replacement")?; + let alice_replacement_instance = alice.get_last_msg().await; + let alice_replacement_info = alice_replacement_instance.get_webxdc_info(&alice).await?; + assert_eq!(alice_replacement_info.name, "nice app!"); + assert_eq!(alice_instance.id, alice_replacement_instance.id); + let alice_sent_replacement_instance = alice.pop_sent_msg().await; + assert!(alice_sent_replacement_instance + .payload + .contains(&format!("Supersedes: {original_rfc724_mid}"))); + assert_eq!(alice_replacement_instance.rfc724_mid, original_rfc724_mid); + + // Bob receives WebXDC instance replacement. + let bob_received_replacement_instance = + bob.recv_msg(&alice_sent_replacement_instance).await; + assert_eq!( + bob_received_instance.id, + bob_received_replacement_instance.id + ); + assert_eq!( + bob_received_replacement_instance.rfc724_mid, + original_rfc724_mid + ); + let bob_received_replacement_info = bob_received_replacement_instance + .get_webxdc_info(&bob) + .await?; + assert_eq!(bob_received_replacement_info.name, "nice app!"); + + // Updates are not modified. + assert_eq!( + bob.get_webxdc_status_updates( + bob_received_replacement_instance.id, + StatusUpdateSerial(0) + ) + .await?, + r#"[{"payload":1,"serial":1,"max_serial":1}]"# + ); + + // Bob is not allowed to replace the instance. + assert!(send_webxdc_replacement( + &bob, + bob_received_instance.id, + "test-data/webxdc/minimal.xdc" + ) + .await + .is_err()); + + // Alice sends a second WebXDC instance replacement. + send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc") + .await + .context("Failed to send second WebXDC replacement")?; + let alice_second_sent_replacement_instance = alice.pop_sent_msg().await; + let bob_received_second_replacement_instance = + bob.recv_msg(&alice_second_sent_replacement_instance).await; + assert_eq!( + bob_received_instance.id, + bob_received_second_replacement_instance.id + ); + assert_eq!( + bob_received_second_replacement_instance.rfc724_mid, + original_rfc724_mid + ); + + Ok(()) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn test_replace_webxdc_missing_original() -> Result<()> { + let alice = TestContext::new_alice().await; + let bob = TestContext::new_bob().await; + + // Alice sends WebXDC instance. + let alice_chat = alice.create_chat(&bob).await; + let mut alice_instance = create_webxdc_instance( + &alice, + "minimal.xdc", + include_bytes!("../test-data/webxdc/minimal.xdc"), + ) + .await?; + alice_instance.set_text("user added text".to_string()); + send_msg(&alice, alice_chat.id, &mut alice_instance).await?; + alice.pop_sent_msg().await; + let alice_instance = alice.get_last_msg().await; + assert_eq!(alice_instance.get_text(), "user added text"); + let original_rfc724_mid = alice_instance.rfc724_mid; + + // Bob missed the original instance message. + + // Alice sends WebXDC instance replacement. + send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc") + .await + .context("Failed to send WebXDC replacement")?; + let alice_sent_replacement_instance = alice.pop_sent_msg().await; + assert!(alice_sent_replacement_instance + .payload + .contains(&format!("Supersedes: {original_rfc724_mid}"))); + + // Bob receives WebXDC instance replacement. + let bob_received_replacement_instance = + bob.recv_msg(&alice_sent_replacement_instance).await; + assert_eq!( + bob_received_replacement_instance.rfc724_mid, + original_rfc724_mid + ); + + // Alice sends a second WebXDC instance replacement. + send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc") + .await + .context("Failed to send second WebXDC replacement")?; + let alice_second_sent_replacement_instance = alice.pop_sent_msg().await; + let bob_received_second_replacement_instance = + bob.recv_msg(&alice_second_sent_replacement_instance).await; + assert_eq!( + bob_received_replacement_instance.id, + bob_received_second_replacement_instance.id + ); + assert_eq!( + bob_received_second_replacement_instance.rfc724_mid, + original_rfc724_mid + ); + + Ok(()) + } }