mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
1 Commits
v1.148.3
...
link2xt/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
22aa1b7b12 |
@@ -77,6 +77,9 @@ pub enum HeaderDef {
|
||||
SecureJoinAuth,
|
||||
Sender,
|
||||
|
||||
/// [`Supersedes`](https://www.rfc-editor.org/rfc/rfc4021.html#section-2.1.46) header.
|
||||
Supersedes,
|
||||
|
||||
/// Ephemeral message timer.
|
||||
EphemeralTimer,
|
||||
Received,
|
||||
|
||||
@@ -435,6 +435,18 @@ pub struct Message {
|
||||
|
||||
/// `In-Reply-To` header value.
|
||||
pub(crate) in_reply_to: Option<String>,
|
||||
|
||||
/// `Supersedes` header value.
|
||||
///
|
||||
/// It is only used for sending and not stored in the database.
|
||||
///
|
||||
/// The header contains `Message-ID` of the original message
|
||||
/// superseded by this one.
|
||||
///
|
||||
/// The header is specified
|
||||
/// in <https://www.rfc-editor.org/rfc/rfc4021.html#section-2.1.46>.
|
||||
pub(crate) supersedes: Option<String>,
|
||||
|
||||
pub(crate) is_dc_message: MessengerMessage,
|
||||
pub(crate) mime_modified: bool,
|
||||
pub(crate) chat_blocked: Blocked,
|
||||
@@ -530,6 +542,7 @@ impl Message {
|
||||
download_state: row.get("download_state")?,
|
||||
error: Some(row.get::<_, String>("error")?)
|
||||
.filter(|error| !error.is_empty()),
|
||||
supersedes: None, // `Supersedes` header is only used for sending and not stored in the database.
|
||||
is_dc_message: row.get("msgrmsg")?,
|
||||
mime_modified: row.get("mime_modified")?,
|
||||
text,
|
||||
|
||||
@@ -549,6 +549,11 @@ impl<'a> MimeFactory<'a> {
|
||||
Loaded::Mdn { .. } => create_outgoing_rfc724_mid(None, &self.from_addr),
|
||||
};
|
||||
let rfc724_mid_headervalue = render_rfc724_mid(&rfc724_mid);
|
||||
if let Some(supersedes) = &self.msg.supersedes {
|
||||
headers
|
||||
.protected
|
||||
.push(Header::new("Supersedes".into(), supersedes.to_string()));
|
||||
}
|
||||
|
||||
// Amazon's SMTP servers change the `Message-ID`, just as Outlook's SMTP servers do.
|
||||
// Outlook's servers add an `X-Microsoft-Original-Message-ID` header with the original `Message-ID`,
|
||||
@@ -1248,7 +1253,7 @@ impl<'a> MimeFactory<'a> {
|
||||
} else if command == SystemMessage::WebxdcStatusUpdate {
|
||||
let json = self.msg.param.get(Param::Arg).unwrap_or_default();
|
||||
parts.push(context.build_status_update_part(json));
|
||||
} else if self.msg.viewtype == Viewtype::Webxdc {
|
||||
} else if self.msg.viewtype == Viewtype::Webxdc && self.msg.supersedes.is_none() {
|
||||
if let Some(json) = context
|
||||
.render_webxdc_status_update_object(self.msg.id, None)
|
||||
.await?
|
||||
|
||||
@@ -62,6 +62,10 @@ pub struct ReceivedMsg {
|
||||
|
||||
/// Whether IMAP messages should be immediately deleted.
|
||||
pub needs_delete_job: bool,
|
||||
|
||||
/// Message-ID saved into the database.
|
||||
/// For messages with `Supersedes` header this is the Message-ID of the original message.
|
||||
pub rfc724_mid: String,
|
||||
}
|
||||
|
||||
/// Emulates reception of a message from the network.
|
||||
@@ -134,6 +138,7 @@ pub(crate) async fn receive_imf_inner(
|
||||
sort_timestamp: 0,
|
||||
msg_ids,
|
||||
needs_delete_job: false,
|
||||
rfc724_mid: rfc724_mid.to_string(),
|
||||
}));
|
||||
}
|
||||
Ok(mime_parser) => mime_parser,
|
||||
@@ -338,12 +343,12 @@ pub(crate) async fn receive_imf_inner(
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE imap SET target=? WHERE rfc724_mid=?",
|
||||
(target, rfc724_mid),
|
||||
(target, &received_msg.rfc724_mid),
|
||||
)
|
||||
.await?;
|
||||
} else if !mime_parser.mdn_reports.is_empty() && mime_parser.has_chat_version() {
|
||||
// This is a Delta Chat MDN. Mark as read.
|
||||
markseen_on_imap_table(context, rfc724_mid).await?;
|
||||
markseen_on_imap_table(context, &received_msg.rfc724_mid).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1050,6 +1055,43 @@ async fn add_parts(
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let rfc724_mid = if let Some(supersedes) = mime_parser.get_header(HeaderDef::Supersedes) {
|
||||
supersedes.to_string()
|
||||
} else {
|
||||
rfc724_mid.to_string()
|
||||
};
|
||||
|
||||
let supersedes_msg_id = match mime_parser.get_header(HeaderDef::Supersedes) {
|
||||
Some(supersedes) => {
|
||||
if let Some(msg_id) = rfc724_mid_exists(context, supersedes).await? {
|
||||
if let Some(orig_from_id) = context
|
||||
.sql
|
||||
.query_row_optional("SELECT from_id FROM msgs WHERE id=?", (msg_id,), |row| {
|
||||
let from_id: ContactId = row.get(0)?;
|
||||
|
||||
Ok(from_id)
|
||||
})
|
||||
.await?
|
||||
{
|
||||
if from_id == orig_from_id {
|
||||
Some(msg_id)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
if supersedes_msg_id.is_some() {
|
||||
replace_msg_id = supersedes_msg_id;
|
||||
info!(context, "Superseding {supersedes_msg_id:?}");
|
||||
}
|
||||
|
||||
// fine, so far. now, split the message into simple parts usable as "short messages"
|
||||
// and add them to the database (mails sent by other messenger clients should result
|
||||
// into only one message; mails sent by other clients may result in several messages
|
||||
@@ -1301,6 +1343,7 @@ RETURNING id
|
||||
sort_timestamp,
|
||||
msg_ids: created_db_entries,
|
||||
needs_delete_job,
|
||||
rfc724_mid,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
254
src/webxdc.rs
254
src/webxdc.rs
@@ -26,7 +26,8 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use crate::chat::Chat;
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chat::{self, create_send_msg_job, Chat};
|
||||
use crate::constants::Chattype;
|
||||
use crate::contact::ContactId;
|
||||
use crate::context::Context;
|
||||
@@ -34,12 +35,12 @@ use crate::download::DownloadState;
|
||||
use crate::message::{Message, MessageState, MsgId, Viewtype};
|
||||
use crate::mimefactory::wrapped_base64_encode;
|
||||
use crate::mimeparser::SystemMessage;
|
||||
use crate::param::Param;
|
||||
use crate::param::Params;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::scheduler::InterruptInfo;
|
||||
use crate::tools::strip_rtlo_characters;
|
||||
use crate::tools::{create_smeared_timestamp, get_abs_path};
|
||||
use crate::{chat, EventType};
|
||||
use crate::tools::{
|
||||
create_outgoing_rfc724_mid, create_smeared_timestamp, get_abs_path, strip_rtlo_characters,
|
||||
};
|
||||
use crate::EventType;
|
||||
|
||||
/// The current API version.
|
||||
/// If `min_api` in manifest.toml is set to a larger value,
|
||||
@@ -845,6 +846,77 @@ impl Message {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a replacement for an own WebXDC message.
|
||||
pub async fn send_webxdc_replacement(
|
||||
context: &Context,
|
||||
msg_id: MsgId,
|
||||
filename: &str,
|
||||
) -> Result<()> {
|
||||
let mut msg = Message::load_from_db(context, msg_id).await?;
|
||||
|
||||
ensure!(
|
||||
msg.from_id == ContactId::SELF,
|
||||
"Can update WebXDC only in own messages"
|
||||
);
|
||||
ensure!(
|
||||
msg.get_viewtype() == Viewtype::Webxdc,
|
||||
"Message {msg_id} is not a WebXDC instance"
|
||||
);
|
||||
let state = msg.get_state();
|
||||
match state {
|
||||
MessageState::OutFailed | MessageState::OutDelivered | MessageState::OutMdnRcvd => {}
|
||||
MessageState::Undefined
|
||||
| MessageState::InFresh
|
||||
| MessageState::InNoticed
|
||||
| MessageState::InSeen
|
||||
| MessageState::OutPreparing
|
||||
| MessageState::OutPending
|
||||
| MessageState::OutDraft => bail!("Unexpected message state: {state}"),
|
||||
}
|
||||
|
||||
let chat = Chat::load_from_db(context, msg.chat_id).await?;
|
||||
|
||||
let mut param = msg.param.clone();
|
||||
if !chat.is_protected() {
|
||||
param.remove(Param::GuaranteeE2ee);
|
||||
}
|
||||
let blob = BlobObject::new_from_path(context, Path::new(filename))
|
||||
.await
|
||||
.context("Failed to create webxdc replacement blob")?;
|
||||
param.set(Param::File, blob.as_name());
|
||||
msg.param = param;
|
||||
|
||||
// Generate new Message-ID.
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE msgs
|
||||
SET state=?, param=?
|
||||
WHERE id=?",
|
||||
(MessageState::OutPending, msg.param.to_string(), msg_id),
|
||||
)
|
||||
.await?;
|
||||
|
||||
msg.supersedes = Some(msg.rfc724_mid);
|
||||
msg.rfc724_mid = {
|
||||
let grpid = match chat.typ {
|
||||
Chattype::Group => Some(chat.grpid.as_str()),
|
||||
_ => None,
|
||||
};
|
||||
let from = context.get_primary_self_addr().await?;
|
||||
create_outgoing_rfc724_mid(grpid, &from)
|
||||
};
|
||||
|
||||
if create_send_msg_job(context, &mut msg).await?.is_some() {
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use serde_json::json;
|
||||
@@ -2623,4 +2695,174 @@ sth_for_the = "future""#
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tests sending webxdc and replacing it with a newer version.
|
||||
///
|
||||
/// Updates should be preserved after upgrading.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_replace_webxdc() -> Result<()> {
|
||||
let alice = TestContext::new_alice().await;
|
||||
let bob = TestContext::new_bob().await;
|
||||
|
||||
// Alice sends WebXDC instance.
|
||||
let alice_chat = alice.create_chat(&bob).await;
|
||||
let mut alice_instance = create_webxdc_instance(
|
||||
&alice,
|
||||
"minimal.xdc",
|
||||
include_bytes!("../test-data/webxdc/minimal.xdc"),
|
||||
)
|
||||
.await?;
|
||||
alice_instance.set_text("user added text".to_string());
|
||||
send_msg(&alice, alice_chat.id, &mut alice_instance).await?;
|
||||
let alice_instance = alice.get_last_msg().await;
|
||||
assert_eq!(alice_instance.get_text(), "user added text");
|
||||
let original_rfc724_mid = alice_instance.rfc724_mid;
|
||||
|
||||
// Bob receives that instance.
|
||||
let alice_sent_instance = alice.pop_sent_msg().await;
|
||||
let bob_received_instance = bob.recv_msg(&alice_sent_instance).await;
|
||||
assert_eq!(bob_received_instance.get_text(), "user added text");
|
||||
|
||||
// Alice sends WebXDC update.
|
||||
alice
|
||||
.send_webxdc_status_update(alice_instance.id, r#"{"payload": 1}"#, "Alice update")
|
||||
.await?;
|
||||
alice.flush_status_updates().await?;
|
||||
let alice_sent_update = alice.pop_sent_msg().await;
|
||||
bob.recv_msg(&alice_sent_update).await;
|
||||
assert_eq!(
|
||||
bob.get_webxdc_status_updates(bob_received_instance.id, StatusUpdateSerial(0))
|
||||
.await?,
|
||||
r#"[{"payload":1,"serial":1,"max_serial":1}]"#
|
||||
);
|
||||
|
||||
// Alice sends WebXDC instance replacement.
|
||||
send_webxdc_replacement(
|
||||
&alice,
|
||||
alice_instance.id,
|
||||
"test-data/webxdc/with-minimal-manifest.xdc",
|
||||
)
|
||||
.await
|
||||
.context("Failed to send WebXDC replacement")?;
|
||||
let alice_replacement_instance = alice.get_last_msg().await;
|
||||
let alice_replacement_info = alice_replacement_instance.get_webxdc_info(&alice).await?;
|
||||
assert_eq!(alice_replacement_info.name, "nice app!");
|
||||
assert_eq!(alice_instance.id, alice_replacement_instance.id);
|
||||
let alice_sent_replacement_instance = alice.pop_sent_msg().await;
|
||||
assert!(alice_sent_replacement_instance
|
||||
.payload
|
||||
.contains(&format!("Supersedes: {original_rfc724_mid}")));
|
||||
assert_eq!(alice_replacement_instance.rfc724_mid, original_rfc724_mid);
|
||||
|
||||
// Bob receives WebXDC instance replacement.
|
||||
let bob_received_replacement_instance =
|
||||
bob.recv_msg(&alice_sent_replacement_instance).await;
|
||||
assert_eq!(
|
||||
bob_received_instance.id,
|
||||
bob_received_replacement_instance.id
|
||||
);
|
||||
assert_eq!(
|
||||
bob_received_replacement_instance.rfc724_mid,
|
||||
original_rfc724_mid
|
||||
);
|
||||
let bob_received_replacement_info = bob_received_replacement_instance
|
||||
.get_webxdc_info(&bob)
|
||||
.await?;
|
||||
assert_eq!(bob_received_replacement_info.name, "nice app!");
|
||||
|
||||
// Updates are not modified.
|
||||
assert_eq!(
|
||||
bob.get_webxdc_status_updates(
|
||||
bob_received_replacement_instance.id,
|
||||
StatusUpdateSerial(0)
|
||||
)
|
||||
.await?,
|
||||
r#"[{"payload":1,"serial":1,"max_serial":1}]"#
|
||||
);
|
||||
|
||||
// Bob is not allowed to replace the instance.
|
||||
assert!(send_webxdc_replacement(
|
||||
&bob,
|
||||
bob_received_instance.id,
|
||||
"test-data/webxdc/minimal.xdc"
|
||||
)
|
||||
.await
|
||||
.is_err());
|
||||
|
||||
// Alice sends a second WebXDC instance replacement.
|
||||
send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc")
|
||||
.await
|
||||
.context("Failed to send second WebXDC replacement")?;
|
||||
let alice_second_sent_replacement_instance = alice.pop_sent_msg().await;
|
||||
let bob_received_second_replacement_instance =
|
||||
bob.recv_msg(&alice_second_sent_replacement_instance).await;
|
||||
assert_eq!(
|
||||
bob_received_instance.id,
|
||||
bob_received_second_replacement_instance.id
|
||||
);
|
||||
assert_eq!(
|
||||
bob_received_second_replacement_instance.rfc724_mid,
|
||||
original_rfc724_mid
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_replace_webxdc_missing_original() -> Result<()> {
|
||||
let alice = TestContext::new_alice().await;
|
||||
let bob = TestContext::new_bob().await;
|
||||
|
||||
// Alice sends WebXDC instance.
|
||||
let alice_chat = alice.create_chat(&bob).await;
|
||||
let mut alice_instance = create_webxdc_instance(
|
||||
&alice,
|
||||
"minimal.xdc",
|
||||
include_bytes!("../test-data/webxdc/minimal.xdc"),
|
||||
)
|
||||
.await?;
|
||||
alice_instance.set_text("user added text".to_string());
|
||||
send_msg(&alice, alice_chat.id, &mut alice_instance).await?;
|
||||
alice.pop_sent_msg().await;
|
||||
let alice_instance = alice.get_last_msg().await;
|
||||
assert_eq!(alice_instance.get_text(), "user added text");
|
||||
let original_rfc724_mid = alice_instance.rfc724_mid;
|
||||
|
||||
// Bob missed the original instance message.
|
||||
|
||||
// Alice sends WebXDC instance replacement.
|
||||
send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc")
|
||||
.await
|
||||
.context("Failed to send WebXDC replacement")?;
|
||||
let alice_sent_replacement_instance = alice.pop_sent_msg().await;
|
||||
assert!(alice_sent_replacement_instance
|
||||
.payload
|
||||
.contains(&format!("Supersedes: {original_rfc724_mid}")));
|
||||
|
||||
// Bob receives WebXDC instance replacement.
|
||||
let bob_received_replacement_instance =
|
||||
bob.recv_msg(&alice_sent_replacement_instance).await;
|
||||
assert_eq!(
|
||||
bob_received_replacement_instance.rfc724_mid,
|
||||
original_rfc724_mid
|
||||
);
|
||||
|
||||
// Alice sends a second WebXDC instance replacement.
|
||||
send_webxdc_replacement(&alice, alice_instance.id, "test-data/webxdc/minimal.xdc")
|
||||
.await
|
||||
.context("Failed to send second WebXDC replacement")?;
|
||||
let alice_second_sent_replacement_instance = alice.pop_sent_msg().await;
|
||||
let bob_received_second_replacement_instance =
|
||||
bob.recv_msg(&alice_second_sent_replacement_instance).await;
|
||||
assert_eq!(
|
||||
bob_received_replacement_instance.id,
|
||||
bob_received_second_replacement_instance.id
|
||||
);
|
||||
assert_eq!(
|
||||
bob_received_second_replacement_instance.rfc724_mid,
|
||||
original_rfc724_mid
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user