mirror of
https://github.com/chatmail/core.git
synced 2026-04-25 01:16:29 +03:00
Replace SendMdn job with smtp_mdns table
Unlike jobs which are executed before sending normal messages, MDNs from `smtp_mdns` table are sent after sending messages from `smtp` table. This way normal messages have higher priority than MDNs. There are no SMTP jobs anymore. All jobs are IMAP jobs, so `jobs.thread` column is not used anymore.
This commit is contained in:
@@ -1,5 +1,11 @@
|
||||
# Changelog
|
||||
|
||||
## Unreleased
|
||||
|
||||
### Changes
|
||||
- send normal messages with higher priority than MDNs #3243
|
||||
|
||||
|
||||
## 1.80.0
|
||||
|
||||
### Changes
|
||||
|
||||
283
src/job.rs
283
src/job.rs
@@ -4,36 +4,22 @@
|
||||
//! and job types.
|
||||
use std::fmt;
|
||||
|
||||
use anyhow::{format_err, Context as _, Result};
|
||||
use anyhow::{Context as _, Result};
|
||||
use deltachat_derive::{FromSql, ToSql};
|
||||
use rand::{thread_rng, Rng};
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::contact::{normalize_name, Contact, ContactId, Modifier, Origin};
|
||||
use crate::contact::{normalize_name, Contact, Modifier, Origin};
|
||||
use crate::context::Context;
|
||||
use crate::dc_tools::time;
|
||||
use crate::events::EventType;
|
||||
use crate::imap::Imap;
|
||||
use crate::message::{Message, MsgId};
|
||||
use crate::mimefactory::MimeFactory;
|
||||
use crate::param::{Param, Params};
|
||||
use crate::param::Params;
|
||||
use crate::scheduler::InterruptInfo;
|
||||
use crate::smtp::{smtp_send, SendResult, Smtp};
|
||||
use crate::sql;
|
||||
|
||||
// results in ~3 weeks for the last backoff timespan
|
||||
const JOB_RETRIES: u32 = 17;
|
||||
|
||||
/// Thread IDs
|
||||
#[derive(
|
||||
Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql,
|
||||
)]
|
||||
#[repr(u32)]
|
||||
pub(crate) enum Thread {
|
||||
Imap = 100,
|
||||
Smtp = 5000,
|
||||
}
|
||||
|
||||
/// Job try result.
|
||||
#[derive(Debug, Display)]
|
||||
pub enum Status {
|
||||
@@ -72,7 +58,6 @@ macro_rules! job_try {
|
||||
)]
|
||||
#[repr(u32)]
|
||||
pub enum Action {
|
||||
// Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999
|
||||
FetchExistingMsgs = 110,
|
||||
|
||||
// this is user initiated so it should have a fairly high priority
|
||||
@@ -87,24 +72,6 @@ pub enum Action {
|
||||
// UID synchronization is high-priority to make sure correct UIDs
|
||||
// are used by message moving/deletion.
|
||||
ResyncFolders = 300,
|
||||
|
||||
// Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999
|
||||
SendMdn = 5010,
|
||||
}
|
||||
|
||||
impl From<Action> for Thread {
|
||||
fn from(action: Action) -> Thread {
|
||||
use Action::*;
|
||||
|
||||
match action {
|
||||
FetchExistingMsgs => Thread::Imap,
|
||||
ResyncFolders => Thread::Imap,
|
||||
UpdateRecentQuota => Thread::Imap,
|
||||
DownloadMsg => Thread::Imap,
|
||||
|
||||
SendMdn => Thread::Smtp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -159,9 +126,7 @@ impl Job {
|
||||
///
|
||||
/// The Job is consumed by this method.
|
||||
pub(crate) async fn save(self, context: &Context) -> Result<()> {
|
||||
let thread: Thread = self.action.into();
|
||||
|
||||
info!(context, "saving job for {}-thread: {:?}", thread, self);
|
||||
info!(context, "saving job {:?}", self);
|
||||
|
||||
if self.job_id != 0 {
|
||||
context
|
||||
@@ -178,10 +143,9 @@ impl Job {
|
||||
.await?;
|
||||
} else {
|
||||
context.sql.execute(
|
||||
"INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);",
|
||||
"INSERT INTO jobs (added_timestamp, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?);",
|
||||
paramsv![
|
||||
self.added_timestamp,
|
||||
thread,
|
||||
self.action,
|
||||
self.foreign_id,
|
||||
self.param.to_string(),
|
||||
@@ -193,118 +157,6 @@ impl Job {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job.
|
||||
async fn get_additional_mdn_jobs(
|
||||
&self,
|
||||
context: &Context,
|
||||
contact_id: ContactId,
|
||||
) -> Result<(Vec<u32>, Vec<String>)> {
|
||||
// Extract message IDs from job parameters
|
||||
let res: Vec<(u32, MsgId)> = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?",
|
||||
paramsv![contact_id, self.job_id],
|
||||
|row| {
|
||||
let job_id: u32 = row.get(0)?;
|
||||
let params_str: String = row.get(1)?;
|
||||
let params: Params = params_str.parse().unwrap_or_default();
|
||||
Ok((job_id, params))
|
||||
},
|
||||
|jobs| {
|
||||
let res = jobs
|
||||
.filter_map(|row| {
|
||||
let (job_id, params) = row.ok()?;
|
||||
let msg_id = params.get_msg_id()?;
|
||||
Some((job_id, msg_id))
|
||||
})
|
||||
.collect();
|
||||
Ok(res)
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Load corresponding RFC724 message IDs
|
||||
let mut job_ids = Vec::new();
|
||||
let mut rfc724_mids = Vec::new();
|
||||
for (job_id, msg_id) in res {
|
||||
if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id).await {
|
||||
job_ids.push(job_id);
|
||||
rfc724_mids.push(rfc724_mid);
|
||||
}
|
||||
}
|
||||
Ok((job_ids, rfc724_mids))
|
||||
}
|
||||
|
||||
async fn send_mdn(&mut self, context: &Context, smtp: &mut Smtp) -> Status {
|
||||
let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await);
|
||||
if !mdns_enabled {
|
||||
// User has disabled MDNs after job scheduling but before
|
||||
// execution.
|
||||
return Status::Finished(Err(format_err!("MDNs are disabled")));
|
||||
}
|
||||
|
||||
let contact_id = ContactId::new(self.foreign_id);
|
||||
let contact = job_try!(Contact::load_from_db(context, contact_id).await);
|
||||
if contact.is_blocked() {
|
||||
return Status::Finished(Err(format_err!("Contact is blocked")));
|
||||
}
|
||||
|
||||
let msg_id = if let Some(msg_id) = self.param.get_msg_id() {
|
||||
msg_id
|
||||
} else {
|
||||
return Status::Finished(Err(format_err!(
|
||||
"SendMdn job has invalid parameters: {}",
|
||||
self.param
|
||||
)));
|
||||
};
|
||||
|
||||
// Try to aggregate other SendMdn jobs and send a combined MDN.
|
||||
let (additional_job_ids, additional_rfc724_mids) = self
|
||||
.get_additional_mdn_jobs(context, contact_id)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
if !additional_rfc724_mids.is_empty() {
|
||||
info!(
|
||||
context,
|
||||
"SendMdn job: aggregating {} additional MDNs",
|
||||
additional_rfc724_mids.len()
|
||||
)
|
||||
}
|
||||
|
||||
let msg = job_try!(Message::load_from_db(context, msg_id).await);
|
||||
let mimefactory =
|
||||
job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await);
|
||||
let rendered_msg = job_try!(mimefactory.render(context).await);
|
||||
let body = rendered_msg.message;
|
||||
|
||||
let addr = contact.get_addr();
|
||||
let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string())
|
||||
.map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err)));
|
||||
let recipients = vec![recipient];
|
||||
|
||||
// connect to SMTP server, if not yet done
|
||||
if let Err(err) = smtp.connect_configured(context).await {
|
||||
warn!(context, "SMTP connection failure: {:?}", err);
|
||||
smtp.last_send_error = Some(err.to_string());
|
||||
return Status::RetryLater;
|
||||
}
|
||||
|
||||
match smtp_send(context, &recipients, &body, smtp, msg_id, 0).await {
|
||||
SendResult::Success => {
|
||||
// Remove additional SendMdn jobs we have aggregated into this one.
|
||||
job_try!(kill_ids(context, &additional_job_ids).await);
|
||||
Status::Finished(Ok(()))
|
||||
}
|
||||
SendResult::Retry => {
|
||||
info!(context, "Temporary SMTP failure while sending an MDN");
|
||||
Status::RetryLater
|
||||
}
|
||||
SendResult::Failure(err) => Status::Finished(Err(err)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Read the recipients from old emails sent by the user and add them as contacts.
|
||||
/// This way, we can already offer them some email addresses they can write to.
|
||||
///
|
||||
@@ -387,22 +239,6 @@ pub async fn kill_action(context: &Context, action: Action) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove jobs with specified IDs.
|
||||
async fn kill_ids(context: &Context, job_ids: &[u32]) -> Result<()> {
|
||||
if job_ids.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let q = format!(
|
||||
"DELETE FROM jobs WHERE id IN({})",
|
||||
sql::repeat_vars(job_ids.len())
|
||||
);
|
||||
context
|
||||
.sql
|
||||
.execute(q, rusqlite::params_from_iter(job_ids))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn action_exists(context: &Context, action: Action) -> Result<bool> {
|
||||
let exists = context
|
||||
.sql
|
||||
@@ -461,36 +297,18 @@ async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, fold
|
||||
|
||||
pub(crate) enum Connection<'a> {
|
||||
Inbox(&'a mut Imap),
|
||||
Smtp(&'a mut Smtp),
|
||||
}
|
||||
|
||||
impl<'a> fmt::Display for Connection<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Connection::Inbox(_) => write!(f, "Inbox"),
|
||||
Connection::Smtp(_) => write!(f, "Smtp"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Connection<'a> {
|
||||
fn inbox(&mut self) -> &mut Imap {
|
||||
match self {
|
||||
Connection::Inbox(imap) => imap,
|
||||
_ => panic!("Not an inbox"),
|
||||
}
|
||||
}
|
||||
|
||||
fn smtp(&mut self) -> &mut Smtp {
|
||||
match self {
|
||||
Connection::Smtp(smtp) => smtp,
|
||||
_ => panic!("Not a smtp"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_>, mut job: Job) {
|
||||
info!(context, "{}-job {} started...", &connection, &job);
|
||||
info!(context, "job {} started...", &job);
|
||||
|
||||
let try_res = match perform_job_action(context, &mut job, &mut connection, 0).await {
|
||||
Status::RetryNow => perform_job_action(context, &mut job, &mut connection, 1).await,
|
||||
@@ -502,17 +320,13 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
|
||||
let tries = job.tries + 1;
|
||||
|
||||
if tries < JOB_RETRIES {
|
||||
info!(
|
||||
context,
|
||||
"{} thread increases job {} tries to {}", &connection, job, tries
|
||||
);
|
||||
info!(context, "increase job {} tries to {}", job, tries);
|
||||
job.tries = tries;
|
||||
let time_offset = get_backoff_time_offset(tries, job.action);
|
||||
job.desired_timestamp = time() + time_offset;
|
||||
info!(
|
||||
context,
|
||||
"{}-job #{} not succeeded on try #{}, retry in {} seconds.",
|
||||
&connection,
|
||||
"job #{} not succeeded on try #{}, retry in {} seconds.",
|
||||
job.job_id as u32,
|
||||
tries,
|
||||
time_offset
|
||||
@@ -523,10 +337,7 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
|
||||
} else {
|
||||
info!(
|
||||
context,
|
||||
"{} thread removes job {} as it exhausted {} retries",
|
||||
&connection,
|
||||
job,
|
||||
JOB_RETRIES
|
||||
"remove job {} as it exhausted {} retries", job, JOB_RETRIES
|
||||
);
|
||||
job.delete(context).await.unwrap_or_else(|err| {
|
||||
error!(context, "failed to delete job: {}", err);
|
||||
@@ -537,13 +348,10 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_
|
||||
if let Err(err) = res {
|
||||
warn!(
|
||||
context,
|
||||
"{} removes job {} as it failed with error {:#}", &connection, job, err
|
||||
"remove job {} as it failed with error {:#}", job, err
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
context,
|
||||
"{} removes job {} as it succeeded", &connection, job
|
||||
);
|
||||
info!(context, "remove job {} as it succeeded", job);
|
||||
}
|
||||
|
||||
job.delete(context).await.unwrap_or_else(|err| {
|
||||
@@ -559,13 +367,9 @@ async fn perform_job_action(
|
||||
connection: &mut Connection<'_>,
|
||||
tries: u32,
|
||||
) -> Status {
|
||||
info!(
|
||||
context,
|
||||
"{} begin immediate try {} of job {}", &connection, tries, job
|
||||
);
|
||||
info!(context, "begin immediate try {} of job {}", tries, job);
|
||||
|
||||
let try_res = match job.action {
|
||||
Action::SendMdn => job.send_mdn(context, connection.smtp()).await,
|
||||
Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await,
|
||||
Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await,
|
||||
Action::UpdateRecentQuota => match context.update_recent_quota(connection.inbox()).await {
|
||||
@@ -600,19 +404,6 @@ fn get_backoff_time_offset(tries: u32, action: Action) -> i64 {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_mdn(context: &Context, msg_id: MsgId, from_id: ContactId) -> Result<()> {
|
||||
let mut param = Params::new();
|
||||
param.set(Param::MsgId, msg_id.to_u32().to_string());
|
||||
|
||||
add(
|
||||
context,
|
||||
Job::new(Action::SendMdn, from_id.to_u32(), param, 0),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn schedule_resync(context: &Context) -> Result<()> {
|
||||
kill_action(context, Action::ResyncFolders).await?;
|
||||
add(
|
||||
@@ -638,10 +429,6 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
|
||||
info!(context, "interrupt: imap");
|
||||
context.interrupt_inbox(InterruptInfo::new(false)).await;
|
||||
}
|
||||
Action::SendMdn => {
|
||||
info!(context, "interrupt: smtp");
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -649,21 +436,15 @@ pub async fn add(context: &Context, job: Job) -> Result<()> {
|
||||
|
||||
/// Load jobs from the database.
|
||||
///
|
||||
/// Load jobs for this "[Thread]", i.e. either load SMTP jobs or load
|
||||
/// IMAP jobs. The `probe_network` parameter decides how to query
|
||||
/// The `probe_network` parameter decides how to query
|
||||
/// jobs, this is tricky and probably wrong currently. Look at the
|
||||
/// SQL queries for details.
|
||||
pub(crate) async fn load_next(
|
||||
context: &Context,
|
||||
thread: Thread,
|
||||
info: &InterruptInfo,
|
||||
) -> Result<Option<Job>> {
|
||||
info!(context, "loading job for {}-thread", thread);
|
||||
pub(crate) async fn load_next(context: &Context, info: &InterruptInfo) -> Result<Option<Job>> {
|
||||
info!(context, "loading job");
|
||||
|
||||
let query;
|
||||
let params;
|
||||
let t = time();
|
||||
let thread_i = thread as i64;
|
||||
|
||||
if !info.probe_network {
|
||||
// processing for first-try and after backoff-timeouts:
|
||||
@@ -671,11 +452,11 @@ pub(crate) async fn load_next(
|
||||
query = r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND desired_timestamp<=?
|
||||
WHERE desired_timestamp<=?
|
||||
ORDER BY action DESC, added_timestamp
|
||||
LIMIT 1;
|
||||
"#;
|
||||
params = paramsv![thread_i, t];
|
||||
params = paramsv![t];
|
||||
} else {
|
||||
// processing after call to dc_maybe_network():
|
||||
// process _all_ pending jobs that failed before
|
||||
@@ -683,11 +464,11 @@ LIMIT 1;
|
||||
query = r#"
|
||||
SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries
|
||||
FROM jobs
|
||||
WHERE thread=? AND tries>0
|
||||
WHERE tries>0
|
||||
ORDER BY desired_timestamp, action DESC
|
||||
LIMIT 1;
|
||||
"#;
|
||||
params = paramsv![thread_i];
|
||||
params = paramsv![];
|
||||
};
|
||||
|
||||
loop {
|
||||
@@ -742,11 +523,10 @@ mod tests {
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO jobs
|
||||
(added_timestamp, thread, action, foreign_id, param, desired_timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?);",
|
||||
(added_timestamp, action, foreign_id, param, desired_timestamp)
|
||||
VALUES (?, ?, ?, ?, ?);",
|
||||
paramsv![
|
||||
now,
|
||||
Thread::from(Action::DownloadMsg),
|
||||
if valid {
|
||||
Action::DownloadMsg as i32
|
||||
} else {
|
||||
@@ -768,21 +548,11 @@ mod tests {
|
||||
// all jobs.
|
||||
let t = TestContext::new().await;
|
||||
insert_job(&t, 1, false).await; // This can not be loaded into Job struct.
|
||||
let jobs = load_next(
|
||||
&t,
|
||||
Thread::from(Action::DownloadMsg),
|
||||
&InterruptInfo::new(false),
|
||||
)
|
||||
.await?;
|
||||
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
|
||||
assert!(jobs.is_none());
|
||||
|
||||
insert_job(&t, 1, true).await;
|
||||
let jobs = load_next(
|
||||
&t,
|
||||
Thread::from(Action::DownloadMsg),
|
||||
&InterruptInfo::new(false),
|
||||
)
|
||||
.await?;
|
||||
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
|
||||
assert!(jobs.is_some());
|
||||
Ok(())
|
||||
}
|
||||
@@ -793,12 +563,7 @@ mod tests {
|
||||
|
||||
insert_job(&t, 1, true).await;
|
||||
|
||||
let jobs = load_next(
|
||||
&t,
|
||||
Thread::from(Action::DownloadMsg),
|
||||
&InterruptInfo::new(false),
|
||||
)
|
||||
.await?;
|
||||
let jobs = load_next(&t, &InterruptInfo::new(false)).await?;
|
||||
assert!(jobs.is_some());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ use crate::download::DownloadState;
|
||||
use crate::ephemeral::{start_ephemeral_timers_msgids, Timer as EphemeralTimer};
|
||||
use crate::events::EventType;
|
||||
use crate::imap::markseen_on_imap_table;
|
||||
use crate::job;
|
||||
use crate::log::LogExt;
|
||||
use crate::mimeparser::{parse_message_id, FailureReport, SystemMessage};
|
||||
use crate::param::{Param, Params};
|
||||
@@ -1364,9 +1363,15 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec<MsgId>) -> Result<()>
|
||||
{
|
||||
let mdns_enabled = context.get_config_bool(Config::MdnsEnabled).await?;
|
||||
if mdns_enabled {
|
||||
if let Err(err) = job::send_mdn(context, id, curr_from_id).await {
|
||||
warn!(context, "could not send out mdn for {}: {}", id, err);
|
||||
}
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"INSERT INTO smtp_mdns (msg_id, from_id, rfc724_mid) VALUES(?, ?, ?)",
|
||||
paramsv![id, curr_from_id, curr_rfc724_mid],
|
||||
)
|
||||
.await
|
||||
.context("failed to insert into smtp_mdns")?;
|
||||
context.interrupt_smtp(InterruptInfo::new(false)).await;
|
||||
}
|
||||
}
|
||||
updated_chat_ids.insert(curr_chat_id);
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::dc_tools::maybe_add_time_based_warnings;
|
||||
use crate::dc_tools::time;
|
||||
use crate::ephemeral::{self, delete_expired_imap_messages};
|
||||
use crate::imap::Imap;
|
||||
use crate::job::{self, Thread};
|
||||
use crate::job;
|
||||
use crate::location;
|
||||
use crate::log::LogExt;
|
||||
use crate::smtp::{send_smtp_messages, Smtp};
|
||||
@@ -93,7 +93,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne
|
||||
|
||||
let mut info = InterruptInfo::default();
|
||||
loop {
|
||||
let job = match job::load_next(&ctx, Thread::Imap, &info).await {
|
||||
let job = match job::load_next(&ctx, &info).await {
|
||||
Err(err) => {
|
||||
error!(ctx, "Failed loading job from the database: {:#}.", err);
|
||||
None
|
||||
@@ -303,65 +303,46 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect
|
||||
}
|
||||
|
||||
let mut timeout = None;
|
||||
let mut interrupt_info = Default::default();
|
||||
loop {
|
||||
let job = match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await {
|
||||
Err(err) => {
|
||||
error!(ctx, "Failed loading job from the database: {:#}.", err);
|
||||
None
|
||||
}
|
||||
Ok(job) => job,
|
||||
let res = send_smtp_messages(&ctx, &mut connection).await;
|
||||
if let Err(err) = &res {
|
||||
warn!(ctx, "send_smtp_messages failed: {:#}", err);
|
||||
}
|
||||
let success = res.unwrap_or(false);
|
||||
timeout = if success {
|
||||
None
|
||||
} else {
|
||||
Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))
|
||||
};
|
||||
|
||||
match job {
|
||||
Some(job) => {
|
||||
info!(ctx, "executing smtp job");
|
||||
job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await;
|
||||
interrupt_info = Default::default();
|
||||
}
|
||||
None => {
|
||||
let res = send_smtp_messages(&ctx, &mut connection).await;
|
||||
if let Err(err) = &res {
|
||||
warn!(ctx, "send_smtp_messages failed: {:#}", err);
|
||||
}
|
||||
let success = res.unwrap_or(false);
|
||||
timeout = if success {
|
||||
None
|
||||
} else {
|
||||
Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3)))
|
||||
};
|
||||
|
||||
// Fake Idle
|
||||
info!(ctx, "smtp fake idle - started");
|
||||
match &connection.last_send_error {
|
||||
None => connection.connectivity.set_connected(&ctx).await,
|
||||
Some(err) => connection.connectivity.set_err(&ctx, err).await,
|
||||
}
|
||||
|
||||
// If send_smtp_messages() failed, we set a timeout for the fake-idle so that
|
||||
// sending is retried (at the latest) after the timeout. If sending fails
|
||||
// again, we increase the timeout exponentially, in order not to do lots of
|
||||
// unnecessary retries.
|
||||
if let Some(timeout) = timeout {
|
||||
info!(
|
||||
ctx,
|
||||
"smtp has messages to retry, planning to retry {} seconds later",
|
||||
timeout
|
||||
);
|
||||
let duration = std::time::Duration::from_secs(timeout);
|
||||
interrupt_info = async_std::future::timeout(duration, async {
|
||||
idle_interrupt_receiver.recv().await.unwrap_or_default()
|
||||
})
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
} else {
|
||||
info!(ctx, "smtp has no messages to retry, waiting for interrupt");
|
||||
interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default();
|
||||
};
|
||||
|
||||
info!(ctx, "smtp fake idle - interrupted")
|
||||
}
|
||||
// Fake Idle
|
||||
info!(ctx, "smtp fake idle - started");
|
||||
match &connection.last_send_error {
|
||||
None => connection.connectivity.set_connected(&ctx).await,
|
||||
Some(err) => connection.connectivity.set_err(&ctx, err).await,
|
||||
}
|
||||
|
||||
// If send_smtp_messages() failed, we set a timeout for the fake-idle so that
|
||||
// sending is retried (at the latest) after the timeout. If sending fails
|
||||
// again, we increase the timeout exponentially, in order not to do lots of
|
||||
// unnecessary retries.
|
||||
if let Some(timeout) = timeout {
|
||||
info!(
|
||||
ctx,
|
||||
"smtp has messages to retry, planning to retry {} seconds later", timeout
|
||||
);
|
||||
let duration = std::time::Duration::from_secs(timeout);
|
||||
async_std::future::timeout(duration, async {
|
||||
idle_interrupt_receiver.recv().await.unwrap_or_default()
|
||||
})
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
} else {
|
||||
info!(ctx, "smtp has no messages to retry, waiting for interrupt");
|
||||
idle_interrupt_receiver.recv().await.unwrap_or_default();
|
||||
};
|
||||
|
||||
info!(ctx, "smtp fake idle - interrupted")
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
176
src/smtp.rs
176
src/smtp.rs
@@ -10,14 +10,19 @@ use async_smtp::smtp::response::{Category, Code, Detail};
|
||||
use async_smtp::{smtp, EmailAddress, ServerAddress};
|
||||
use async_std::task;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::constants::DC_LP_AUTH_OAUTH2;
|
||||
use crate::contact::{Contact, ContactId};
|
||||
use crate::events::EventType;
|
||||
use crate::login_param::{
|
||||
dc_build_tls, CertificateChecks, LoginParam, ServerLoginParam, Socks5Config,
|
||||
};
|
||||
use crate::message::Message;
|
||||
use crate::message::{self, MsgId};
|
||||
use crate::mimefactory::MimeFactory;
|
||||
use crate::oauth2::dc_get_oauth2_access_token;
|
||||
use crate::provider::Socket;
|
||||
use crate::sql;
|
||||
use crate::{context::Context, scheduler::connectivity::ConnectivityStore};
|
||||
|
||||
/// SMTP write and read timeout in seconds.
|
||||
@@ -82,6 +87,11 @@ impl Smtp {
|
||||
|
||||
/// Connect using configured parameters.
|
||||
pub async fn connect_configured(&mut self, context: &Context) -> Result<()> {
|
||||
if self.has_maybe_stale_connection().await {
|
||||
info!(context, "Closing stale connection");
|
||||
self.disconnect().await;
|
||||
}
|
||||
|
||||
if self.is_connected().await {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -226,18 +236,13 @@ pub(crate) async fn smtp_send(
|
||||
|
||||
smtp.connectivity.set_working(context).await;
|
||||
|
||||
if smtp.has_maybe_stale_connection().await {
|
||||
info!(context, "Closing stale connection");
|
||||
smtp.disconnect().await;
|
||||
|
||||
if let Err(err) = smtp
|
||||
.connect_configured(context)
|
||||
.await
|
||||
.context("failed to reopen stale SMTP connection")
|
||||
{
|
||||
smtp.last_send_error = Some(format!("{:#}", err));
|
||||
return SendResult::Retry;
|
||||
}
|
||||
if let Err(err) = smtp
|
||||
.connect_configured(context)
|
||||
.await
|
||||
.context("Failed to open SMTP connection")
|
||||
{
|
||||
smtp.last_send_error = Some(format!("{:#}", err));
|
||||
return SendResult::Retry;
|
||||
}
|
||||
|
||||
let send_result = smtp
|
||||
@@ -479,7 +484,7 @@ pub(crate) async fn send_msg_to_smtp(
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to send all messages currently in `smtp` table.
|
||||
/// Tries to send all messages currently in `smtp` and `smtp_mdns` tables.
|
||||
///
|
||||
/// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent
|
||||
/// does not block other messages in the queue from being sent.
|
||||
@@ -510,5 +515,150 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp)
|
||||
success = false;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
match send_mdn(context, connection).await {
|
||||
Err(err) => {
|
||||
info!(context, "Failed to send MDNs over SMTP: {:#}.", err);
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
Ok(false) => {
|
||||
break;
|
||||
}
|
||||
Ok(true) => {}
|
||||
}
|
||||
}
|
||||
Ok(success)
|
||||
}
|
||||
|
||||
/// Tries to send MDN for message `msg_id` to `contact_id`.
|
||||
///
|
||||
/// Attempts to aggregate additional MDNs for `contact_id` into sent MDN.
|
||||
///
|
||||
/// On failure returns an error without removing any `smtp_mdns` entries, the caller is responsible
|
||||
/// for removing the corresponding entry to prevent endless loop in case the entry is invalid, e.g.
|
||||
/// points to non-existent message or contact.
|
||||
async fn send_mdn_msg_id(
|
||||
context: &Context,
|
||||
msg_id: MsgId,
|
||||
contact_id: ContactId,
|
||||
smtp: &mut Smtp,
|
||||
) -> Result<()> {
|
||||
let contact = Contact::load_from_db(context, contact_id).await?;
|
||||
if contact.is_blocked() {
|
||||
return Err(format_err!("Contact is blocked"));
|
||||
}
|
||||
|
||||
// Try to aggregate additional MDNs into this MDN.
|
||||
let (additional_msg_ids, additional_rfc724_mids): (Vec<MsgId>, Vec<String>) = context
|
||||
.sql
|
||||
.query_map(
|
||||
"SELECT msg_id, rfc724_mid
|
||||
FROM smtp_mdns
|
||||
WHERE from_id=? AND msg_id!=?",
|
||||
paramsv![contact_id, msg_id],
|
||||
|row| {
|
||||
let msg_id: MsgId = row.get(0)?;
|
||||
let rfc724_mid: String = row.get(1)?;
|
||||
Ok((msg_id, rfc724_mid))
|
||||
},
|
||||
|rows| rows.collect::<Result<Vec<_>, _>>().map_err(Into::into),
|
||||
)
|
||||
.await?
|
||||
.into_iter()
|
||||
.unzip();
|
||||
|
||||
let msg = Message::load_from_db(context, msg_id).await?;
|
||||
let mimefactory = MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await?;
|
||||
let rendered_msg = mimefactory.render(context).await?;
|
||||
let body = rendered_msg.message;
|
||||
|
||||
let addr = contact.get_addr();
|
||||
let recipient = async_smtp::EmailAddress::new(addr.to_string())
|
||||
.map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?;
|
||||
let recipients = vec![recipient];
|
||||
|
||||
match smtp_send(context, &recipients, &body, smtp, msg_id, 0).await {
|
||||
SendResult::Success => {
|
||||
info!(context, "Successfully sent MDN for {}", msg_id);
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp_mdns WHERE msg_id = ?", paramsv![msg_id])
|
||||
.await?;
|
||||
if !additional_msg_ids.is_empty() {
|
||||
let q = format!(
|
||||
"DELETE FROM smtp_mdns WHERE msg_id IN({})",
|
||||
sql::repeat_vars(additional_msg_ids.len())
|
||||
);
|
||||
context
|
||||
.sql
|
||||
.execute(q, rusqlite::params_from_iter(additional_msg_ids))
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
SendResult::Retry => {
|
||||
info!(
|
||||
context,
|
||||
"Temporary SMTP failure while sending an MDN for {}", msg_id
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
SendResult::Failure(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to send a single MDN. Returns false if there are no MDNs to send.
|
||||
async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result<bool> {
|
||||
let mdns_enabled = context.get_config_bool(Config::MdnsEnabled).await?;
|
||||
if !mdns_enabled {
|
||||
// User has disabled MDNs.
|
||||
context.sql.execute("DELETE FROM smtp_mdns", []).await?;
|
||||
return Ok(false);
|
||||
}
|
||||
info!(context, "Sending MDNs");
|
||||
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp_mdns WHERE retries > 6", [])
|
||||
.await?;
|
||||
let msg_row = match context
|
||||
.sql
|
||||
.query_row_optional(
|
||||
"SELECT msg_id, from_id FROM smtp_mdns ORDER BY retries LIMIT 1",
|
||||
[],
|
||||
|row| {
|
||||
let msg_id: MsgId = row.get(0)?;
|
||||
let from_id: ContactId = row.get(1)?;
|
||||
Ok((msg_id, from_id))
|
||||
},
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(msg_row) => msg_row,
|
||||
None => return Ok(false),
|
||||
};
|
||||
let (msg_id, contact_id) = msg_row;
|
||||
|
||||
context
|
||||
.sql
|
||||
.execute(
|
||||
"UPDATE smtp_mdns SET retries=retries+1 WHERE msg_id=?",
|
||||
paramsv![msg_id],
|
||||
)
|
||||
.await
|
||||
.context("failed to update MDN retries count")?;
|
||||
|
||||
if let Err(err) = send_mdn_msg_id(context, msg_id, contact_id, smtp).await {
|
||||
// If there is an error, for example there is no message corresponding to the msg_id in the
|
||||
// database, do not try to send this MDN again.
|
||||
context
|
||||
.sql
|
||||
.execute("DELETE FROM smtp_mdns WHERE msg_id = ?", paramsv![msg_id])
|
||||
.await?;
|
||||
Err(err)
|
||||
} else {
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -578,7 +578,7 @@ rfc724_mid TEXT NOT NULL, -- Message-ID
|
||||
mime TEXT NOT NULL, -- SMTP payload
|
||||
msg_id INTEGER NOT NULL, -- ID of the message in `msgs` table
|
||||
recipients TEXT NOT NULL, -- List of recipients separated by space
|
||||
retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the messsage
|
||||
retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the message
|
||||
);
|
||||
CREATE INDEX smtp_messageid ON imap(rfc724_mid);
|
||||
"#,
|
||||
@@ -624,6 +624,19 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid);
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
if dbversion < 90 {
|
||||
info!(context, "[migration] v90");
|
||||
sql.execute_migration(
|
||||
r#"CREATE TABLE smtp_mdns (
|
||||
msg_id INTEGER NOT NULL, -- id of the message in msgs table which requested MDN
|
||||
from_id INTEGER NOT NULL, -- id of the contact that sent the message, MDN destination
|
||||
rfc724_mid TEXT NOT NULL, -- Message-ID header
|
||||
retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send MDN
|
||||
);"#,
|
||||
90,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok((
|
||||
recalc_fingerprints,
|
||||
|
||||
Reference in New Issue
Block a user