Compare commits

...

1 Commits

Author SHA1 Message Date
link2xt
22aa1b7b12 feat: add send_webxdc_replacement() 2023-09-06 20:34:14 +00:00
5 changed files with 315 additions and 9 deletions

View File

@@ -77,6 +77,9 @@ pub enum HeaderDef {
SecureJoinAuth,
Sender,
/// [`Supersedes`](https://www.rfc-editor.org/rfc/rfc4021.html#section-2.1.46) header.
Supersedes,
/// Ephemeral message timer.
EphemeralTimer,
Received,

View File

@@ -435,6 +435,18 @@ pub struct Message {
/// `In-Reply-To` header value.
pub(crate) in_reply_to: Option<String>,
/// `Supersedes` header value.
///
/// It is only used for sending and not stored in the database.
///
/// The header contains `Message-ID` of the original message
/// superseded by this one.
///
/// The header is specified
/// in <https://www.rfc-editor.org/rfc/rfc4021.html#section-2.1.46>.
pub(crate) supersedes: Option<String>,
pub(crate) is_dc_message: MessengerMessage,
pub(crate) mime_modified: bool,
pub(crate) chat_blocked: Blocked,
@@ -530,6 +542,7 @@ impl Message {
download_state: row.get("download_state")?,
error: Some(row.get::<_, String>("error")?)
.filter(|error| !error.is_empty()),
supersedes: None, // `Supersedes` header is only used for sending and not stored in the database.
is_dc_message: row.get("msgrmsg")?,
mime_modified: row.get("mime_modified")?,
text,

View File

@@ -549,6 +549,11 @@ impl<'a> MimeFactory<'a> {
Loaded::Mdn { .. } => create_outgoing_rfc724_mid(None, &self.from_addr),
};
let rfc724_mid_headervalue = render_rfc724_mid(&rfc724_mid);
if let Some(supersedes) = &self.msg.supersedes {
headers
.protected
.push(Header::new("Supersedes".into(), supersedes.to_string()));
}
// Amazon's SMTP servers change the `Message-ID`, just as Outlook's SMTP servers do.
// Outlook's servers add an `X-Microsoft-Original-Message-ID` header with the original `Message-ID`,
@@ -1248,7 +1253,7 @@ impl<'a> MimeFactory<'a> {
} else if command == SystemMessage::WebxdcStatusUpdate {
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
parts.push(context.build_status_update_part(json));
} else if self.msg.viewtype == Viewtype::Webxdc {
} else if self.msg.viewtype == Viewtype::Webxdc && self.msg.supersedes.is_none() {
if let Some(json) = context
.render_webxdc_status_update_object(self.msg.id, None)
.await?

View File

@@ -62,6 +62,10 @@ pub struct ReceivedMsg {
/// Whether IMAP messages should be immediately deleted.
pub needs_delete_job: bool,
/// Message-ID saved into the database.
/// For messages with `Supersedes` header this is the Message-ID of the original message.
pub rfc724_mid: String,
}
/// Emulates reception of a message from the network.
@@ -134,6 +138,7 @@ pub(crate) async fn receive_imf_inner(
sort_timestamp: 0,
msg_ids,
needs_delete_job: false,
rfc724_mid: rfc724_mid.to_string(),
}));
}
Ok(mime_parser) => mime_parser,
@@ -338,12 +343,12 @@ pub(crate) async fn receive_imf_inner(
.sql
.execute(
"UPDATE imap SET target=? WHERE rfc724_mid=?",
(target, rfc724_mid),
(target, &received_msg.rfc724_mid),
)
.await?;
} else if !mime_parser.mdn_reports.is_empty() && mime_parser.has_chat_version() {
// This is a Delta Chat MDN. Mark as read.
markseen_on_imap_table(context, rfc724_mid).await?;
markseen_on_imap_table(context, &received_msg.rfc724_mid).await?;
}
}
@@ -1050,6 +1055,43 @@ async fn add_parts(
.cloned()
.unwrap_or_default();
let rfc724_mid = if let Some(supersedes) = mime_parser.get_header(HeaderDef::Supersedes) {
supersedes.to_string()
} else {
rfc724_mid.to_string()
};
let supersedes_msg_id = match mime_parser.get_header(HeaderDef::Supersedes) {
Some(supersedes) => {
if let Some(msg_id) = rfc724_mid_exists(context, supersedes).await? {
if let Some(orig_from_id) = context
.sql
.query_row_optional("SELECT from_id FROM msgs WHERE id=?", (msg_id,), |row| {
let from_id: ContactId = row.get(0)?;
Ok(from_id)
})
.await?
{
if from_id == orig_from_id {
Some(msg_id)
} else {
None
}
} else {
None
}
} else {
None
}
}
None => None,
};
if supersedes_msg_id.is_some() {
replace_msg_id = supersedes_msg_id;
info!(context, "Superseding {supersedes_msg_id:?}");
}
// fine, so far. now, split the message into simple parts usable as "short messages"
// and add them to the database (mails sent by other messenger clients should result
// into only one message; mails sent by other clients may result in several messages
@@ -1301,6 +1343,7 @@ RETURNING id
sort_timestamp,
msg_ids: created_db_entries,
needs_delete_job,
rfc724_mid,
})
}

