mirror of
https://github.com/chatmail/core.git
synced 2026-04-22 16:06:30 +03:00
Merge tag 'v1.121.0'
This commit is contained in:
378
src/chat.rs
378
src/chat.rs
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::aheader::EncryptPreference;
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chatlist::Chatlist;
|
||||
use crate::color::str_to_color;
|
||||
use crate::config::Config;
|
||||
use crate::constants::{
|
||||
@@ -885,6 +886,133 @@ impl ChatId {
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Returns timestamp of the latest message in the chat.
|
||||
pub(crate) async fn get_timestamp(self, context: &Context) -> Result<Option<i64>> {
|
||||
let timestamp = context
|
||||
.sql
|
||||
.query_get_value("SELECT MAX(timestamp) FROM msgs WHERE chat_id=?", (self,))
|
||||
.await?;
|
||||
Ok(timestamp)
|
||||
}
|
||||
|
||||
/// Returns a list of active similar chat IDs sorted by similarity metric.
|
||||
///
|
||||
/// Jaccard similarity coefficient is used to estimate similarity of chat member sets.
|
||||
///
|
||||
/// Chat is considered active if something was posted there within the last 42 days.
|
||||
pub async fn get_similar_chat_ids(self, context: &Context) -> Result<Vec<(ChatId, f64)>> {
|
||||
// Count number of common members in this and other chats.
|
||||
let intersection: Vec<(ChatId, f64)> = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT y.chat_id, SUM(x.contact_id = y.contact_id)
|
||||
FROM chats_contacts as x
|
||||
JOIN chats_contacts as y
|
||||
WHERE x.contact_id > 9
|
||||
AND y.contact_id > 9
|
||||
AND x.chat_id=?
|
||||
AND y.chat_id<>x.chat_id
|
||||
GROUP BY y.chat_id",
|
||||
(self,),
|
||||
|row| {
|
||||
let chat_id: ChatId = row.get(0)?;
|
||||
let intersection: f64 = row.get(1)?;
|
||||
Ok((chat_id, intersection))
|
||||
},
|
||||
|rows| {
|
||||
rows.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("failed to calculate member set intersections")?;
|
||||
|
||||
let chat_size: HashMap<ChatId, f64> = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT chat_id, count(*) AS n
|
||||
FROM chats_contacts where contact_id > 9
|
||||
GROUP BY chat_id",
|
||||
(),
|
||||
|row| {
|
||||
let chat_id: ChatId = row.get(0)?;
|
||||
let size: f64 = row.get(1)?;
|
||||
Ok((chat_id, size))
|
||||
},
|
||||
|rows| {
|
||||
rows.collect::<std::result::Result<HashMap<ChatId, f64>, _>>()
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("failed to count chat member sizes")?;
|
||||
|
||||
let our_chat_size = chat_size.get(&self).copied().unwrap_or_default();
|
||||
let mut chats_with_metrics = Vec::new();
|
||||
for (chat_id, intersection_size) in intersection {
|
||||
if intersection_size > 0.0 {
|
||||
let other_chat_size = chat_size.get(&chat_id).copied().unwrap_or_default();
|
||||
let union_size = our_chat_size + other_chat_size - intersection_size;
|
||||
let metric = intersection_size / union_size;
|
||||
chats_with_metrics.push((chat_id, metric))
|
||||
}
|
||||
}
|
||||
chats_with_metrics.sort_unstable_by(|(chat_id1, metric1), (chat_id2, metric2)| {
|
||||
metric2
|
||||
.partial_cmp(metric1)
|
||||
.unwrap_or(chat_id2.cmp(chat_id1))
|
||||
});
|
||||
|
||||
// Select up to five similar active chats.
|
||||
let mut res = Vec::new();
|
||||
let now = time();
|
||||
for (chat_id, metric) in chats_with_metrics {
|
||||
if let Some(chat_timestamp) = chat_id.get_timestamp(context).await? {
|
||||
if now > chat_timestamp + 42 * 24 * 3600 {
|
||||
// Chat was inactive for 42 days, skip.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if metric < 0.1 {
|
||||
// Chat is unrelated.
|
||||
break;
|
||||
}
|
||||
|
||||
let chat = Chat::load_from_db(context, chat_id).await?;
|
||||
if chat.typ != Chattype::Group {
|
||||
continue;
|
||||
}
|
||||
|
||||
match chat.visibility {
|
||||
ChatVisibility::Normal | ChatVisibility::Pinned => {}
|
||||
ChatVisibility::Archived => continue,
|
||||
}
|
||||
|
||||
res.push((chat_id, metric));
|
||||
if res.len() >= 5 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
/// Returns similar chats as a [`Chatlist`].
|
||||
///
|
||||
/// [`Chatlist`]: crate::chatlist::Chatlist
|
||||
pub async fn get_similar_chatlist(self, context: &Context) -> Result<Chatlist> {
|
||||
let chat_ids: Vec<ChatId> = self
|
||||
.get_similar_chat_ids(context)
|
||||
.await
|
||||
.context("failed to get similar chat IDs")?
|
||||
.into_iter()
|
||||
.map(|(chat_id, _metric)| chat_id)
|
||||
.collect();
|
||||
let chatlist = Chatlist::from_chat_ids(context, &chat_ids).await?;
|
||||
Ok(chatlist)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_param(self, context: &Context) -> Result<Params> {
|
||||
let res: Option<String> = context
|
||||
.sql
|
||||
@@ -1612,6 +1740,11 @@ impl Chat {
|
||||
None
|
||||
};
|
||||
|
||||
msg.chat_id = self.id;
|
||||
msg.from_id = ContactId::SELF;
|
||||
msg.rfc724_mid = new_rfc724_mid;
|
||||
msg.timestamp_sort = timestamp;
|
||||
|
||||
// add message to the database
|
||||
if let Some(update_msg_id) = update_msg_id {
|
||||
context
|
||||
@@ -1625,11 +1758,11 @@ impl Chat {
|
||||
ephemeral_timestamp=?
|
||||
WHERE id=?;",
|
||||
params_slice![
|
||||
new_rfc724_mid,
|
||||
self.id,
|
||||
ContactId::SELF,
|
||||
msg.rfc724_mid,
|
||||
msg.chat_id,
|
||||
msg.from_id,
|
||||
to_id,
|
||||
timestamp,
|
||||
msg.timestamp_sort,
|
||||
msg.viewtype,
|
||||
msg.state,
|
||||
msg.text,
|
||||
@@ -1674,11 +1807,11 @@ impl Chat {
|
||||
ephemeral_timestamp)
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1,?,?,?);",
|
||||
params_slice![
|
||||
new_rfc724_mid,
|
||||
self.id,
|
||||
ContactId::SELF,
|
||||
msg.rfc724_mid,
|
||||
msg.chat_id,
|
||||
msg.from_id,
|
||||
to_id,
|
||||
timestamp,
|
||||
msg.timestamp_sort,
|
||||
msg.viewtype,
|
||||
msg.state,
|
||||
msg.text,
|
||||
@@ -2135,6 +2268,8 @@ async fn prepare_msg_blob(context: &Context, msg: &mut Message) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
msg.try_calc_and_set_dimensions(context).await?;
|
||||
|
||||
info!(
|
||||
context,
|
||||
"Attaching \"{}\" for message type #{}.",
|
||||
@@ -2308,7 +2443,7 @@ async fn prepare_send_msg(
|
||||
);
|
||||
message::update_msg_state(context, msg.id, MessageState::OutPending).await?;
|
||||
}
|
||||
let row_id = create_send_msg_job(context, msg.id).await?;
|
||||
let row_id = create_send_msg_job(context, msg).await?;
|
||||
Ok(row_id)
|
||||
}
|
||||
|
||||
@@ -2318,13 +2453,10 @@ async fn prepare_send_msg(
|
||||
/// group with only self and no BCC-to-self configured.
|
||||
///
|
||||
/// The caller has to interrupt SMTP loop or otherwise process a new row.
|
||||
async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<i64>> {
|
||||
let mut msg = Message::load_from_db(context, msg_id).await?;
|
||||
msg.try_calc_and_set_dimensions(context)
|
||||
.await
|
||||
.context("failed to calculate media dimensions")?;
|
||||
|
||||
/* create message */
|
||||
pub(crate) async fn create_send_msg_job(
|
||||
context: &Context,
|
||||
msg: &mut Message,
|
||||
) -> Result<Option<i64>> {
|
||||
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
|
||||
|
||||
let attach_selfavatar = match shall_attach_selfavatar(context, msg.chat_id).await {
|
||||
@@ -2335,7 +2467,7 @@ async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<
|
||||
}
|
||||
};
|
||||
|
||||
let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?;
|
||||
let mimefactory = MimeFactory::from_msg(context, msg, attach_selfavatar).await?;
|
||||
|
||||
let mut recipients = mimefactory.recipients();
|
||||
|
||||
@@ -2357,16 +2489,17 @@ async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<
|
||||
// may happen eg. for groups with only SELF and bcc_self disabled
|
||||
info!(
|
||||
context,
|
||||
"Message {msg_id} has no recipient, skipping smtp-send."
|
||||
"Message {} has no recipient, skipping smtp-send.", msg.id
|
||||
);
|
||||
msg_id.set_delivered(context).await?;
|
||||
msg.id.set_delivered(context).await?;
|
||||
msg.state = MessageState::OutDelivered;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let rendered_msg = match mimefactory.render(context).await {
|
||||
Ok(res) => Ok(res),
|
||||
Err(err) => {
|
||||
message::set_msg_failed(context, msg_id, &err.to_string()).await;
|
||||
message::set_msg_failed(context, msg, &err.to_string()).await?;
|
||||
Err(err)
|
||||
}
|
||||
}?;
|
||||
@@ -2375,13 +2508,13 @@ async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<
|
||||
/* unrecoverable */
|
||||
message::set_msg_failed(
|
||||
context,
|
||||
msg_id,
|
||||
msg,
|
||||
"End-to-end-encryption unavailable unexpectedly.",
|
||||
)
|
||||
.await;
|
||||
.await?;
|
||||
bail!(
|
||||
"e2e encryption unavailable {} - {:?}",
|
||||
msg_id,
|
||||
msg.id,
|
||||
needs_encryption
|
||||
);
|
||||
}
|
||||
@@ -2436,7 +2569,7 @@ async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<
|
||||
&rendered_msg.rfc724_mid,
|
||||
recipients,
|
||||
&rendered_msg.message,
|
||||
msg_id,
|
||||
msg.id,
|
||||
),
|
||||
)
|
||||
.await?;
|
||||
@@ -2629,14 +2762,7 @@ pub(crate) async fn marknoticed_chat_if_older_than(
|
||||
chat_id: ChatId,
|
||||
timestamp: i64,
|
||||
) -> Result<()> {
|
||||
if let Some(chat_timestamp) = context
|
||||
.sql
|
||||
.query_get_value(
|
||||
"SELECT MAX(timestamp) FROM msgs WHERE chat_id=?",
|
||||
(chat_id,),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
if let Some(chat_timestamp) = chat_id.get_timestamp(context).await? {
|
||||
if timestamp > chat_timestamp {
|
||||
marknoticed_chat(context, chat_id).await?;
|
||||
}
|
||||
@@ -3422,89 +3548,86 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
|
||||
chat_id
|
||||
.unarchive_if_not_muted(context, MessageState::Undefined)
|
||||
.await?;
|
||||
if let Ok(mut chat) = Chat::load_from_db(context, chat_id).await {
|
||||
if let Some(reason) = chat.why_cant_send(context).await? {
|
||||
bail!("cannot send to {}: {}", chat_id, reason);
|
||||
let mut chat = Chat::load_from_db(context, chat_id).await?;
|
||||
if let Some(reason) = chat.why_cant_send(context).await? {
|
||||
bail!("cannot send to {}: {}", chat_id, reason);
|
||||
}
|
||||
curr_timestamp = create_smeared_timestamps(context, msg_ids.len());
|
||||
let ids = context
|
||||
.sql
|
||||
.query_map(
|
||||
&format!(
|
||||
"SELECT id FROM msgs WHERE id IN({}) ORDER BY timestamp,id",
|
||||
sql::repeat_vars(msg_ids.len())
|
||||
),
|
||||
rusqlite::params_from_iter(msg_ids),
|
||||
|row| row.get::<_, MsgId>(0),
|
||||
|ids| ids.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|
||||
)
|
||||
.await?;
|
||||
|
||||
for id in ids {
|
||||
let src_msg_id: MsgId = id;
|
||||
let mut msg = Message::load_from_db(context, src_msg_id).await?;
|
||||
if msg.state == MessageState::OutDraft {
|
||||
bail!("cannot forward drafts.");
|
||||
}
|
||||
curr_timestamp = create_smeared_timestamps(context, msg_ids.len());
|
||||
let ids = context
|
||||
.sql
|
||||
.query_map(
|
||||
&format!(
|
||||
"SELECT id FROM msgs WHERE id IN({}) ORDER BY timestamp,id",
|
||||
sql::repeat_vars(msg_ids.len())
|
||||
),
|
||||
rusqlite::params_from_iter(msg_ids),
|
||||
|row| row.get::<_, MsgId>(0),
|
||||
|ids| ids.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|
||||
)
|
||||
.await?;
|
||||
|
||||
for id in ids {
|
||||
let src_msg_id: MsgId = id;
|
||||
let mut msg = Message::load_from_db(context, src_msg_id).await?;
|
||||
if msg.state == MessageState::OutDraft {
|
||||
bail!("cannot forward drafts.");
|
||||
}
|
||||
let original_param = msg.param.clone();
|
||||
|
||||
let original_param = msg.param.clone();
|
||||
// we tested a sort of broadcast
|
||||
// by not marking own forwarded messages as such,
|
||||
// however, this turned out to be to confusing and unclear.
|
||||
|
||||
// we tested a sort of broadcast
|
||||
// by not marking own forwarded messages as such,
|
||||
// however, this turned out to be to confusing and unclear.
|
||||
if msg.get_viewtype() != Viewtype::Sticker {
|
||||
msg.param
|
||||
.set_int(Param::Forwarded, src_msg_id.to_u32() as i32);
|
||||
}
|
||||
|
||||
if msg.get_viewtype() != Viewtype::Sticker {
|
||||
msg.param
|
||||
.set_int(Param::Forwarded, src_msg_id.to_u32() as i32);
|
||||
}
|
||||
msg.param.remove(Param::GuaranteeE2ee);
|
||||
msg.param.remove(Param::ForcePlaintext);
|
||||
msg.param.remove(Param::Cmd);
|
||||
msg.param.remove(Param::OverrideSenderDisplayname);
|
||||
msg.param.remove(Param::WebxdcSummary);
|
||||
msg.param.remove(Param::WebxdcSummaryTimestamp);
|
||||
msg.in_reply_to = None;
|
||||
|
||||
msg.param.remove(Param::GuaranteeE2ee);
|
||||
msg.param.remove(Param::ForcePlaintext);
|
||||
msg.param.remove(Param::Cmd);
|
||||
msg.param.remove(Param::OverrideSenderDisplayname);
|
||||
msg.param.remove(Param::WebxdcSummary);
|
||||
msg.param.remove(Param::WebxdcSummaryTimestamp);
|
||||
msg.in_reply_to = None;
|
||||
// do not leak data as group names; a default subject is generated by mimefactory
|
||||
msg.subject = "".to_string();
|
||||
|
||||
// do not leak data as group names; a default subject is generated by mimefactory
|
||||
msg.subject = "".to_string();
|
||||
let new_msg_id: MsgId;
|
||||
if msg.state == MessageState::OutPreparing {
|
||||
new_msg_id = chat
|
||||
.prepare_msg_raw(context, &mut msg, None, curr_timestamp)
|
||||
.await?;
|
||||
curr_timestamp += 1;
|
||||
msg.param = original_param;
|
||||
msg.id = src_msg_id;
|
||||
|
||||
let new_msg_id: MsgId;
|
||||
if msg.state == MessageState::OutPreparing {
|
||||
new_msg_id = chat
|
||||
.prepare_msg_raw(context, &mut msg, None, curr_timestamp)
|
||||
.await?;
|
||||
curr_timestamp += 1;
|
||||
let save_param = msg.param.clone();
|
||||
msg.param = original_param;
|
||||
msg.id = src_msg_id;
|
||||
|
||||
if let Some(old_fwd) = msg.param.get(Param::PrepForwards) {
|
||||
let new_fwd = format!("{} {}", old_fwd, new_msg_id.to_u32());
|
||||
msg.param.set(Param::PrepForwards, new_fwd);
|
||||
} else {
|
||||
msg.param
|
||||
.set(Param::PrepForwards, new_msg_id.to_u32().to_string());
|
||||
}
|
||||
|
||||
msg.update_param(context).await?;
|
||||
msg.param = save_param;
|
||||
if let Some(old_fwd) = msg.param.get(Param::PrepForwards) {
|
||||
let new_fwd = format!("{} {}", old_fwd, new_msg_id.to_u32());
|
||||
msg.param.set(Param::PrepForwards, new_fwd);
|
||||
} else {
|
||||
msg.state = MessageState::OutPending;
|
||||
new_msg_id = chat
|
||||
.prepare_msg_raw(context, &mut msg, None, curr_timestamp)
|
||||
.await?;
|
||||
curr_timestamp += 1;
|
||||
if create_send_msg_job(context, new_msg_id).await?.is_some() {
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
msg.param
|
||||
.set(Param::PrepForwards, new_msg_id.to_u32().to_string());
|
||||
}
|
||||
|
||||
msg.update_param(context).await?;
|
||||
} else {
|
||||
msg.state = MessageState::OutPending;
|
||||
new_msg_id = chat
|
||||
.prepare_msg_raw(context, &mut msg, None, curr_timestamp)
|
||||
.await?;
|
||||
curr_timestamp += 1;
|
||||
if create_send_msg_job(context, &mut msg).await?.is_some() {
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
created_chats.push(chat_id);
|
||||
created_msgs.push(new_msg_id);
|
||||
}
|
||||
created_chats.push(chat_id);
|
||||
created_msgs.push(new_msg_id);
|
||||
}
|
||||
for (chat_id, msg_id) in created_chats.iter().zip(created_msgs.iter()) {
|
||||
context.emit_msgs_changed(*chat_id, *msg_id);
|
||||
@@ -3536,29 +3659,31 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> {
|
||||
msgs.push(msg)
|
||||
}
|
||||
|
||||
if let Some(chat_id) = chat_id {
|
||||
let chat = Chat::load_from_db(context, chat_id).await?;
|
||||
for mut msg in msgs {
|
||||
if msg.get_showpadlock() && !chat.is_protected() {
|
||||
msg.param.remove(Param::GuaranteeE2ee);
|
||||
msg.update_param(context).await?;
|
||||
}
|
||||
match msg.get_state() {
|
||||
MessageState::OutFailed | MessageState::OutDelivered | MessageState::OutMdnRcvd => {
|
||||
message::update_msg_state(context, msg.id, MessageState::OutPending).await?
|
||||
}
|
||||
_ => bail!("unexpected message state"),
|
||||
}
|
||||
context.emit_event(EventType::MsgsChanged {
|
||||
chat_id: msg.chat_id,
|
||||
msg_id: msg.id,
|
||||
});
|
||||
if create_send_msg_job(context, msg.id).await?.is_some() {
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
let Some(chat_id) = chat_id else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let chat = Chat::load_from_db(context, chat_id).await?;
|
||||
for mut msg in msgs {
|
||||
if msg.get_showpadlock() && !chat.is_protected() {
|
||||
msg.param.remove(Param::GuaranteeE2ee);
|
||||
msg.update_param(context).await?;
|
||||
}
|
||||
match msg.get_state() {
|
||||
MessageState::OutFailed | MessageState::OutDelivered | MessageState::OutMdnRcvd => {
|
||||
message::update_msg_state(context, msg.id, MessageState::OutPending).await?
|
||||
}
|
||||
_ => bail!("unexpected message state"),
|
||||
}
|
||||
context.emit_event(EventType::MsgsChanged {
|
||||
chat_id: msg.chat_id,
|
||||
msg_id: msg.id,
|
||||
});
|
||||
if create_send_msg_job(context, &mut msg).await?.is_some() {
|
||||
context
|
||||
.scheduler
|
||||
.interrupt_smtp(InterruptInfo::new(false))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -3628,7 +3753,6 @@ pub async fn add_device_msg_with_importance(
|
||||
chat_id = ChatId::get_for_contact(context, ContactId::DEVICE).await?;
|
||||
|
||||
let rfc724_mid = create_outgoing_rfc724_mid(None, "@device");
|
||||
msg.try_calc_and_set_dimensions(context).await.ok();
|
||||
prepare_msg_blob(context, msg).await?;
|
||||
|
||||
let timestamp_sent = create_smeared_timestamp(context);
|
||||
|
||||
Reference in New Issue
Block a user