mirror of
https://github.com/chatmail/core.git
synced 2026-05-09 09:56:31 +03:00
Compare commits
7 Commits
iequidoo/r
...
hpk/per-re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f1c02d89f | ||
|
|
5e7fcdac8f | ||
|
|
ca70fb9b3a | ||
|
|
045b586569 | ||
|
|
18e1ecbb94 | ||
|
|
6fdee2b92d | ||
|
|
9ebd4769f5 |
9
Cargo.lock
generated
9
Cargo.lock
generated
@@ -3941,15 +3941,14 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
|
||||
|
||||
[[package]]
|
||||
name = "openssl"
|
||||
version = "0.10.78"
|
||||
version = "0.10.79"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f38c4372413cdaaf3cc79dd92d29d7d9f5ab09b51b10dded508fb90bb70b9222"
|
||||
checksum = "bf0b434746ee2832f4f0baf10137e1cabb18cbe6912c69e2e33263c45250f542"
|
||||
dependencies = [
|
||||
"bitflags 2.11.0",
|
||||
"cfg-if",
|
||||
"foreign-types",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"openssl-macros",
|
||||
"openssl-sys",
|
||||
]
|
||||
@@ -3982,9 +3981,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openssl-sys"
|
||||
version = "0.9.114"
|
||||
version = "0.9.115"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "13ce1245cd07fcc4cfdb438f7507b0c7e4f3849a69fd84d52374c66d83741bb6"
|
||||
checksum = "158fe5b292746440aa6e7a7e690e55aeb72d41505e2804c23c6973ad0e9c9781"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
|
||||
@@ -4003,8 +4003,6 @@ int dc_msg_get_viewtype (const dc_msg_t* msg);
|
||||
* Marked as read on IMAP and MDN may be sent. Use dc_markseen_msgs() to mark messages as being seen.
|
||||
*
|
||||
* Outgoing message states:
|
||||
* - @ref DC_STATE_OUT_PREPARING - For files which need time to be prepared before they can be sent,
|
||||
* the message enters this state before @ref DC_STATE_OUT_PENDING. Deprecated.
|
||||
* - @ref DC_STATE_OUT_DRAFT - Message saved as draft using dc_set_draft()
|
||||
* - @ref DC_STATE_OUT_PENDING - The user has pressed the "send" button but the
|
||||
* message is not yet sent and is pending in some way. Maybe we're offline (no checkmark).
|
||||
@@ -5589,13 +5587,6 @@ int64_t dc_lot_get_timestamp (const dc_lot_t* lot);
|
||||
*/
|
||||
#define DC_STATE_IN_SEEN 16
|
||||
|
||||
/**
|
||||
* Outgoing message being prepared. See dc_msg_get_state() for details.
|
||||
*
|
||||
* @deprecated 2024-12-07
|
||||
*/
|
||||
#define DC_STATE_OUT_PREPARING 18
|
||||
|
||||
/**
|
||||
* Outgoing message drafted. See dc_msg_get_state() for details.
|
||||
*/
|
||||
|
||||
@@ -230,7 +230,6 @@ pub enum LotState {
|
||||
MsgInFresh = 10,
|
||||
MsgInNoticed = 13,
|
||||
MsgInSeen = 16,
|
||||
MsgOutPreparing = 18,
|
||||
MsgOutDraft = 19,
|
||||
MsgOutPending = 20,
|
||||
MsgOutFailed = 24,
|
||||
@@ -246,7 +245,6 @@ impl From<MessageState> for LotState {
|
||||
InFresh => LotState::MsgInFresh,
|
||||
InNoticed => LotState::MsgInNoticed,
|
||||
InSeen => LotState::MsgInSeen,
|
||||
OutPreparing => LotState::MsgOutPreparing,
|
||||
OutDraft => LotState::MsgOutDraft,
|
||||
OutPending => LotState::MsgOutPending,
|
||||
OutFailed => LotState::MsgOutFailed,
|
||||
|
||||
@@ -66,7 +66,7 @@ impl WebxdcMessageInfo {
|
||||
self_addr,
|
||||
is_app_sender,
|
||||
is_broadcast,
|
||||
send_update_interval_ms,
|
||||
send_update_interval,
|
||||
send_update_max_size,
|
||||
} = message.get_webxdc_info(context).await?;
|
||||
|
||||
@@ -80,7 +80,7 @@ impl WebxdcMessageInfo {
|
||||
self_addr,
|
||||
is_app_sender,
|
||||
is_broadcast,
|
||||
send_update_interval: send_update_interval_ms,
|
||||
send_update_interval,
|
||||
send_update_max_size,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -91,9 +91,9 @@ impl Ratelimit {
|
||||
self.until_can_send_at(SystemTime::now())
|
||||
}
|
||||
|
||||
/// Returns the minimum possible sending interval.
|
||||
pub fn min_send_interval(&self) -> Duration {
|
||||
self.window.div_f64(self.quota)
|
||||
/// Returns minimum possible update interval in milliseconds.
|
||||
pub fn update_interval(&self) -> usize {
|
||||
(self.window.as_millis() as f64 / self.quota) as usize
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,7 +107,7 @@ mod tests {
|
||||
|
||||
let mut ratelimit = Ratelimit::new_at(Duration::new(60, 0), 3.0, now);
|
||||
assert!(ratelimit.can_send_at(now));
|
||||
assert_eq!(ratelimit.min_send_interval(), Duration::new(20, 0));
|
||||
assert_eq!(ratelimit.update_interval(), 20_000);
|
||||
|
||||
// Send burst of 3 messages.
|
||||
ratelimit.send_at(now);
|
||||
|
||||
@@ -190,7 +190,6 @@ class MessageState(IntEnum):
|
||||
IN_FRESH = 10
|
||||
IN_NOTICED = 13
|
||||
IN_SEEN = 16
|
||||
OUT_PREPARING = 18
|
||||
OUT_DRAFT = 19
|
||||
OUT_PENDING = 20
|
||||
OUT_FAILED = 24
|
||||
|
||||
@@ -271,15 +271,6 @@ class Chat:
|
||||
sent out. This is the same object as was passed in, which
|
||||
has been modified with the new state of the core.
|
||||
"""
|
||||
if msg.is_out_preparing():
|
||||
assert msg.id != 0
|
||||
# get a fresh copy of dc_msg, the core needs it
|
||||
maybe_msg = Message.from_db(self.account, msg.id)
|
||||
if maybe_msg is not None:
|
||||
msg = maybe_msg
|
||||
else:
|
||||
raise ValueError("message does not exist")
|
||||
|
||||
sent_id = lib.dc_send_msg(self.account._dc_context, self.id, msg._dc_msg)
|
||||
if sent_id == 0:
|
||||
raise ValueError("message could not be sent")
|
||||
@@ -333,26 +324,6 @@ class Chat:
|
||||
raise ValueError("message could not be sent")
|
||||
return Message.from_db(self.account, sent_id)
|
||||
|
||||
def send_prepared(self, message):
|
||||
"""send a previously prepared message.
|
||||
|
||||
:param message: a :class:`Message` instance previously returned by
|
||||
:meth:`prepare_file`.
|
||||
:raises ValueError: if message can not be sent.
|
||||
:returns: a :class:`deltachat.message.Message` instance as sent out.
|
||||
"""
|
||||
assert message.id != 0 and message.is_out_preparing()
|
||||
# get a fresh copy of dc_msg, the core needs it
|
||||
msg = Message.from_db(self.account, message.id)
|
||||
|
||||
# pass 0 as chat-id because core-docs say it's ok when out-preparing
|
||||
sent_id = lib.dc_send_msg(self.account._dc_context, 0, msg._dc_msg)
|
||||
if sent_id == 0:
|
||||
raise ValueError("message could not be sent")
|
||||
assert sent_id == msg.id
|
||||
# modify message in place to avoid bad state for the caller
|
||||
msg._dc_msg = Message.from_db(self.account, sent_id)._dc_msg
|
||||
|
||||
def set_draft(self, message):
|
||||
"""set message as draft.
|
||||
|
||||
|
||||
@@ -351,17 +351,12 @@ class Message:
|
||||
def is_outgoing(self):
|
||||
"""Return True if Message is outgoing."""
|
||||
return lib.dc_msg_get_state(self._dc_msg) in (
|
||||
const.DC_STATE_OUT_PREPARING,
|
||||
const.DC_STATE_OUT_PENDING,
|
||||
const.DC_STATE_OUT_FAILED,
|
||||
const.DC_STATE_OUT_MDN_RCVD,
|
||||
const.DC_STATE_OUT_DELIVERED,
|
||||
)
|
||||
|
||||
def is_out_preparing(self):
|
||||
"""Return True if Message is outgoing, but its file is being prepared."""
|
||||
return self._msgstate == const.DC_STATE_OUT_PREPARING
|
||||
|
||||
def is_out_pending(self):
|
||||
"""Return True if Message is outgoing, but is pending (no single checkmark)."""
|
||||
return self._msgstate == const.DC_STATE_OUT_PENDING
|
||||
|
||||
21
src/chat.rs
21
src/chat.rs
@@ -2613,7 +2613,7 @@ pub async fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) ->
|
||||
"chat_id cannot be a special chat: {chat_id}"
|
||||
);
|
||||
|
||||
if msg.state != MessageState::Undefined && msg.state != MessageState::OutPreparing {
|
||||
if msg.state != MessageState::Undefined {
|
||||
msg.param.remove(Param::GuaranteeE2ee);
|
||||
msg.param.remove(Param::ForcePlaintext);
|
||||
// create_send_msg_jobs() will update `param` in the db.
|
||||
@@ -2721,10 +2721,7 @@ async fn prepare_send_msg(
|
||||
None
|
||||
};
|
||||
|
||||
if matches!(
|
||||
msg.state,
|
||||
MessageState::Undefined | MessageState::OutPreparing
|
||||
)
|
||||
if msg.state == MessageState::Undefined
|
||||
// Legacy SecureJoin "v*-request" messages are unencrypted.
|
||||
&& msg.param.get_cmd() != SystemMessage::SecurejoinMessage
|
||||
&& chat.is_encrypted(context).await?
|
||||
@@ -2937,8 +2934,8 @@ pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -
|
||||
UPDATE msgs SET
|
||||
timestamp=(
|
||||
SELECT MAX(timestamp) FROM msgs INDEXED BY msgs_index7 WHERE
|
||||
-- From `InFresh` to `OutMdnRcvd` inclusive except `OutDraft`.
|
||||
state IN(10,13,16,18,20,24,26,28) AND
|
||||
-- From `InFresh` to `OutDelivered` inclusive, except `OutDraft`.
|
||||
state IN(10,13,16,18,20,24,26) AND
|
||||
hidden IN(0,1) AND
|
||||
chat_id=? AND
|
||||
id<=?
|
||||
@@ -2957,7 +2954,6 @@ WHERE id=?
|
||||
)
|
||||
.await?;
|
||||
|
||||
let chunk_size = context.get_max_smtp_rcpt_to().await?;
|
||||
let trans_fn = |t: &mut rusqlite::Transaction| {
|
||||
let mut row_ids = Vec::<i64>::new();
|
||||
|
||||
@@ -2971,12 +2967,12 @@ WHERE id=?
|
||||
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
)?;
|
||||
for recipients_chunk in recipients.chunks(chunk_size) {
|
||||
let recipients_chunk = recipients_chunk.join(" ");
|
||||
if !recipients.is_empty() {
|
||||
let all_recipients = recipients.join(" ");
|
||||
if let Some(pre_msg) = &rendered_pre_msg {
|
||||
let row_id = stmt.execute((
|
||||
&pre_msg.rfc724_mid,
|
||||
&recipients_chunk,
|
||||
&all_recipients,
|
||||
&pre_msg.message,
|
||||
msg.id,
|
||||
))?;
|
||||
@@ -2984,7 +2980,7 @@ WHERE id=?
|
||||
}
|
||||
let row_id = stmt.execute((
|
||||
&rendered_msg.rfc724_mid,
|
||||
&recipients_chunk,
|
||||
&all_recipients,
|
||||
&rendered_msg.message,
|
||||
msg.id,
|
||||
))?;
|
||||
@@ -4539,6 +4535,7 @@ pub async fn forward_msgs_2ctx(
|
||||
|
||||
msg.state = MessageState::OutPending;
|
||||
msg.rfc724_mid = create_outgoing_rfc724_mid();
|
||||
msg.pre_rfc724_mid.clear();
|
||||
msg.timestamp_sort = curr_timestamp;
|
||||
chat.prepare_msg_raw(ctx_dst, &mut msg, None).await?;
|
||||
|
||||
|
||||
@@ -204,9 +204,6 @@ pub const MAX_RCVD_IMAGE_PIXELS: u32 = 50_000_000;
|
||||
// `max_smtp_rcpt_to` in the provider db.
|
||||
pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50;
|
||||
|
||||
/// Same as `DEFAULT_MAX_SMTP_RCPT_TO`, but for chatmail relays.
|
||||
pub(crate) const DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO: usize = 999;
|
||||
|
||||
/// How far the last quota check needs to be in the past to be checked by the background function (in seconds).
|
||||
pub(crate) const DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT: u64 = 12 * 60 * 60; // 12 hours
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ use tokio::sync::{Mutex, Notify, RwLock};
|
||||
|
||||
use crate::chat::{ChatId, get_chat_cnt};
|
||||
use crate::config::Config;
|
||||
use crate::constants::{self, DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
|
||||
use crate::constants::{DC_BACKGROUND_FETCH_QUOTA_CHECK_RATELIMIT, DC_VERSION_STR};
|
||||
use crate::contact::{Contact, ContactId};
|
||||
use crate::debug_logging::DebugLogging;
|
||||
use crate::events::{Event, EventEmitter, EventType, Events};
|
||||
@@ -587,23 +587,6 @@ impl Context {
|
||||
self.get_config_bool(Config::IsChatmail).await
|
||||
}
|
||||
|
||||
/// Returns maximum number of recipients the provider allows to send a single email to.
|
||||
pub(crate) async fn get_max_smtp_rcpt_to(&self) -> Result<usize> {
|
||||
let is_chatmail = self.is_chatmail().await?;
|
||||
let val = self
|
||||
.get_configured_provider()
|
||||
.await?
|
||||
.and_then(|provider| provider.opt.max_smtp_rcpt_to)
|
||||
.map_or_else(
|
||||
|| match is_chatmail {
|
||||
true => constants::DEFAULT_CHATMAIL_MAX_SMTP_RCPT_TO,
|
||||
false => constants::DEFAULT_MAX_SMTP_RCPT_TO,
|
||||
},
|
||||
usize::from,
|
||||
);
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
/// Does a single round of fetching from IMAP and returns.
|
||||
///
|
||||
/// Can be used even if I/O is currently stopped.
|
||||
|
||||
17
src/imap.rs
17
src/imap.rs
@@ -1503,7 +1503,7 @@ impl Session {
|
||||
.get_metadata(
|
||||
mailbox,
|
||||
options,
|
||||
"(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn)",
|
||||
"(/shared/comment /shared/admin /shared/vendor/deltachat/irohrelay /shared/vendor/deltachat/turn /shared/vendor/deltachat/maxsmtprecipients)",
|
||||
)
|
||||
.await?;
|
||||
for m in metadata {
|
||||
@@ -1539,6 +1539,21 @@ impl Session {
|
||||
}
|
||||
}
|
||||
}
|
||||
"/shared/vendor/deltachat/maxsmtprecipients" => {
|
||||
if let Some(value) = m.value.and_then(|v| v.parse::<u32>().ok()) {
|
||||
let transport_id = self.transport_id();
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE transports \
|
||||
SET max_smtp_rcpt_to=? WHERE id=?",
|
||||
(value, transport_id),
|
||||
)
|
||||
.await
|
||||
.log_err(context)
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1381,13 +1381,8 @@ pub enum MessageState {
|
||||
/// IMAP and MDN may be sent.
|
||||
InSeen = 16,
|
||||
|
||||
/// For files which need time to be prepared before they can be
|
||||
/// sent, the message enters this state before
|
||||
/// OutPending.
|
||||
///
|
||||
/// Deprecated 2024-12-07.
|
||||
OutPreparing = 18,
|
||||
|
||||
// Deprecated 2024-12-07. Removed 2026-04.
|
||||
// OutPreparing = 18,
|
||||
/// Message saved as draft.
|
||||
OutDraft = 19,
|
||||
|
||||
@@ -1420,7 +1415,6 @@ impl std::fmt::Display for MessageState {
|
||||
Self::InFresh => "Fresh",
|
||||
Self::InNoticed => "Noticed",
|
||||
Self::InSeen => "Seen",
|
||||
Self::OutPreparing => "Preparing",
|
||||
Self::OutDraft => "Draft",
|
||||
Self::OutPending => "Pending",
|
||||
Self::OutFailed => "Failed",
|
||||
@@ -1437,7 +1431,7 @@ impl MessageState {
|
||||
use MessageState::*;
|
||||
matches!(
|
||||
self,
|
||||
OutPreparing | OutPending | OutDelivered | OutMdnRcvd // OutMdnRcvd can still fail because it could be a group message and only some recipients failed.
|
||||
OutPending | OutDelivered | OutMdnRcvd // OutMdnRcvd can still fail because it could be a group message and only some recipients failed.
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1446,7 +1440,7 @@ impl MessageState {
|
||||
use MessageState::*;
|
||||
matches!(
|
||||
self,
|
||||
OutPreparing | OutDraft | OutPending | OutFailed | OutDelivered | OutMdnRcvd
|
||||
OutDraft | OutPending | OutFailed | OutDelivered | OutMdnRcvd
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -66,12 +66,6 @@ impl Context {
|
||||
|
||||
/// Updates `quota.recent`, sets `quota.modified` to the current time
|
||||
/// and emits an event to let the UIs update connectivity view.
|
||||
///
|
||||
/// Moreover, once each time quota gets larger than `QUOTA_WARN_THRESHOLD_PERCENTAGE`,
|
||||
/// a device message is added.
|
||||
/// As the message is added only once, the user is not spammed
|
||||
/// in case for some providers the quota is always at ~100%
|
||||
/// and new space is allocated as needed.
|
||||
pub(crate) async fn update_recent_quota(
|
||||
&self,
|
||||
session: &mut ImapSession,
|
||||
|
||||
@@ -806,8 +806,6 @@ UPDATE config SET value=? WHERE keyname='configured_addr' AND value!=?1
|
||||
if transport_changed {
|
||||
info!(context, "Primary transport changed to {from_addr:?}.");
|
||||
context.sql.uncache_raw_config("configured_addr").await;
|
||||
|
||||
// Regenerate User ID in V4 keys.
|
||||
context.self_public_key.lock().await.take();
|
||||
|
||||
context.emit_event(EventType::TransportsModified);
|
||||
|
||||
155
src/smtp.rs
155
src/smtp.rs
@@ -2,6 +2,8 @@
|
||||
|
||||
mod connect;
|
||||
pub mod send;
|
||||
#[cfg(test)]
|
||||
mod chunking_tests;
|
||||
|
||||
use anyhow::{Context as _, Error, Result, bail, format_err};
|
||||
use async_smtp::response::{Category, Code, Detail};
|
||||
@@ -10,6 +12,7 @@ use tokio::task;
|
||||
|
||||
use crate::chat::{ChatId, add_info_msg_with_cmd};
|
||||
use crate::config::Config;
|
||||
use crate::constants;
|
||||
use crate::contact::{Contact, ContactId};
|
||||
use crate::context::Context;
|
||||
use crate::events::EventType;
|
||||
@@ -34,6 +37,9 @@ pub(crate) struct Smtp {
|
||||
/// Email address we are sending from.
|
||||
from: Option<EmailAddress>,
|
||||
|
||||
/// Transport used for the current connection.
|
||||
transport_id: Option<u32>,
|
||||
|
||||
/// Timestamp of last successful send/receive network interaction
|
||||
/// (eg connect or send succeeded). On initialization and disconnect
|
||||
/// it is set to None.
|
||||
@@ -60,6 +66,7 @@ impl Smtp {
|
||||
task::spawn(async move { transport.quit().await });
|
||||
}
|
||||
self.last_success = None;
|
||||
self.transport_id = None;
|
||||
}
|
||||
|
||||
/// Return true if smtp was connected but is not known to
|
||||
@@ -89,9 +96,10 @@ impl Smtp {
|
||||
}
|
||||
|
||||
self.connectivity.set_connecting(context);
|
||||
let (_transport_id, lp) = ConfiguredLoginParam::load(context)
|
||||
let (transport_id, lp) = ConfiguredLoginParam::load(context)
|
||||
.await?
|
||||
.context("Not configured")?;
|
||||
self.transport_id = Some(transport_id);
|
||||
let proxy_config = ProxyConfig::load(context).await?;
|
||||
self.connect(
|
||||
context,
|
||||
@@ -165,6 +173,7 @@ impl Smtp {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum SendResult {
|
||||
/// Message was sent successfully.
|
||||
Success,
|
||||
@@ -176,13 +185,36 @@ pub(crate) enum SendResult {
|
||||
Retry,
|
||||
}
|
||||
|
||||
pub(crate) trait SmtpSender: Send {
|
||||
fn send_chunk<'a>(
|
||||
&'a mut self,
|
||||
context: &'a Context,
|
||||
recipients: &'a [async_smtp::EmailAddress],
|
||||
body: &'a str,
|
||||
) -> futures::future::BoxFuture<'a, SendResult>;
|
||||
}
|
||||
|
||||
struct RealSmtpSender<'a> {
|
||||
smtp: &'a mut Smtp,
|
||||
}
|
||||
|
||||
impl SmtpSender for RealSmtpSender<'_> {
|
||||
fn send_chunk<'a>(
|
||||
&'a mut self,
|
||||
context: &'a Context,
|
||||
recipients: &'a [async_smtp::EmailAddress],
|
||||
body: &'a str,
|
||||
) -> futures::future::BoxFuture<'a, SendResult> {
|
||||
Box::pin(smtp_send(context, recipients, body, self.smtp))
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to send a message.
|
||||
pub(crate) async fn smtp_send(
|
||||
context: &Context,
|
||||
recipients: &[async_smtp::EmailAddress],
|
||||
message: &str,
|
||||
smtp: &mut Smtp,
|
||||
msg_id: Option<MsgId>,
|
||||
) -> SendResult {
|
||||
if recipients.is_empty() {
|
||||
return SendResult::Success;
|
||||
@@ -310,25 +342,6 @@ pub(crate) async fn smtp_send(
|
||||
Ok(()) => SendResult::Success,
|
||||
};
|
||||
|
||||
if let SendResult::Failure(err) = &status
|
||||
&& let Some(msg_id) = msg_id
|
||||
{
|
||||
// We couldn't send the message, so mark it as failed
|
||||
match Message::load_from_db(context, msg_id).await {
|
||||
Ok(mut msg) => {
|
||||
if let Err(err) = message::set_msg_failed(context, &mut msg, &err.to_string()).await
|
||||
{
|
||||
error!(context, "Failed to mark {msg_id} as failed: {err:#}.");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
context,
|
||||
"Failed to load {msg_id} to mark it as failed: {err:#}."
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
status
|
||||
}
|
||||
|
||||
@@ -406,7 +419,40 @@ pub(crate) async fn send_msg_to_smtp(
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let status = smtp_send(context, &recipients_list, body.as_str(), smtp, Some(msg_id)).await;
|
||||
let transport_id = smtp
|
||||
.transport_id
|
||||
.context("SMTP not connected to a transport")?;
|
||||
let chunk_size = max_smtp_rcpt_to(context, transport_id).await?;
|
||||
|
||||
let mut sender = RealSmtpSender { smtp };
|
||||
let (status, start_idx) = send_smtp_chunks(
|
||||
context,
|
||||
&recipients_list,
|
||||
body.as_str(),
|
||||
chunk_size,
|
||||
&mut sender,
|
||||
)
|
||||
.await;
|
||||
|
||||
let unsent_saved = start_idx < recipients_list.len();
|
||||
if let Some(unsent) = recipients_list.get(start_idx..)
|
||||
&& !unsent.is_empty()
|
||||
{
|
||||
let unsent_str: String = unsent
|
||||
.iter()
|
||||
.map(|a| a.as_ref())
|
||||
.collect::<Vec<&str>>()
|
||||
.join(" ");
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE smtp SET recipients=? WHERE id=?",
|
||||
(unsent_str, rowid),
|
||||
)
|
||||
.await
|
||||
.log_err(context)
|
||||
.ok();
|
||||
}
|
||||
|
||||
match status {
|
||||
SendResult::Retry => {}
|
||||
@@ -455,10 +501,15 @@ pub(crate) async fn send_msg_to_smtp(
|
||||
.await?;
|
||||
};
|
||||
}
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
||||
.await?;
|
||||
if let Some(mut msg) = Message::load_from_db_optional(context, msg_id).await? {
|
||||
message::set_msg_failed(context, &mut msg, &err.to_string()).await?;
|
||||
}
|
||||
if !unsent_saved {
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp WHERE id=?", (rowid,))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -470,10 +521,39 @@ pub(crate) async fn send_msg_to_smtp(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
SendResult::Failure(err) => Err(format_err!("{err}")),
|
||||
SendResult::Failure(err) => {
|
||||
if unsent_saved {
|
||||
Err(format_err!("Retry"))
|
||||
} else {
|
||||
Err(format_err!("{err}"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn max_smtp_rcpt_to(context: &Context, transport_id: u32) -> Result<usize> {
|
||||
let limit = context
|
||||
.sql
|
||||
.query_row_optional(
|
||||
"SELECT max_smtp_rcpt_to FROM transports WHERE id=?",
|
||||
(transport_id,),
|
||||
|row| row.get::<_, u32>(0),
|
||||
)
|
||||
.await?
|
||||
.unwrap_or(0);
|
||||
|
||||
if limit > 0 {
|
||||
return Ok(limit as usize);
|
||||
}
|
||||
|
||||
let val = context
|
||||
.get_configured_provider()
|
||||
.await?
|
||||
.and_then(|provider| provider.opt.max_smtp_rcpt_to)
|
||||
.map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from);
|
||||
Ok(val)
|
||||
}
|
||||
|
||||
pub(crate) async fn msg_has_pending_smtp_job(
|
||||
context: &Context,
|
||||
msg_id: MsgId,
|
||||
@@ -600,7 +680,7 @@ async fn send_mdn_rfc724_mid(
|
||||
})
|
||||
.collect();
|
||||
|
||||
match smtp_send(context, &recipients, &body, smtp, None).await {
|
||||
match smtp_send(context, &recipients, &body, smtp).await {
|
||||
SendResult::Success => {
|
||||
if !recipients.is_empty() {
|
||||
info!(context, "Successfully sent MDN for {rfc724_mid}.");
|
||||
@@ -722,3 +802,22 @@ pub(crate) async fn add_self_recipients(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::arithmetic_side_effects)]
|
||||
pub(crate) async fn send_smtp_chunks(
|
||||
context: &Context,
|
||||
recipients: &[async_smtp::EmailAddress],
|
||||
body: &str,
|
||||
chunk_size: usize,
|
||||
sender: &mut (dyn SmtpSender + Send),
|
||||
) -> (SendResult, usize) {
|
||||
for (i, chunk) in recipients.chunks(chunk_size).enumerate() {
|
||||
let status = sender.send_chunk(context, chunk, body).await;
|
||||
match status {
|
||||
SendResult::Success => continue,
|
||||
SendResult::Failure(_) => return (status, (i + 1) * chunk_size),
|
||||
SendResult::Retry => return (status, i * chunk_size),
|
||||
}
|
||||
}
|
||||
(SendResult::Success, recipients.len())
|
||||
}
|
||||
|
||||
102
src/smtp/chunking_tests.rs
Normal file
102
src/smtp/chunking_tests.rs
Normal file
@@ -0,0 +1,102 @@
|
||||
use crate::smtp::{send_smtp_chunks, SendResult, SmtpSender};
|
||||
use crate::test_utils::TestContextManager;
|
||||
use crate::context::Context;
|
||||
use anyhow::Result;
|
||||
use futures::future::{BoxFuture, FutureExt};
|
||||
|
||||
/// Result the mock should return on the designated call.
|
||||
enum MockFailure {
|
||||
Transient,
|
||||
Permanent,
|
||||
}
|
||||
|
||||
struct MockSmtpSender {
|
||||
call_count: usize,
|
||||
fail_on_call: Option<(usize, MockFailure)>,
|
||||
}
|
||||
|
||||
impl SmtpSender for MockSmtpSender {
|
||||
fn send_chunk<'a>(
|
||||
&'a mut self,
|
||||
_context: &'a Context,
|
||||
_recipients: &'a [async_smtp::EmailAddress],
|
||||
_body: &'a str,
|
||||
) -> BoxFuture<'a, SendResult> {
|
||||
self.call_count += 1;
|
||||
let count = self.call_count;
|
||||
let fail_on = self.fail_on_call.as_ref().map(|(n, _)| *n);
|
||||
let is_permanent = matches!(
|
||||
self.fail_on_call,
|
||||
Some((_, MockFailure::Permanent))
|
||||
);
|
||||
async move {
|
||||
if fail_on == Some(count) {
|
||||
if is_permanent {
|
||||
return SendResult::Failure(
|
||||
anyhow::format_err!("permanent error"),
|
||||
);
|
||||
}
|
||||
return SendResult::Retry;
|
||||
}
|
||||
SendResult::Success
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_send_smtp_chunks() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let alice = tcm.alice().await;
|
||||
|
||||
let recipients: Vec<_> = ["r1@ex.org", "r2@ex.org", "r3@ex.org", "r4@ex.org", "r5@ex.org"]
|
||||
.iter()
|
||||
.map(|a| async_smtp::EmailAddress::new(a.to_string()).unwrap())
|
||||
.collect();
|
||||
|
||||
// All chunks succeed.
|
||||
let mut sender = MockSmtpSender { call_count: 0, fail_on_call: None };
|
||||
let (status, processed) =
|
||||
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||
assert!(matches!(status, SendResult::Success));
|
||||
assert_eq!(processed, 5);
|
||||
assert_eq!(sender.call_count, 3); // chunks: [2, 2, 1]
|
||||
|
||||
// Second chunk gets a transient error, only first chunk's recipients are processed.
|
||||
let mut sender =
|
||||
MockSmtpSender { call_count: 0, fail_on_call: Some((2, MockFailure::Transient)) };
|
||||
let (status, processed) =
|
||||
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||
assert!(matches!(status, SendResult::Retry));
|
||||
assert_eq!(processed, 2);
|
||||
assert_eq!(sender.call_count, 2);
|
||||
|
||||
// Last chunk gets a transient error, first two chunks' recipients are processed.
|
||||
let mut sender =
|
||||
MockSmtpSender { call_count: 0, fail_on_call: Some((3, MockFailure::Transient)) };
|
||||
let (status, processed) =
|
||||
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||
assert!(matches!(status, SendResult::Retry));
|
||||
assert_eq!(processed, 4);
|
||||
assert_eq!(sender.call_count, 3);
|
||||
|
||||
// Second chunk gets a permanent error; processed includes the failed chunk.
|
||||
let mut sender =
|
||||
MockSmtpSender { call_count: 0, fail_on_call: Some((2, MockFailure::Permanent)) };
|
||||
let (status, processed) =
|
||||
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||
assert!(matches!(status, SendResult::Failure(_)));
|
||||
assert_eq!(processed, 4);
|
||||
assert_eq!(sender.call_count, 2);
|
||||
|
||||
// Last chunk gets a permanent error; processed includes the failed chunk.
|
||||
let mut sender =
|
||||
MockSmtpSender { call_count: 0, fail_on_call: Some((3, MockFailure::Permanent)) };
|
||||
let (status, processed) =
|
||||
send_smtp_chunks(&alice.ctx, &recipients, "body", 2, &mut sender).await;
|
||||
assert!(matches!(status, SendResult::Failure(_)));
|
||||
assert_eq!(processed, 6); // capped at (i+1)*chunk_size, may exceed len
|
||||
assert_eq!(sender.call_count, 3);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2373,6 +2373,27 @@ ALTER TABLE contacts ADD COLUMN name_normalized TEXT;
|
||||
.await?;
|
||||
}
|
||||
|
||||
inc_and_check(&mut migration_version, 152)?;
|
||||
if dbversion < migration_version {
|
||||
sql.execute_migration(
|
||||
"
|
||||
UPDATE msgs SET state=26 WHERE state=28; -- Change OutMdnRcvd to OutDelivered.
|
||||
UPDATE msgs SET state=19 WHERE state=24; -- Change OutPreparing to OutFailed.
|
||||
",
|
||||
migration_version,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
inc_and_check(&mut migration_version, 153)?;
|
||||
if dbversion < migration_version {
|
||||
sql.execute_migration(
|
||||
"ALTER TABLE transports ADD COLUMN max_smtp_rcpt_to INTEGER NOT NULL DEFAULT 0",
|
||||
migration_version,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let new_version = sql
|
||||
.get_raw_config_int(VERSION_CFG)
|
||||
.await?
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
//! Tests about forwarding and saving Pre-Messages
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
@@ -8,6 +10,7 @@ use crate::chatlist::get_last_message_for_chat;
|
||||
use crate::download::{DownloadState, PRE_MSG_ATTACHMENT_SIZE_THRESHOLD};
|
||||
use crate::message::{Message, Viewtype};
|
||||
use crate::test_utils::TestContextManager;
|
||||
use crate::tests::pre_messages::util::send_large_file_message;
|
||||
|
||||
/// Test that forwarding Pre-Message should forward additional text to not be empty
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
@@ -86,6 +89,43 @@ async fn test_forwarding_pre_message_empty_text() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_receive_both() -> Result<()> {
|
||||
let mut tcm = TestContextManager::new();
|
||||
let alice = &tcm.alice().await;
|
||||
let bob = &tcm.bob().await;
|
||||
let alice_chat_id = alice.create_group_with_members("", &[bob]).await;
|
||||
|
||||
let (pre_message, post_message, alice_msg_id) =
|
||||
send_large_file_message(alice, alice_chat_id, Viewtype::File, &vec![0u8; 200_000]).await?;
|
||||
|
||||
let msg = bob.recv_msg(&pre_message).await;
|
||||
let _ = bob.recv_msg_trash(&post_message).await;
|
||||
let msg = Message::load_from_db(bob, msg.id).await?;
|
||||
assert_eq!(msg.download_state(), DownloadState::Done);
|
||||
assert_eq!(msg.text, "test".to_owned());
|
||||
|
||||
forward_msgs(alice, &[alice_msg_id], alice_chat_id).await?;
|
||||
let rev_order = false;
|
||||
let msg = bob
|
||||
.recv_msg(
|
||||
&alice
|
||||
.pop_sent_msg_ex(rev_order, Duration::ZERO)
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(msg.download_state(), DownloadState::Available);
|
||||
assert_eq!(msg.is_forwarded(), true);
|
||||
assert_eq!(msg.text, "test".to_owned());
|
||||
let _ = bob.recv_msg_trash(&alice.pop_sent_msg().await).await;
|
||||
let msg = Message::load_from_db(bob, msg.id).await?;
|
||||
assert_eq!(msg.download_state(), DownloadState::Done);
|
||||
assert_eq!(msg.is_forwarded(), true);
|
||||
assert_eq!(msg.text, "test".to_owned());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test that forwarding Pre-Message should forward additional text to not be empty
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_saving_pre_message_empty_text() -> Result<()> {
|
||||
|
||||
@@ -791,7 +791,18 @@ pub(crate) async fn sync_transports(
|
||||
context
|
||||
.sql
|
||||
.transaction(|transaction| {
|
||||
let configured_addr = transaction.query_row(
|
||||
"SELECT value FROM config WHERE keyname='configured_addr'",
|
||||
(),
|
||||
|row| {
|
||||
let addr: String = row.get(0)?;
|
||||
Ok(addr)
|
||||
},
|
||||
)?;
|
||||
for RemovedTransportData { addr, timestamp } in removed_transports {
|
||||
if *addr == configured_addr {
|
||||
continue;
|
||||
}
|
||||
modified |= transaction.execute(
|
||||
"DELETE FROM transports
|
||||
WHERE addr=? AND add_timestamp<=?",
|
||||
|
||||
@@ -21,7 +21,6 @@ mod maps_integration;
|
||||
use std::cmp::max;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context as _, Result, anyhow, bail, ensure, format_err};
|
||||
|
||||
@@ -120,7 +119,7 @@ pub struct WebxdcInfo {
|
||||
|
||||
/// Milliseconds to wait before calling `sendUpdate()` again since the last call.
|
||||
/// Should be exposed to `window.sendUpdateInterval` in JS land.
|
||||
pub send_update_interval_ms: usize,
|
||||
pub send_update_interval: usize,
|
||||
|
||||
/// Maximum number of bytes accepted for a serialized update object.
|
||||
/// Should be exposed to `window.sendUpdateMaxSize` in JS land.
|
||||
@@ -551,7 +550,7 @@ impl Context {
|
||||
|
||||
let send_now = !matches!(
|
||||
instance.state,
|
||||
MessageState::Undefined | MessageState::OutPreparing | MessageState::OutDraft
|
||||
MessageState::Undefined | MessageState::OutDraft
|
||||
);
|
||||
|
||||
status_update.uid = Some(create_id());
|
||||
@@ -975,16 +974,7 @@ impl Message {
|
||||
self_addr,
|
||||
is_app_sender,
|
||||
is_broadcast,
|
||||
send_update_interval_ms: context
|
||||
.ratelimit
|
||||
.read()
|
||||
.await
|
||||
.min_send_interval()
|
||||
// Round the value up so that it's not 0 at least.
|
||||
.checked_add(Duration::from_nanos(999_999))
|
||||
.context("Overflow occurred")?
|
||||
.as_millis()
|
||||
.try_into()?,
|
||||
send_update_interval: context.ratelimit.read().await.update_interval(),
|
||||
send_update_max_size: RECOMMENDED_FILE_SIZE as usize,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1226,7 +1226,7 @@ async fn test_get_webxdc_info() -> Result<()> {
|
||||
let info = instance.get_webxdc_info(&t).await?;
|
||||
assert_eq!(info.name, "minimal.xdc");
|
||||
assert_eq!(info.icon, WEBXDC_DEFAULT_ICON.to_string());
|
||||
assert_eq!(info.send_update_interval_ms, 1000);
|
||||
assert_eq!(info.send_update_interval, 1000);
|
||||
assert_eq!(info.send_update_max_size, RECOMMENDED_FILE_SIZE as usize);
|
||||
|
||||
let mut instance = create_webxdc_instance(
|
||||
|
||||
Reference in New Issue
Block a user