View File

@@ -26,7 +26,8 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::io::AsyncReadExt;
use crate::chat::Chat;
use crate::blob::BlobObject;
use crate::chat::{self, create_send_msg_job, Chat};
use crate::constants::Chattype;
use crate::contact::ContactId;
use crate::context::Context;
@@ -34,12 +35,12 @@ use crate::download::DownloadState;
use crate::message::{Message, MessageState, MsgId, Viewtype};
use crate::mimefactory::wrapped_base64_encode;
use crate::mimeparser::SystemMessage;
use crate::param::Param;
use crate::param::Params;
use crate::param::{Param, Params};
use crate::scheduler::InterruptInfo;
use crate::tools::strip_rtlo_characters;
use crate::tools::{create_smeared_timestamp, get_abs_path};
use crate::{chat, EventType};
use crate::tools::{
create_outgoing_rfc724_mid, create_smeared_timestamp, get_abs_path, strip_rtlo_characters,
};
use crate::EventType;
/// The current API version.
/// If `min_api` in manifest.toml is set to a larger value,
@@ -845,6 +846,77 @@ impl Message {
}
}
/// Sends a replacement for an own WebXDC message.
pub async fn send_webxdc_replacement(
context: &Context,
msg_id: MsgId,
filename: &str,
) -> Result<()> {
let mut msg = Message::load_from_db(context, msg_id).await?;
ensure!(
msg.from_id == ContactId::SELF,
"Can update WebXDC only in own messages"
);
ensure!(
msg.get_viewtype() == Viewtype::Webxdc,
"Message {msg_id} is not a WebXDC instance"
);
let state = msg.get_state();
match state {
MessageState::OutFailed | MessageState::OutDelivered | MessageState::OutMdnRcvd => {}
MessageState::Undefined
| MessageState::InFresh
| MessageState::InNoticed
| MessageState::InSeen
| MessageState::OutPreparing
| MessageState::OutPending
| MessageState::OutDraft => bail!("Unexpected message state: {state}"),
}
let chat = Chat::load_from_db(context, msg.chat_id).await?;
let mut param = msg.param.clone();
if !chat.is_protected() {
param.remove(Param::GuaranteeE2ee);
}
let blob = BlobObject::new_from_path(context, Path::new(filename))
.await
.context("Failed to create webxdc replacement blob")?;
param.set(Param::File, blob.as_name());
msg.param = param;
// Generate new Message-ID.
context
.sql
.execute(
"UPDATE msgs
SET state=?, param=?
WHERE id=?",
(MessageState::OutPending, msg.param.to_string(), msg_id),
)
.await?;
msg.supersedes = Some(msg.rfc724_mid);
msg.rfc724_mid = {
let grpid = match chat.typ {
Chattype::Group => Some(chat.grpid.as_str()),
_ => None,
};
let from = context.get_primary_self_addr().await?;
create_outgoing_rfc724_mid(grpid, &from)
};
if create_send_msg_job(context, &mut msg).await?.is_some() {
context
.scheduler
.interrupt_smtp(InterruptInfo::new(false))
.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use serde_json::json;
@@ -2623,4 +2695,174 @@ sth_for_the = "future""#
Ok(())
}
/// Tests sending webxdc and replacing it with a newer version.
///
/// Updates should be preserved after upgrading.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_replace_webxdc() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
// Alice sends WebXDC instance.
let alice_chat = alice.create_chat(&bob).await;
let mut alice_instance = create_webxdc_instance(
&alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
)
.await?;
alice_instance.set_text("user added text".to_string());
send_msg(&alice, alice_chat.id, &mut alice_instance).await?;
let alice_instance = alice.get_last_msg().await;
assert_eq!(alice_instance.get_text(), "user added text");
let original_rfc724_mid = alice_instance.rfc724_mid;
// Bob receives that instance.
let alice_sent_instance = alice.pop_sent_msg().await;
let bob_received_instance = bob.recv_msg(&alice_sent_instance).await;
assert_eq!(bob_received_instance.get_text(), "user added text");
// Alice sends WebXDC update.
alice
.send_webxdc_status_update(alice_instance.id, r#"{"payload": 1}"#, "Alice update")
.await?;
alice.flush_status_updates().await?;
let alice_sent_update = alice.pop_sent_msg().await;
bob.recv_msg(&alice_sent_update).await;
assert_eq!(
bob.get_webxdc_status_updates(bob_received_instance.id, StatusUpdateSerial(0))
.await?,
r#"[{"payload":1,"serial":1,"max_serial":1}]"#
);
// Alice sends WebXDC instance replacement.
send_webxdc_replacement(
&alice,
alice_instance.id,
"test-data/webxdc/with-minimal-manifest.xdc",
)
.await
.context("Failed to send WebXDC replacement")?;
let alice_replacement_instance = alice.get_last_msg().await;
let alice_replacement_info = alice_replacement_instance.get_webxdc_info(&alice).await?;
assert_eq!(alice_replacement_info.name, "nice app!");
assert_eq!(alice_instance.id, alice_replacement_instance.id);
let alice_sent_replacement_instance = alice.pop_sent_msg().await;
assert!(alice_sent_replacement_instance
.payload
.contains(&format!("Supersedes: {original_rfc724_mid}")));
assert_eq!(alice_replacement_instance.rfc724_mid, original_rfc724_mid);
// Bob receives WebXDC instance replacement.
let bob_received_replacement_instance =
bob.recv_msg(&alice_sent_replacement_instance).await;
assert_eq!(
bob_received_instance.id,
bob_received_replacement_instance.id
);
assert_eq!(
bob_received_replacement_instance.rfc724_mid,
original_rfc724_mid
);
let bob_received_replacement_info = bob_received_replacement_instance
.get_webxdc_info(&bob)
.await?;
assert_eq!(bob_received_replacement_info.name, "nice app!");
// Updates are not modified.
assert_eq!(
bob.get_webxdc_status_updates(
bob_received_replacement_instance.id,
StatusUpdateSerial(0)
)
.await?,
r#"[{"payload":1,"serial":1,"max_serial":1}]"#
);
// Bob is not allowed to replace the instance.
assert!(send_webxdc_replacement(
&bob,
bob_received_instance.id,
"test-data/webxdc/minimal.xdc"
)
.await
.is_err());
// Alice sends a second WebXDC instance replacement.
send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc")
.await
.context("Failed to send second WebXDC replacement")?;
let alice_second_sent_replacement_instance = alice.pop_sent_msg().await;
let bob_received_second_replacement_instance =
bob.recv_msg(&alice_second_sent_replacement_instance).await;
assert_eq!(
bob_received_instance.id,
bob_received_second_replacement_instance.id
);
assert_eq!(
bob_received_second_replacement_instance.rfc724_mid,
original_rfc724_mid
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_replace_webxdc_missing_original() -> Result<()> {
let alice = TestContext::new_alice().await;
let bob = TestContext::new_bob().await;
// Alice sends WebXDC instance.
let alice_chat = alice.create_chat(&bob).await;
let mut alice_instance = create_webxdc_instance(
&alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
)
.await?;
alice_instance.set_text("user added text".to_string());
send_msg(&alice, alice_chat.id, &mut alice_instance).await?;
alice.pop_sent_msg().await;
let alice_instance = alice.get_last_msg().await;
assert_eq!(alice_instance.get_text(), "user added text");
let original_rfc724_mid = alice_instance.rfc724_mid;
// Bob missed the original instance message.
// Alice sends WebXDC instance replacement.
send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc")
.await
.context("Failed to send WebXDC replacement")?;
let alice_sent_replacement_instance = alice.pop_sent_msg().await;
assert!(alice_sent_replacement_instance
.payload
.contains(&format!("Supersedes: {original_rfc724_mid}")));
// Bob receives WebXDC instance replacement.
let bob_received_replacement_instance =
bob.recv_msg(&alice_sent_replacement_instance).await;
assert_eq!(
bob_received_replacement_instance.rfc724_mid,
original_rfc724_mid
);
// Alice sends a second WebXDC instance replacement.
send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc")
.await
.context("Failed to send second WebXDC replacement")?;
let alice_second_sent_replacement_instance = alice.pop_sent_msg().await;
let bob_received_second_replacement_instance =
bob.recv_msg(&alice_second_sent_replacement_instance).await;
assert_eq!(
bob_received_replacement_instance.id,
bob_received_second_replacement_instance.id
);
assert_eq!(
bob_received_second_replacement_instance.rfc724_mid,
original_rfc724_mid
);
Ok(())
}
}