mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Add smtp table
It replaces SendMsgToSmtp job. Prepared outgoing SMTP payloads are stored in the database now rather than files in blobdir.
This commit is contained in:
@@ -737,7 +737,6 @@ class TestOnlineAccount:
|
||||
# make sure we are not sending message to ourselves
|
||||
assert self_addr not in ev.data2
|
||||
assert other_addr in ev.data2
|
||||
ev = ac1._evtracker.get_matching("DC_EVENT_DELETED_BLOB_FILE")
|
||||
|
||||
lp.sec("ac1: setting bcc_self=1")
|
||||
ac1.set_config("bcc_self", "1")
|
||||
@@ -753,7 +752,6 @@ class TestOnlineAccount:
|
||||
# now make sure we are sending message to ourselves too
|
||||
assert self_addr in ev.data2
|
||||
assert other_addr in ev.data2
|
||||
ev = ac1._evtracker.get_matching("DC_EVENT_DELETED_BLOB_FILE")
|
||||
assert ac1.direct_imap.idle_wait_for_seen()
|
||||
|
||||
# Second client receives only second message, but not the first
|
||||
|
||||
201
src/chat.rs
201
src/chat.rs
@@ -5,7 +5,7 @@ use std::convert::{TryFrom, TryInto};
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Result};
|
||||
use anyhow::{bail, ensure, Context as _, Result};
|
||||
use async_std::path::{Path, PathBuf};
|
||||
use deltachat_derive::{FromSql, ToSql};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -32,10 +32,14 @@ use crate::ephemeral::{delete_expired_messages, schedule_ephemeral_task, Timer a
|
||||
use crate::events::EventType;
|
||||
use crate::html::new_html_mimepart;
|
||||
use crate::job::{self, Action};
|
||||
use crate::location;
|
||||
use crate::message::{self, Message, MessageState, MsgId};
|
||||
use crate::mimefactory::MimeFactory;
|
||||
use crate::mimeparser::SystemMessage;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::peerstate::{Peerstate, PeerstateVerifiedStatus};
|
||||
use crate::scheduler::InterruptInfo;
|
||||
use crate::smtp::send_msg_to_smtp;
|
||||
use crate::stock_str;
|
||||
use crate::webxdc::{WEBXDC_SENDING_LIMIT, WEBXDC_SUFFIX};
|
||||
|
||||
@@ -1283,7 +1287,7 @@ impl Chat {
|
||||
msg.param.set_int(Param::AttachGroupImage, 1);
|
||||
self.param.remove(Param::Unpromoted);
|
||||
self.update_param(context).await?;
|
||||
// send_sync_msg() is called (usually) a moment later at Job::send_msg_to_smtp()
|
||||
// send_sync_msg() is called (usually) a moment later at send_msg_to_smtp()
|
||||
// when the group-creation message is actually sent though SMTP -
|
||||
// this makes sure, the other devices are aware of grpid that is used in the sync-message.
|
||||
context.sync_qr_code_tokens(Some(self.id)).await?;
|
||||
@@ -1963,41 +1967,25 @@ pub async fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) ->
|
||||
|
||||
/// Tries to send a message synchronously.
|
||||
///
|
||||
/// Directly opens an smtp
|
||||
/// connection and sends the message, bypassing the job system. If this fails, it writes a send job to
|
||||
/// the database.
|
||||
/// Creates a new message in `smtp` table, then drectly opens an SMTP connection and sends the
|
||||
/// message. If this fails, the message remains in the database to be sent later.
|
||||
pub async fn send_msg_sync(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result<MsgId> {
|
||||
if let Some(mut job) = prepare_send_msg(context, chat_id, msg).await? {
|
||||
if let Some(rowid) = prepare_send_msg(context, chat_id, msg).await? {
|
||||
let mut smtp = crate::smtp::Smtp::new();
|
||||
send_msg_to_smtp(context, &mut smtp, rowid)
|
||||
.await
|
||||
.context("failed to send message, queued for later sending")?;
|
||||
|
||||
let status = job.send_msg_to_smtp(context, &mut smtp).await;
|
||||
|
||||
match status {
|
||||
job::Status::Finished(Ok(_)) => {
|
||||
context.emit_event(EventType::MsgsChanged {
|
||||
chat_id: msg.chat_id,
|
||||
msg_id: msg.id,
|
||||
});
|
||||
|
||||
Ok(msg.id)
|
||||
}
|
||||
_ => {
|
||||
job.save(context).await?;
|
||||
Err(format_err!(
|
||||
"failed to send message, queued for later sending"
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Nothing to do
|
||||
Ok(msg.id)
|
||||
context.emit_event(EventType::MsgsChanged {
|
||||
chat_id: msg.chat_id,
|
||||
msg_id: msg.id,
|
||||
});
|
||||
}
|
||||
Ok(msg.id)
|
||||
}
|
||||
|
||||
async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result<MsgId> {
|
||||
if let Some(send_job) = prepare_send_msg(context, chat_id, msg).await? {
|
||||
job::add(context, send_job).await?;
|
||||
|
||||
if prepare_send_msg(context, chat_id, msg).await?.is_some() {
|
||||
context.emit_event(EventType::MsgsChanged {
|
||||
chat_id: msg.chat_id,
|
||||
msg_id: msg.id,
|
||||
@@ -2006,16 +1994,19 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -
|
||||
if msg.param.exists(Param::SetLatitude) {
|
||||
context.emit_event(EventType::LocationChanged(Some(DC_CONTACT_ID_SELF)));
|
||||
}
|
||||
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
}
|
||||
|
||||
Ok(msg.id)
|
||||
}
|
||||
|
||||
/// Returns rowid from `smtp` table.
|
||||
async fn prepare_send_msg(
|
||||
context: &Context,
|
||||
chat_id: ChatId,
|
||||
msg: &mut Message,
|
||||
) -> Result<Option<crate::job::Job>> {
|
||||
) -> Result<Option<i64>> {
|
||||
// dc_prepare_msg() leaves the message state to OutPreparing, we
|
||||
// only have to change the state to OutPending in this case.
|
||||
// Otherwise we still have to prepare the message, which will set
|
||||
@@ -2031,9 +2022,143 @@ async fn prepare_send_msg(
|
||||
);
|
||||
message::update_msg_state(context, msg.id, MessageState::OutPending).await?;
|
||||
}
|
||||
let job = job::send_msg_job(context, msg.id).await?;
|
||||
let row_id = create_send_msg_job(context, msg.id).await?;
|
||||
Ok(row_id)
|
||||
}
|
||||
|
||||
Ok(job)
|
||||
/// Constructs a job for sending a message and inserts into `smtp` table.
|
||||
///
|
||||
/// Returns rowid if job was created or `None` if SMTP job is not needed, e.g. when sending to a
|
||||
/// group with only self and no BCC-to-self configured.
|
||||
///
|
||||
/// The caller has to interrupt SMTP loop or otherwise process a new row.
|
||||
async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<i64>> {
|
||||
let mut msg = Message::load_from_db(context, msg_id).await?;
|
||||
msg.try_calc_and_set_dimensions(context)
|
||||
.await
|
||||
.context("failed to calculate media dimensions")?;
|
||||
|
||||
/* create message */
|
||||
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
|
||||
|
||||
let attach_selfavatar = match shall_attach_selfavatar(context, msg.chat_id).await {
|
||||
Ok(attach_selfavatar) => attach_selfavatar,
|
||||
Err(err) => {
|
||||
warn!(context, "job: cannot get selfavatar-state: {}", err);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?;
|
||||
|
||||
let mut recipients = mimefactory.recipients();
|
||||
|
||||
let from = context
|
||||
.get_config(Config::ConfiguredAddr)
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
let lowercase_from = from.to_lowercase();
|
||||
|
||||
// Send BCC to self if it is enabled and we are not going to
|
||||
// delete it immediately.
|
||||
if context.get_config_bool(Config::BccSelf).await?
|
||||
&& context.get_config_delete_server_after().await? != Some(0)
|
||||
&& !recipients
|
||||
.iter()
|
||||
.any(|x| x.to_lowercase() == lowercase_from)
|
||||
{
|
||||
recipients.push(from);
|
||||
}
|
||||
|
||||
if recipients.is_empty() {
|
||||
// may happen eg. for groups with only SELF and bcc_self disabled
|
||||
info!(
|
||||
context,
|
||||
"message {} has no recipient, skipping smtp-send", msg_id
|
||||
);
|
||||
msg_id.set_delivered(context).await?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let rendered_msg = match mimefactory.render(context).await {
|
||||
Ok(res) => Ok(res),
|
||||
Err(err) => {
|
||||
message::set_msg_failed(context, msg_id, Some(err.to_string())).await;
|
||||
Err(err)
|
||||
}
|
||||
}?;
|
||||
|
||||
if needs_encryption && !rendered_msg.is_encrypted {
|
||||
/* unrecoverable */
|
||||
message::set_msg_failed(
|
||||
context,
|
||||
msg_id,
|
||||
Some("End-to-end-encryption unavailable unexpectedly."),
|
||||
)
|
||||
.await;
|
||||
bail!(
|
||||
"e2e encryption unavailable {} - {:?}",
|
||||
msg_id,
|
||||
needs_encryption
|
||||
);
|
||||
}
|
||||
|
||||
if rendered_msg.is_gossiped {
|
||||
msg.chat_id.set_gossiped_timestamp(context, time()).await?;
|
||||
}
|
||||
|
||||
if 0 != rendered_msg.last_added_location_id {
|
||||
if let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, time()).await {
|
||||
error!(context, "Failed to set kml sent_timestamp: {:?}", err);
|
||||
}
|
||||
if !msg.hidden {
|
||||
if let Err(err) =
|
||||
location::set_msg_location_id(context, msg.id, rendered_msg.last_added_location_id)
|
||||
.await
|
||||
{
|
||||
error!(context, "Failed to set msg_location_id: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(sync_ids) = rendered_msg.sync_ids_to_delete {
|
||||
if let Err(err) = context.delete_sync_ids(sync_ids).await {
|
||||
error!(context, "Failed to delete sync ids: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
if attach_selfavatar {
|
||||
if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, time()).await {
|
||||
error!(context, "Failed to set selfavatar timestamp: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
if rendered_msg.is_encrypted && !needs_encryption {
|
||||
msg.param.set_int(Param::GuaranteeE2ee, 1);
|
||||
msg.update_param(context).await;
|
||||
}
|
||||
|
||||
ensure!(!recipients.is_empty(), "no recipients for smtp job set");
|
||||
|
||||
let recipients = recipients.join(" ");
|
||||
|
||||
msg.subject = rendered_msg.subject.clone();
|
||||
msg.update_subject(context).await;
|
||||
|
||||
let row_id = context
|
||||
.sql
|
||||
.insert(
|
||||
"INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
paramsv![
|
||||
&rendered_msg.rfc724_mid,
|
||||
recipients,
|
||||
&rendered_msg.message,
|
||||
msg_id
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
Ok(Some(row_id))
|
||||
}
|
||||
|
||||
pub async fn send_text_msg(
|
||||
@@ -3049,8 +3174,8 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId)
|
||||
.prepare_msg_raw(context, &mut msg, None, curr_timestamp)
|
||||
.await?;
|
||||
curr_timestamp += 1;
|
||||
if let Some(send_job) = job::send_msg_job(context, new_msg_id).await? {
|
||||
job::add(context, send_job).await?;
|
||||
if create_send_msg_job(context, new_msg_id).await?.is_some() {
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
}
|
||||
}
|
||||
created_chats.push(chat_id);
|
||||
@@ -4467,7 +4592,8 @@ mod tests {
|
||||
);
|
||||
|
||||
// Alice has an SMTP-server replacing the `Message-ID:`-header (as done eg. by outlook.com).
|
||||
let msg = alice.pop_sent_msg().await.payload();
|
||||
let sent_msg = alice.pop_sent_msg().await;
|
||||
let msg = sent_msg.payload();
|
||||
assert_eq!(msg.match_indices("Gr.").count(), 2);
|
||||
let msg = msg.replace("Message-ID: <Gr.", "Message-ID: <XXX");
|
||||
assert_eq!(msg.match_indices("Gr.").count(), 1);
|
||||
@@ -4487,7 +4613,8 @@ mod tests {
|
||||
// Bob answers - simulate a normal MUA by not setting `Chat-*`-headers;
|
||||
// moreover, Bob's SMTP-server also replaces the `Message-ID:`-header
|
||||
send_text_msg(&bob, bob_chat.id, "ho!".to_string()).await?;
|
||||
let msg = bob.pop_sent_msg().await.payload();
|
||||
let sent_msg = bob.pop_sent_msg().await;
|
||||
let msg = sent_msg.payload();
|
||||
let msg = msg.replace("Message-ID: <Gr.", "Message-ID: <XXX");
|
||||
let msg = msg.replace("Chat-", "XXXX-");
|
||||
assert_eq!(msg.match_indices("Chat-").count(), 0);
|
||||
|
||||
@@ -4866,7 +4866,7 @@ Second thread."#;
|
||||
let mdn_body = rendered_mdn.message;
|
||||
|
||||
// Alice receives the read receipt.
|
||||
dc_receive_imf(&alice, &mdn_body, "INBOX", false).await?;
|
||||
dc_receive_imf(&alice, mdn_body.as_bytes(), "INBOX", false).await?;
|
||||
|
||||
// Chat should not pop up in the chatlist.
|
||||
let chats = Chatlist::try_load(&alice, 0, None, None).await?;
|
||||
|
||||
393
src/job.rs
393
src/job.rs
@@ -3,28 +3,24 @@
|
||||
//! This module implements a job queue maintained in the SQLite database
|
||||
//! and job types.
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Error, Result};
|
||||
use async_smtp::smtp::response::{Category, Code, Detail};
|
||||
use anyhow::{bail, format_err, Context as _, Error, Result};
|
||||
use deltachat_derive::{FromSql, ToSql};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
use crate::chat::{self, ChatId};
|
||||
use crate::config::Config;
|
||||
use crate::contact::{normalize_name, Contact, Modifier, Origin};
|
||||
use crate::context::Context;
|
||||
use crate::dc_tools::{dc_delete_file, dc_read_file, time};
|
||||
use crate::dc_tools::time;
|
||||
use crate::events::EventType;
|
||||
use crate::imap::{Imap, ImapActionResult};
|
||||
use crate::location;
|
||||
use crate::log::LogExt;
|
||||
use crate::message::{self, Message, MessageState, MsgId};
|
||||
use crate::message::{Message, MsgId};
|
||||
use crate::mimefactory::MimeFactory;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::scheduler::InterruptInfo;
|
||||
use crate::smtp::Smtp;
|
||||
use crate::smtp::{smtp_send, Smtp};
|
||||
use crate::sql;
|
||||
|
||||
// results in ~3 weeks for the last backoff timespan
|
||||
@@ -109,7 +105,6 @@ pub enum Action {
|
||||
MaybeSendLocations = 5005, // low priority ...
|
||||
MaybeSendLocationsEnded = 5007,
|
||||
SendMdn = 5010,
|
||||
SendMsgToSmtp = 5901, // ... high priority
|
||||
}
|
||||
|
||||
impl Default for Action {
|
||||
@@ -135,7 +130,6 @@ impl From<Action> for Thread {
|
||||
MaybeSendLocations => Thread::Smtp,
|
||||
MaybeSendLocationsEnded => Thread::Smtp,
|
||||
SendMdn => Thread::Smtp,
|
||||
SendMsgToSmtp => Thread::Smtp,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -226,217 +220,6 @@ impl Job {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn smtp_send<F, Fut>(
|
||||
&mut self,
|
||||
context: &Context,
|
||||
recipients: Vec<async_smtp::EmailAddress>,
|
||||
message: Vec<u8>,
|
||||
job_id: u32,
|
||||
smtp: &mut Smtp,
|
||||
success_cb: F,
|
||||
) -> Status
|
||||
where
|
||||
F: FnOnce() -> Fut,
|
||||
Fut: Future<Output = Result<()>>,
|
||||
{
|
||||
// hold the smtp lock during sending of a job and
|
||||
// its ok/error response processing. Note that if a message
|
||||
// was sent we need to mark it in the database ASAP as we
|
||||
// otherwise might send it twice.
|
||||
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
||||
info!(context, "smtp-sending out mime message:");
|
||||
println!("{}", String::from_utf8_lossy(&message));
|
||||
}
|
||||
|
||||
smtp.connectivity.set_working(context).await;
|
||||
|
||||
let send_result = smtp.send(context, recipients, message, job_id).await;
|
||||
smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
|
||||
|
||||
let status = match send_result {
|
||||
Err(crate::smtp::send::Error::SmtpSend(err)) => {
|
||||
// Remote error, retry later.
|
||||
warn!(context, "SMTP failed to send: {:?}", &err);
|
||||
|
||||
let res = match err {
|
||||
async_smtp::smtp::error::Error::Permanent(ref response) => {
|
||||
// Workaround for incorrectly configured servers returning permanent errors
|
||||
// instead of temporary ones.
|
||||
let maybe_transient = match response.code {
|
||||
// Sometimes servers send a permanent error when actually it is a temporary error
|
||||
// For documentation see <https://tools.ietf.org/html/rfc3463>
|
||||
Code {
|
||||
category: Category::MailSystem,
|
||||
detail: Detail::Zero,
|
||||
..
|
||||
} => {
|
||||
// Ignore status code 5.5.0, see <https://support.delta.chat/t/every-other-message-gets-stuck/877/2>
|
||||
// Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns
|
||||
// "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later".
|
||||
//
|
||||
// Other enhanced status codes, such as Postfix
|
||||
// "550 5.1.1 <foobar@example.org>: Recipient address rejected: User unknown in local recipient table"
|
||||
// are not ignored.
|
||||
response.first_word() == Some(&"5.5.0".to_string())
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if maybe_transient {
|
||||
Status::RetryLater
|
||||
} else {
|
||||
// If we do not retry, add an info message to the chat.
|
||||
// Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..."
|
||||
// should definitely go here, because user has to open the link to
|
||||
// resume message sending.
|
||||
Status::Finished(Err(format_err!("Permanent SMTP error: {}", err)))
|
||||
}
|
||||
}
|
||||
async_smtp::smtp::error::Error::Transient(ref response) => {
|
||||
// We got a transient 4xx response from SMTP server.
|
||||
// Give some time until the server-side error maybe goes away.
|
||||
|
||||
if let Some(first_word) = response.first_word() {
|
||||
if first_word.ends_with(".1.1")
|
||||
|| first_word.ends_with(".1.2")
|
||||
|| first_word.ends_with(".1.3")
|
||||
{
|
||||
// Sometimes we receive transient errors that should be permanent.
|
||||
// Any extended smtp status codes like x.1.1, x.1.2 or x.1.3 that we
|
||||
// receive as a transient error are misconfigurations of the smtp server.
|
||||
// See <https://tools.ietf.org/html/rfc3463#section-3.2>
|
||||
info!(context, "Smtp-job #{} Received extended status code {} for a transient error. This looks like a misconfigured smtp server, let's fail immediatly", self.job_id, first_word);
|
||||
Status::Finished(Err(format_err!("Permanent SMTP error: {}", err)))
|
||||
} else {
|
||||
Status::RetryLater
|
||||
}
|
||||
} else {
|
||||
Status::RetryLater
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if smtp.has_maybe_stale_connection().await {
|
||||
info!(context, "stale connection? immediately reconnecting");
|
||||
Status::RetryNow
|
||||
} else {
|
||||
Status::RetryLater
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// this clears last_success info
|
||||
smtp.disconnect().await;
|
||||
|
||||
res
|
||||
}
|
||||
Err(crate::smtp::send::Error::Envelope(err)) => {
|
||||
// Local error, job is invalid, do not retry.
|
||||
smtp.disconnect().await;
|
||||
warn!(context, "SMTP job is invalid: {}", err);
|
||||
Status::Finished(Err(err.into()))
|
||||
}
|
||||
Err(crate::smtp::send::Error::NoTransport) => {
|
||||
// Should never happen.
|
||||
// It does not even make sense to disconnect here.
|
||||
error!(context, "SMTP job failed because SMTP has no transport");
|
||||
Status::Finished(Err(format_err!("SMTP has not transport")))
|
||||
}
|
||||
Err(crate::smtp::send::Error::Other(err)) => {
|
||||
// Local error, job is invalid, do not retry.
|
||||
smtp.disconnect().await;
|
||||
warn!(context, "unable to load job: {}", err);
|
||||
Status::Finished(Err(err))
|
||||
}
|
||||
Ok(()) => {
|
||||
job_try!(success_cb().await);
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
};
|
||||
|
||||
if let Status::Finished(Err(err)) = &status {
|
||||
// We couldn't send the message, so mark it as failed
|
||||
let msg_id = MsgId::new(self.foreign_id);
|
||||
message::set_msg_failed(context, msg_id, Some(err.to_string())).await;
|
||||
}
|
||||
status
|
||||
}
|
||||
|
||||
pub(crate) async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
|
||||
// SMTP server, if not yet done
|
||||
if let Err(err) = smtp.connect_configured(context).await {
|
||||
warn!(context, "SMTP connection failure: {:?}", err);
|
||||
smtp.last_send_error = Some(format!("SMTP connection failure: {:#}", err));
|
||||
return Status::RetryLater;
|
||||
}
|
||||
|
||||
let filename = job_try!(job_try!(self
|
||||
.param
|
||||
.get_path(Param::File, context)
|
||||
.context("can't get filename"))
|
||||
.context("Can't get filename"));
|
||||
let body = job_try!(dc_read_file(context, &filename).await);
|
||||
let recipients = job_try!(self
|
||||
.param
|
||||
.get(Param::Recipients)
|
||||
.context("missing recipients"));
|
||||
|
||||
let recipients_list = recipients
|
||||
.split('\x1e')
|
||||
.filter_map(
|
||||
|addr| match async_smtp::EmailAddress::new(addr.to_string()) {
|
||||
Ok(addr) => Some(addr),
|
||||
Err(err) => {
|
||||
warn!(context, "invalid recipient: {} {:?}", addr, err);
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
/* if there is a msg-id and it does not exist in the db, cancel sending.
|
||||
this happends if dc_delete_msgs() was called
|
||||
before the generated mime was sent out */
|
||||
if 0 != self.foreign_id {
|
||||
match message::exists(context, MsgId::new(self.foreign_id)).await {
|
||||
Ok(exists) => {
|
||||
if !exists {
|
||||
return Status::Finished(Err(format_err!(
|
||||
"Not sending Message {} as it was deleted",
|
||||
self.foreign_id
|
||||
)));
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(context, "failed to check message existence: {:?}", err);
|
||||
smtp.last_send_error =
|
||||
Some(format!("failed to check message existence: {:#}", err));
|
||||
return Status::RetryLater;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let foreign_id = self.foreign_id;
|
||||
self.smtp_send(context, recipients_list, body, self.job_id, smtp, || {
|
||||
async move {
|
||||
// smtp success, update db ASAP, then delete smtp file
|
||||
if 0 != foreign_id {
|
||||
set_delivered(context, MsgId::new(foreign_id)).await?;
|
||||
}
|
||||
// now also delete the generated file
|
||||
dc_delete_file(context, filename).await;
|
||||
|
||||
// finally, create another send-job if there are items to be synced.
|
||||
// triggering sync-job after msg-send-job guarantees, the recipient has grpid etc.
|
||||
// once the sync message arrives.
|
||||
// if there are no items to sync, this function returns fast.
|
||||
context.send_sync_msg().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job.
|
||||
async fn get_additional_mdn_jobs(
|
||||
&self,
|
||||
@@ -535,14 +318,13 @@ impl Job {
|
||||
return Status::RetryLater;
|
||||
}
|
||||
|
||||
self.smtp_send(context, recipients, body, self.job_id, smtp, || {
|
||||
async move {
|
||||
// Remove additional SendMdn jobs we have aggregated into this one.
|
||||
kill_ids(context, &additional_job_ids).await?;
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
.await
|
||||
let status = smtp_send(context, recipients, body, smtp, msg_id, 0).await;
|
||||
if matches!(status, Status::Finished(Ok(_))) {
|
||||
// Remove additional SendMdn jobs we have aggregated into this one.
|
||||
job_try!(kill_ids(context, &additional_job_ids).await);
|
||||
}
|
||||
|
||||
status
|
||||
}
|
||||
|
||||
/// Read the recipients from old emails sent by the user and add them as contacts.
|
||||
@@ -703,17 +485,6 @@ pub async fn action_exists(context: &Context, action: Action) -> Result<bool> {
|
||||
Ok(exists)
|
||||
}
|
||||
|
||||
async fn set_delivered(context: &Context, msg_id: MsgId) -> Result<()> {
|
||||
message::update_msg_state(context, msg_id, MessageState::OutDelivered).await?;
|
||||
let chat_id: ChatId = context
|
||||
.sql
|
||||
.query_get_value("SELECT chat_id FROM msgs WHERE id=?", paramsv![msg_id])
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
context.emit_event(EventType::MsgDelivered { chat_id, msg_id });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, folder: Config) {
|
||||
let mailbox = if let Ok(Some(m)) = context.get_config(folder).await {
|
||||
m
|
||||
@@ -759,132 +530,6 @@ async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, fold
|
||||
};
|
||||
}
|
||||
|
||||
/// Constructs a job for sending a message.
|
||||
///
|
||||
/// Returns `None` if no messages need to be sent out.
|
||||
///
|
||||
/// In order to be processed, must be `add`ded.
|
||||
pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result<Option<Job>> {
|
||||
let mut msg = Message::load_from_db(context, msg_id).await?;
|
||||
msg.try_calc_and_set_dimensions(context).await.ok();
|
||||
|
||||
/* create message */
|
||||
let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default();
|
||||
|
||||
let attach_selfavatar = match chat::shall_attach_selfavatar(context, msg.chat_id).await {
|
||||
Ok(attach_selfavatar) => attach_selfavatar,
|
||||
Err(err) => {
|
||||
warn!(context, "job: cannot get selfavatar-state: {}", err);
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?;
|
||||
|
||||
let mut recipients = mimefactory.recipients();
|
||||
|
||||
let from = context
|
||||
.get_config(Config::ConfiguredAddr)
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
let lowercase_from = from.to_lowercase();
|
||||
|
||||
// Send BCC to self if it is enabled and we are not going to
|
||||
// delete it immediately.
|
||||
if context.get_config_bool(Config::BccSelf).await?
|
||||
&& context.get_config_delete_server_after().await? != Some(0)
|
||||
&& !recipients
|
||||
.iter()
|
||||
.any(|x| x.to_lowercase() == lowercase_from)
|
||||
{
|
||||
recipients.push(from);
|
||||
}
|
||||
|
||||
if recipients.is_empty() {
|
||||
// may happen eg. for groups with only SELF and bcc_self disabled
|
||||
info!(
|
||||
context,
|
||||
"message {} has no recipient, skipping smtp-send", msg_id
|
||||
);
|
||||
set_delivered(context, msg_id).await?;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let rendered_msg = match mimefactory.render(context).await {
|
||||
Ok(res) => Ok(res),
|
||||
Err(err) => {
|
||||
message::set_msg_failed(context, msg_id, Some(err.to_string())).await;
|
||||
Err(err)
|
||||
}
|
||||
}?;
|
||||
|
||||
if needs_encryption && !rendered_msg.is_encrypted {
|
||||
/* unrecoverable */
|
||||
message::set_msg_failed(
|
||||
context,
|
||||
msg_id,
|
||||
Some("End-to-end-encryption unavailable unexpectedly."),
|
||||
)
|
||||
.await;
|
||||
bail!(
|
||||
"e2e encryption unavailable {} - {:?}",
|
||||
msg_id,
|
||||
needs_encryption
|
||||
);
|
||||
}
|
||||
|
||||
if rendered_msg.is_gossiped {
|
||||
msg.chat_id.set_gossiped_timestamp(context, time()).await?;
|
||||
}
|
||||
|
||||
if 0 != rendered_msg.last_added_location_id {
|
||||
if let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, time()).await {
|
||||
error!(context, "Failed to set kml sent_timestamp: {:?}", err);
|
||||
}
|
||||
if !msg.hidden {
|
||||
if let Err(err) =
|
||||
location::set_msg_location_id(context, msg.id, rendered_msg.last_added_location_id)
|
||||
.await
|
||||
{
|
||||
error!(context, "Failed to set msg_location_id: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(sync_ids) = rendered_msg.sync_ids_to_delete {
|
||||
if let Err(err) = context.delete_sync_ids(sync_ids).await {
|
||||
error!(context, "Failed to delete sync ids: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
if attach_selfavatar {
|
||||
if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, time()).await {
|
||||
error!(context, "Failed to set selfavatar timestamp: {:?}", err);
|
||||
}
|
||||
}
|
||||
|
||||
if rendered_msg.is_encrypted && !needs_encryption {
|
||||
msg.param.set_int(Param::GuaranteeE2ee, 1);
|
||||
msg.update_param(context).await;
|
||||
}
|
||||
|
||||
ensure!(!recipients.is_empty(), "no recipients for smtp job set");
|
||||
let mut param = Params::new();
|
||||
let bytes = &rendered_msg.message;
|
||||
let blob = BlobObject::create(context, &rendered_msg.rfc724_mid, bytes).await?;
|
||||
|
||||
let recipients = recipients.join("\x1e");
|
||||
param.set(Param::File, blob.as_name());
|
||||
param.set(Param::Recipients, &recipients);
|
||||
|
||||
msg.subject = rendered_msg.subject.clone();
|
||||
msg.update_subject(context).await;
|
||||
|
||||
let job = create(Action::SendMsgToSmtp, msg_id.to_u32(), param, 0)?;
|
||||
|
||||
Ok(Some(job))
|
||||
}
|
||||
|
||||
pub(crate) enum Connection<'a> {
|
||||
Inbox(&'a mut Imap),
|
||||
Smtp(&'a mut Smtp),
|
||||
@@ -992,7 +637,6 @@ async fn perform_job_action(
|
||||
|
||||
let try_res = match job.action {
|
||||
Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))),
|
||||
Action::SendMsgToSmtp => job.send_msg_to_smtp(context, connection.smtp()).await,
|
||||
Action::SendMdn => job.send_mdn(context, connection.smtp()).await,
|
||||
Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await,
|
||||
Action::MaybeSendLocationsEnded => {
|
||||
@@ -1056,16 +700,6 @@ pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Creates a job.
|
||||
pub fn create(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Result<Job> {
|
||||
ensure!(
|
||||
action != Action::Unknown,
|
||||
"Invalid action passed to job_add"
|
||||
);
|
||||
|
||||
Ok(Job::new(action, foreign_id, param, delay_seconds))
|
||||
}
|
||||
|
||||
/// Adds a job to the database, scheduling it.
|
||||
pub async fn add(context: &Context, job: Job) -> Result<()> {
|
||||
let action = job.action;
|
||||
@@ -1084,10 +718,7 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
|
||||
info!(context, "interrupt: imap");
|
||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
}
|
||||
Action::MaybeSendLocations
|
||||
| Action::MaybeSendLocationsEnded
|
||||
| Action::SendMdn
|
||||
| Action::SendMsgToSmtp => {
|
||||
Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn => {
|
||||
info!(context, "interrupt: smtp");
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
}
|
||||
|
||||
@@ -113,10 +113,14 @@ WHERE id=?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deletes a message and corresponding MDNs from the database.
|
||||
/// Deletes a message, corresponding MDNs and unsent SMTP messages from the database.
|
||||
pub async fn delete_from_db(self, context: &Context) -> Result<()> {
|
||||
// We don't use transactions yet, so remove MDNs first to make
|
||||
// sure they are not left while the message is deleted.
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp WHERE msg_id=?", paramsv![self])
|
||||
.await?;
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM msgs_mdns WHERE msg_id=?;", paramsv![self])
|
||||
@@ -135,6 +139,20 @@ WHERE id=?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn set_delivered(self, context: &Context) -> Result<()> {
|
||||
update_msg_state(context, self, MessageState::OutDelivered).await?;
|
||||
let chat_id: ChatId = context
|
||||
.sql
|
||||
.query_get_value("SELECT chat_id FROM msgs WHERE id=?", paramsv![self])
|
||||
.await?
|
||||
.unwrap_or_default();
|
||||
context.emit_event(EventType::MsgDelivered {
|
||||
chat_id,
|
||||
msg_id: self,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Bad evil escape hatch.
|
||||
///
|
||||
/// Avoid using this, eventually types should be cleaned up enough
|
||||
|
||||
@@ -81,7 +81,7 @@ pub struct MimeFactory<'a> {
|
||||
/// Result of rendering a message, ready to be submitted to a send job.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RenderedEmail {
|
||||
pub message: Vec<u8>,
|
||||
pub message: String,
|
||||
// pub envelope: Envelope,
|
||||
pub is_encrypted: bool,
|
||||
pub is_gossiped: bool,
|
||||
@@ -770,7 +770,7 @@ impl<'a> MimeFactory<'a> {
|
||||
} = self;
|
||||
|
||||
Ok(RenderedEmail {
|
||||
message: outer_message.build().as_string().into_bytes(),
|
||||
message: outer_message.build().as_string(),
|
||||
// envelope: Envelope::new,
|
||||
is_encrypted,
|
||||
is_gossiped,
|
||||
@@ -1931,7 +1931,7 @@ mod tests {
|
||||
|
||||
let rendered_msg = mimefactory.render(context).await.unwrap();
|
||||
|
||||
let mail = mailparse::parse_mail(&rendered_msg.message).unwrap();
|
||||
let mail = mailparse::parse_mail(rendered_msg.message.as_bytes()).unwrap();
|
||||
assert_eq!(
|
||||
mail.headers
|
||||
.iter()
|
||||
@@ -1941,7 +1941,7 @@ mod tests {
|
||||
"1.0"
|
||||
);
|
||||
|
||||
let _mime_msg = MimeMessage::from_bytes(context, &rendered_msg.message)
|
||||
let _mime_msg = MimeMessage::from_bytes(context, rendered_msg.message.as_bytes())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -2015,8 +2015,9 @@ mod tests {
|
||||
let mut msg = Message::new(Viewtype::Text);
|
||||
msg.set_text(Some("this is the text!".to_string()));
|
||||
|
||||
let payload = t.send_msg(chat.id, &mut msg).await.payload();
|
||||
let mut payload = payload.splitn(3, "\r\n\r\n");
|
||||
let sent_msg = t.send_msg(chat.id, &mut msg).await;
|
||||
let mut payload = sent_msg.payload().splitn(3, "\r\n\r\n");
|
||||
|
||||
let outer = payload.next().unwrap();
|
||||
let inner = payload.next().unwrap();
|
||||
let body = payload.next().unwrap();
|
||||
@@ -2034,8 +2035,8 @@ mod tests {
|
||||
|
||||
// if another message is sent, that one must not contain the avatar
|
||||
// and no artificial multipart/mixed nesting
|
||||
let payload = t.send_msg(chat.id, &mut msg).await.payload();
|
||||
let mut payload = payload.splitn(2, "\r\n\r\n");
|
||||
let sent_msg = t.send_msg(chat.id, &mut msg).await;
|
||||
let mut payload = sent_msg.payload().splitn(2, "\r\n\r\n");
|
||||
let outer = payload.next().unwrap();
|
||||
let body = payload.next().unwrap();
|
||||
|
||||
|
||||
@@ -110,9 +110,6 @@ pub enum Param {
|
||||
/// For Jobs
|
||||
AlsoMove = b'M',
|
||||
|
||||
/// For Jobs: space-separated list of message recipients
|
||||
Recipients = b'R',
|
||||
|
||||
/// For MDN-sending job
|
||||
MsgId = b'I',
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ use crate::context::Context;
|
||||
use crate::dc_tools::maybe_add_time_based_warnings;
|
||||
use crate::imap::Imap;
|
||||
use crate::job::{self, Thread};
|
||||
use crate::smtp::Smtp;
|
||||
use crate::smtp::{send_smtp_messages, Smtp};
|
||||
|
||||
use self::connectivity::ConnectivityStore;
|
||||
|
||||
@@ -273,17 +273,25 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
|
||||
|
||||
let mut interrupt_info = Default::default();
|
||||
loop {
|
||||
match job::load_next(&ctx, Thread::Smtp, &interrupt_info)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
{
|
||||
let job = match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
|
||||
Err(err) => {
|
||||
error!(ctx, "Failed loading job from the database: {:#}.", err);
|
||||
None
|
||||
}
|
||||
Ok(job) => job,
|
||||
};
|
||||
|
||||
match job {
|
||||
Some(job) => {
|
||||
info!(ctx, "executing smtp job");
|
||||
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
|
||||
interrupt_info = Default::default();
|
||||
}
|
||||
None => {
|
||||
if let Err(err) = send_smtp_messages(&ctx, &mut connection).await {
|
||||
warn!(ctx, "send_smtp_messages failed: {:#}", err);
|
||||
}
|
||||
|
||||
// Fake Idle
|
||||
info!(ctx, "smtp fake idle - started");
|
||||
match &connection.last_send_error {
|
||||
|
||||
235
src/smtp.rs
235
src/smtp.rs
@@ -4,14 +4,18 @@ pub mod send;
|
||||
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use anyhow::{format_err, Context as _};
|
||||
use async_smtp::smtp::client::net::ClientTlsParameters;
|
||||
use async_smtp::smtp::response::{Category, Code, Detail};
|
||||
use async_smtp::{error, smtp, EmailAddress, ServerAddress};
|
||||
|
||||
use crate::constants::DC_LP_AUTH_OAUTH2;
|
||||
use crate::events::EventType;
|
||||
use crate::job::Status;
|
||||
use crate::login_param::{
|
||||
dc_build_tls, CertificateChecks, LoginParam, ServerLoginParam, Socks5Config,
|
||||
};
|
||||
use crate::message::{self, MsgId};
|
||||
use crate::oauth2::dc_get_oauth2_access_token;
|
||||
use crate::provider::Socket;
|
||||
use crate::{context::Context, scheduler::connectivity::ConnectivityStore};
|
||||
@@ -220,3 +224,234 @@ impl Smtp {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn smtp_send(
|
||||
context: &Context,
|
||||
recipients: Vec<async_smtp::EmailAddress>,
|
||||
message: String,
|
||||
smtp: &mut Smtp,
|
||||
msg_id: MsgId,
|
||||
rowid: i64,
|
||||
) -> Status {
|
||||
if std::env::var(crate::DCC_MIME_DEBUG).is_ok() {
|
||||
info!(context, "smtp-sending out mime message:");
|
||||
println!("{}", message);
|
||||
}
|
||||
|
||||
smtp.connectivity.set_working(context).await;
|
||||
|
||||
let send_result = smtp
|
||||
.send(context, recipients, message.into_bytes(), rowid)
|
||||
.await;
|
||||
smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string());
|
||||
|
||||
let status = match send_result {
|
||||
Err(crate::smtp::send::Error::SmtpSend(err)) => {
|
||||
// Remote error, retry later.
|
||||
warn!(context, "SMTP failed to send: {:?}", &err);
|
||||
|
||||
let res = match err {
|
||||
async_smtp::smtp::error::Error::Permanent(ref response) => {
|
||||
// Workaround for incorrectly configured servers returning permanent errors
|
||||
// instead of temporary ones.
|
||||
let maybe_transient = match response.code {
|
||||
// Sometimes servers send a permanent error when actually it is a temporary error
|
||||
// For documentation see <https://tools.ietf.org/html/rfc3463>
|
||||
Code {
|
||||
category: Category::MailSystem,
|
||||
detail: Detail::Zero,
|
||||
..
|
||||
} => {
|
||||
// Ignore status code 5.5.0, see <https://support.delta.chat/t/every-other-message-gets-stuck/877/2>
|
||||
// Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns
|
||||
// "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later".
|
||||
//
|
||||
// Other enhanced status codes, such as Postfix
|
||||
// "550 5.1.1 <foobar@example.org>: Recipient address rejected: User unknown in local recipient table"
|
||||
// are not ignored.
|
||||
response.first_word() == Some(&"5.5.0".to_string())
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if maybe_transient {
|
||||
Status::RetryLater
|
||||
} else {
|
||||
// If we do not retry, add an info message to the chat.
|
||||
// Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..."
|
||||
// should definitely go here, because user has to open the link to
|
||||
// resume message sending.
|
||||
Status::Finished(Err(format_err!("Permanent SMTP error: {}", err)))
|
||||
}
|
||||
}
|
||||
async_smtp::smtp::error::Error::Transient(ref response) => {
|
||||
// We got a transient 4xx response from SMTP server.
|
||||
// Give some time until the server-side error maybe goes away.
|
||||
|
||||
if let Some(first_word) = response.first_word() {
|
||||
if first_word.ends_with(".1.1")
|
||||
|| first_word.ends_with(".1.2")
|
||||
|| first_word.ends_with(".1.3")
|
||||
{
|
||||
// Sometimes we receive transient errors that should be permanent.
|
||||
// Any extended smtp status codes like x.1.1, x.1.2 or x.1.3 that we
|
||||
// receive as a transient error are misconfigurations of the smtp server.
|
||||
// See <https://tools.ietf.org/html/rfc3463#section-3.2>
|
||||
info!(context, "Received extended status code {} for a transient error. This looks like a misconfigured smtp server, let's fail immediatly", first_word);
|
||||
Status::Finished(Err(format_err!("Permanent SMTP error: {}", err)))
|
||||
} else {
|
||||
Status::RetryLater
|
||||
}
|
||||
} else {
|
||||
Status::RetryLater
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if smtp.has_maybe_stale_connection().await {
|
||||
info!(context, "stale connection? immediately reconnecting");
|
||||
Status::RetryNow
|
||||
} else {
|
||||
Status::RetryLater
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// this clears last_success info
|
||||
smtp.disconnect().await;
|
||||
|
||||
res
|
||||
}
|
||||
Err(crate::smtp::send::Error::Envelope(err)) => {
|
||||
// Local error, job is invalid, do not retry.
|
||||
smtp.disconnect().await;
|
||||
warn!(context, "SMTP job is invalid: {}", err);
|
||||
Status::Finished(Err(err.into()))
|
||||
}
|
||||
Err(crate::smtp::send::Error::NoTransport) => {
|
||||
// Should never happen.
|
||||
// It does not even make sense to disconnect here.
|
||||
error!(context, "SMTP job failed because SMTP has no transport");
|
||||
Status::Finished(Err(format_err!("SMTP has not transport")))
|
||||
}
|
||||
Err(crate::smtp::send::Error::Other(err)) => {
|
||||
// Local error, job is invalid, do not retry.
|
||||
smtp.disconnect().await;
|
||||
warn!(context, "unable to load job: {}", err);
|
||||
Status::Finished(Err(err))
|
||||
}
|
||||
Ok(()) => Status::Finished(Ok(())),
|
||||
};
|
||||
|
||||
if let Status::Finished(Err(err)) = &status {
|
||||
// We couldn't send the message, so mark it as failed
|
||||
message::set_msg_failed(context, msg_id, Some(err.to_string())).await;
|
||||
}
|
||||
status
|
||||
}
|
||||
|
||||
/// Sends message identified by `smtp` table rowid over SMTP connection.
|
||||
///
|
||||
/// Removes row if the message should not be retried, otherwise increments retry count.
|
||||
pub(crate) async fn send_msg_to_smtp(
|
||||
context: &Context,
|
||||
smtp: &mut Smtp,
|
||||
rowid: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
if let Err(err) = smtp
|
||||
.connect_configured(context)
|
||||
.await
|
||||
.context("SMTP connection failure")
|
||||
{
|
||||
smtp.last_send_error = Some(format!("SMTP connection failure: {:#}", err));
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let (body, recipients, msg_id) = context
|
||||
.sql
|
||||
.query_row(
|
||||
"SELECT mime, recipients, msg_id FROM smtp WHERE id=?",
|
||||
paramsv![rowid],
|
||||
|row| {
|
||||
let mime: String = row.get(0)?;
|
||||
let recipients: String = row.get(1)?;
|
||||
let msg_id: MsgId = row.get(2)?;
|
||||
Ok((mime, recipients, msg_id))
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let recipients_list = recipients
|
||||
.split(' ')
|
||||
.filter_map(
|
||||
|addr| match async_smtp::EmailAddress::new(addr.to_string()) {
|
||||
Ok(addr) => Some(addr),
|
||||
Err(err) => {
|
||||
warn!(context, "invalid recipient: {} {:?}", addr, err);
|
||||
None
|
||||
}
|
||||
},
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let status = smtp_send(context, recipients_list, body, smtp, msg_id, rowid).await;
|
||||
match status {
|
||||
Status::Finished(res) => {
|
||||
if res.is_ok() {
|
||||
msg_id.set_delivered(context).await?;
|
||||
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp WHERE id=?", paramsv![rowid])
|
||||
.await?;
|
||||
}
|
||||
res
|
||||
}
|
||||
Status::RetryNow | Status::RetryLater => {
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE smtp SET retries=retries+1 WHERE id=?",
|
||||
paramsv![rowid],
|
||||
)
|
||||
.await
|
||||
.context("failed to update retries count")?;
|
||||
Err(format_err!("Retry"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to send all messages currently in `smtp` table.
|
||||
///
|
||||
/// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent
|
||||
/// does not block other messages in the queue from being sent.
|
||||
pub(crate) async fn send_smtp_messages(
|
||||
context: &Context,
|
||||
connection: &mut Smtp,
|
||||
) -> anyhow::Result<()> {
|
||||
context.send_sync_msg().await?; // Add sync message to the end of the queue if needed.
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp WHERE retries > 5", paramsv![])
|
||||
.await?;
|
||||
let rowids = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT id FROM smtp ORDER BY id ASC",
|
||||
paramsv![],
|
||||
|row| {
|
||||
let rowid: i64 = row.get(0)?;
|
||||
Ok(rowid)
|
||||
},
|
||||
|rowids| {
|
||||
rowids
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(Into::into)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
for rowid in rowids {
|
||||
if let Err(err) = send_msg_to_smtp(context, connection, rowid).await {
|
||||
info!(context, "Failed to send message over SMTP: {:#}.", err);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ impl Smtp {
|
||||
context: &Context,
|
||||
recipients: Vec<EmailAddress>,
|
||||
message: Vec<u8>,
|
||||
job_id: u32,
|
||||
rowid: i64,
|
||||
) -> Result<()> {
|
||||
let message_len_bytes = message.len();
|
||||
|
||||
@@ -52,7 +52,7 @@ impl Smtp {
|
||||
.map_err(Error::Envelope)?;
|
||||
let mail = SendableEmail::new(
|
||||
envelope,
|
||||
format!("{}", job_id), // only used for internal logging
|
||||
rowid.to_string(), // only used for internal logging
|
||||
&message,
|
||||
);
|
||||
|
||||
|
||||
@@ -271,10 +271,10 @@ impl Sql {
|
||||
&self,
|
||||
query: impl AsRef<str>,
|
||||
params: impl rusqlite::Params,
|
||||
) -> anyhow::Result<usize> {
|
||||
) -> Result<i64> {
|
||||
let conn = self.get_conn().await?;
|
||||
conn.execute(query.as_ref(), params)?;
|
||||
Ok(usize::try_from(conn.last_insert_rowid())?)
|
||||
Ok(conn.last_insert_rowid())
|
||||
}
|
||||
|
||||
/// Prepares and executes the statement and maps a function over the resulting rows.
|
||||
|
||||
@@ -562,6 +562,23 @@ CREATE INDEX msgs_status_updates_index1 ON msgs_status_updates (msg_id);"#,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if dbversion < 85 {
|
||||
info!(context, "[migration] v85");
|
||||
sql.execute_migration(
|
||||
r#"CREATE TABLE smtp (
|
||||
id INTEGER PRIMARY KEY,
|
||||
rfc724_mid TEXT NOT NULL, -- Message-ID
|
||||
mime TEXT NOT NULL, -- SMTP payload
|
||||
msg_id INTEGER NOT NULL, -- ID of the message in `msgs` table
|
||||
recipients TEXT NOT NULL, -- List of recipients separated by space
|
||||
retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the messsage
|
||||
);
|
||||
CREATE INDEX smtp_messageid ON imap(rfc724_mid);
|
||||
"#,
|
||||
85,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok((
|
||||
recalc_fingerprints,
|
||||
|
||||
@@ -5,13 +5,11 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Deref;
|
||||
use std::panic;
|
||||
use std::str::FromStr;
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use ansi_term::Color;
|
||||
use async_std::channel::{self, Receiver, Sender};
|
||||
use async_std::path::PathBuf;
|
||||
use async_std::prelude::*;
|
||||
use async_std::sync::{Arc, RwLock};
|
||||
use async_std::task;
|
||||
@@ -30,11 +28,9 @@ use crate::context::Context;
|
||||
use crate::dc_receive_imf::dc_receive_imf;
|
||||
use crate::dc_tools::EmailAddress;
|
||||
use crate::events::{Event, EventType};
|
||||
use crate::job::Action;
|
||||
use crate::key::{self, DcKey, KeyPair, KeyPairUse};
|
||||
use crate::message::{update_msg_state, Message, MessageState, MsgId};
|
||||
use crate::mimeparser::MimeMessage;
|
||||
use crate::param::{Param, Params};
|
||||
|
||||
#[allow(non_upper_case_globals)]
|
||||
pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png");
|
||||
@@ -275,27 +271,27 @@ impl TestContext {
|
||||
/// Panics if there is no message or on any error.
|
||||
pub async fn pop_sent_msg(&self) -> SentMessage {
|
||||
let start = Instant::now();
|
||||
let (rowid, foreign_id, raw_params) = loop {
|
||||
let (rowid, msg_id, payload, recipients) = loop {
|
||||
let row = self
|
||||
.ctx
|
||||
.sql
|
||||
.query_row(
|
||||
.query_row_optional(
|
||||
r#"
|
||||
SELECT id, foreign_id, param
|
||||
FROM jobs
|
||||
WHERE action=?
|
||||
ORDER BY desired_timestamp DESC;
|
||||
"#,
|
||||
paramsv![Action::SendMsgToSmtp],
|
||||
SELECT id, msg_id, mime, recipients
|
||||
FROM smtp
|
||||
ORDER BY id DESC"#,
|
||||
paramsv![],
|
||||
|row| {
|
||||
let id: u32 = row.get(0)?;
|
||||
let foreign_id: u32 = row.get(1)?;
|
||||
let param: String = row.get(2)?;
|
||||
Ok((id, foreign_id, param))
|
||||
let rowid: i64 = row.get(0)?;
|
||||
let msg_id: MsgId = row.get(1)?;
|
||||
let mime: String = row.get(2)?;
|
||||
let recipients: String = row.get(3)?;
|
||||
Ok((rowid, msg_id, mime, recipients))
|
||||
},
|
||||
)
|
||||
.await;
|
||||
if let Ok(row) = row {
|
||||
.await
|
||||
.expect("query_row_optional failed");
|
||||
if let Some(row) = row {
|
||||
break row;
|
||||
}
|
||||
if start.elapsed() < Duration::from_secs(3) {
|
||||
@@ -304,26 +300,18 @@ impl TestContext {
|
||||
panic!("no sent message found in jobs table");
|
||||
}
|
||||
};
|
||||
let id = MsgId::new(foreign_id);
|
||||
let params = Params::from_str(&raw_params).unwrap();
|
||||
let blob_path = params
|
||||
.get_blob(Param::File, &self.ctx, false)
|
||||
.await
|
||||
.expect("failed to parse blob from param")
|
||||
.expect("no Param::File found in Params")
|
||||
.to_abs_path();
|
||||
self.ctx
|
||||
.sql
|
||||
.execute("DELETE FROM jobs WHERE id=?;", paramsv![rowid])
|
||||
.await
|
||||
.expect("failed to remove job");
|
||||
update_msg_state(&self.ctx, id, MessageState::OutDelivered)
|
||||
update_msg_state(&self.ctx, msg_id, MessageState::OutDelivered)
|
||||
.await
|
||||
.expect("failed to update message state");
|
||||
SentMessage {
|
||||
params,
|
||||
blob_path,
|
||||
sender_msg_id: id,
|
||||
payload,
|
||||
sender_msg_id: msg_id,
|
||||
recipients,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -347,7 +335,7 @@ impl TestContext {
|
||||
let received_msg =
|
||||
"Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n"
|
||||
.to_owned()
|
||||
+ &msg.payload();
|
||||
+ msg.payload();
|
||||
dc_receive_imf(&self.ctx, received_msg.as_bytes(), "INBOX", false)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -592,8 +580,8 @@ impl Drop for LogSink {
|
||||
/// passed through a SMTP-IMAP pipeline.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SentMessage {
|
||||
params: Params,
|
||||
blob_path: PathBuf,
|
||||
payload: String,
|
||||
recipients: String,
|
||||
pub sender_msg_id: MsgId,
|
||||
}
|
||||
|
||||
@@ -602,17 +590,17 @@ impl SentMessage {
|
||||
///
|
||||
/// If there are multiple recipients this is just a random one, so is not very useful.
|
||||
pub fn recipient(&self) -> EmailAddress {
|
||||
let raw = self
|
||||
.params
|
||||
.get(Param::Recipients)
|
||||
.expect("no recipients in params");
|
||||
let rcpt = raw.split(' ').next().expect("no recipient found");
|
||||
let rcpt = self
|
||||
.recipients
|
||||
.split(' ')
|
||||
.next()
|
||||
.expect("no recipient found");
|
||||
rcpt.parse().expect("failed to parse email address")
|
||||
}
|
||||
|
||||
/// The raw message payload.
|
||||
pub fn payload(&self) -> String {
|
||||
std::fs::read_to_string(&self.blob_path).unwrap()
|
||||
pub fn payload(&self) -> &str {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user