diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bb482649..249f9866a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Unreleased + +### Changes +- send normal messages with higher priority than MDNs #3243 + + ## 1.80.0 ### Changes diff --git a/src/job.rs b/src/job.rs index 3a28d10c5..721aa4702 100644 --- a/src/job.rs +++ b/src/job.rs @@ -4,36 +4,22 @@ //! and job types. use std::fmt; -use anyhow::{format_err, Context as _, Result}; +use anyhow::{Context as _, Result}; use deltachat_derive::{FromSql, ToSql}; use rand::{thread_rng, Rng}; use crate::config::Config; -use crate::contact::{normalize_name, Contact, ContactId, Modifier, Origin}; +use crate::contact::{normalize_name, Contact, Modifier, Origin}; use crate::context::Context; use crate::dc_tools::time; use crate::events::EventType; use crate::imap::Imap; -use crate::message::{Message, MsgId}; -use crate::mimefactory::MimeFactory; -use crate::param::{Param, Params}; +use crate::param::Params; use crate::scheduler::InterruptInfo; -use crate::smtp::{smtp_send, SendResult, Smtp}; -use crate::sql; // results in ~3 weeks for the last backoff timespan const JOB_RETRIES: u32 = 17; -/// Thread IDs -#[derive( - Debug, Display, Copy, Clone, PartialEq, Eq, FromPrimitive, ToPrimitive, FromSql, ToSql, -)] -#[repr(u32)] -pub(crate) enum Thread { - Imap = 100, - Smtp = 5000, -} - /// Job try result. #[derive(Debug, Display)] pub enum Status { @@ -72,7 +58,6 @@ macro_rules! job_try { )] #[repr(u32)] pub enum Action { - // Jobs in the INBOX-thread, range from DC_IMAP_THREAD..DC_IMAP_THREAD+999 FetchExistingMsgs = 110, // this is user initiated so it should have a fairly high priority @@ -87,24 +72,6 @@ pub enum Action { // UID synchronization is high-priority to make sure correct UIDs // are used by message moving/deletion. ResyncFolders = 300, - - // Jobs in the SMTP-thread, range from DC_SMTP_THREAD..DC_SMTP_THREAD+999 - SendMdn = 5010, -} - -impl From for Thread { - fn from(action: Action) -> Thread { - use Action::*; - - match action { - FetchExistingMsgs => Thread::Imap, - ResyncFolders => Thread::Imap, - UpdateRecentQuota => Thread::Imap, - DownloadMsg => Thread::Imap, - - SendMdn => Thread::Smtp, - } - } } #[derive(Debug, Clone, PartialEq)] @@ -159,9 +126,7 @@ impl Job { /// /// The Job is consumed by this method. pub(crate) async fn save(self, context: &Context) -> Result<()> { - let thread: Thread = self.action.into(); - - info!(context, "saving job for {}-thread: {:?}", thread, self); + info!(context, "saving job {:?}", self); if self.job_id != 0 { context @@ -178,10 +143,9 @@ impl Job { .await?; } else { context.sql.execute( - "INSERT INTO jobs (added_timestamp, thread, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?,?);", + "INSERT INTO jobs (added_timestamp, action, foreign_id, param, desired_timestamp) VALUES (?,?,?,?,?);", paramsv![ self.added_timestamp, - thread, self.action, self.foreign_id, self.param.to_string(), @@ -193,118 +157,6 @@ impl Job { Ok(()) } - /// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job. - async fn get_additional_mdn_jobs( - &self, - context: &Context, - contact_id: ContactId, - ) -> Result<(Vec, Vec)> { - // Extract message IDs from job parameters - let res: Vec<(u32, MsgId)> = context - .sql - .query_map( - "SELECT id, param FROM jobs WHERE foreign_id=? AND id!=?", - paramsv![contact_id, self.job_id], - |row| { - let job_id: u32 = row.get(0)?; - let params_str: String = row.get(1)?; - let params: Params = params_str.parse().unwrap_or_default(); - Ok((job_id, params)) - }, - |jobs| { - let res = jobs - .filter_map(|row| { - let (job_id, params) = row.ok()?; - let msg_id = params.get_msg_id()?; - Some((job_id, msg_id)) - }) - .collect(); - Ok(res) - }, - ) - .await?; - - // Load corresponding RFC724 message IDs - let mut job_ids = Vec::new(); - let mut rfc724_mids = Vec::new(); - for (job_id, msg_id) in res { - if let Ok(Message { rfc724_mid, .. }) = Message::load_from_db(context, msg_id).await { - job_ids.push(job_id); - rfc724_mids.push(rfc724_mid); - } - } - Ok((job_ids, rfc724_mids)) - } - - async fn send_mdn(&mut self, context: &Context, smtp: &mut Smtp) -> Status { - let mdns_enabled = job_try!(context.get_config_bool(Config::MdnsEnabled).await); - if !mdns_enabled { - // User has disabled MDNs after job scheduling but before - // execution. - return Status::Finished(Err(format_err!("MDNs are disabled"))); - } - - let contact_id = ContactId::new(self.foreign_id); - let contact = job_try!(Contact::load_from_db(context, contact_id).await); - if contact.is_blocked() { - return Status::Finished(Err(format_err!("Contact is blocked"))); - } - - let msg_id = if let Some(msg_id) = self.param.get_msg_id() { - msg_id - } else { - return Status::Finished(Err(format_err!( - "SendMdn job has invalid parameters: {}", - self.param - ))); - }; - - // Try to aggregate other SendMdn jobs and send a combined MDN. - let (additional_job_ids, additional_rfc724_mids) = self - .get_additional_mdn_jobs(context, contact_id) - .await - .unwrap_or_default(); - - if !additional_rfc724_mids.is_empty() { - info!( - context, - "SendMdn job: aggregating {} additional MDNs", - additional_rfc724_mids.len() - ) - } - - let msg = job_try!(Message::load_from_db(context, msg_id).await); - let mimefactory = - job_try!(MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await); - let rendered_msg = job_try!(mimefactory.render(context).await); - let body = rendered_msg.message; - - let addr = contact.get_addr(); - let recipient = job_try!(async_smtp::EmailAddress::new(addr.to_string()) - .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))); - let recipients = vec![recipient]; - - // connect to SMTP server, if not yet done - if let Err(err) = smtp.connect_configured(context).await { - warn!(context, "SMTP connection failure: {:?}", err); - smtp.last_send_error = Some(err.to_string()); - return Status::RetryLater; - } - - match smtp_send(context, &recipients, &body, smtp, msg_id, 0).await { - SendResult::Success => { - // Remove additional SendMdn jobs we have aggregated into this one. - job_try!(kill_ids(context, &additional_job_ids).await); - Status::Finished(Ok(())) - } - SendResult::Retry => { - info!(context, "Temporary SMTP failure while sending an MDN"); - Status::RetryLater - } - SendResult::Failure(err) => Status::Finished(Err(err)), - } - } - /// Read the recipients from old emails sent by the user and add them as contacts. /// This way, we can already offer them some email addresses they can write to. /// @@ -387,22 +239,6 @@ pub async fn kill_action(context: &Context, action: Action) -> Result<()> { Ok(()) } -/// Remove jobs with specified IDs. -async fn kill_ids(context: &Context, job_ids: &[u32]) -> Result<()> { - if job_ids.is_empty() { - return Ok(()); - } - let q = format!( - "DELETE FROM jobs WHERE id IN({})", - sql::repeat_vars(job_ids.len()) - ); - context - .sql - .execute(q, rusqlite::params_from_iter(job_ids)) - .await?; - Ok(()) -} - pub async fn action_exists(context: &Context, action: Action) -> Result { let exists = context .sql @@ -461,36 +297,18 @@ async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, fold pub(crate) enum Connection<'a> { Inbox(&'a mut Imap), - Smtp(&'a mut Smtp), -} - -impl<'a> fmt::Display for Connection<'a> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Connection::Inbox(_) => write!(f, "Inbox"), - Connection::Smtp(_) => write!(f, "Smtp"), - } - } } impl<'a> Connection<'a> { fn inbox(&mut self) -> &mut Imap { match self { Connection::Inbox(imap) => imap, - _ => panic!("Not an inbox"), - } - } - - fn smtp(&mut self) -> &mut Smtp { - match self { - Connection::Smtp(smtp) => smtp, - _ => panic!("Not a smtp"), } } } pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_>, mut job: Job) { - info!(context, "{}-job {} started...", &connection, &job); + info!(context, "job {} started...", &job); let try_res = match perform_job_action(context, &mut job, &mut connection, 0).await { Status::RetryNow => perform_job_action(context, &mut job, &mut connection, 1).await, @@ -502,17 +320,13 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_ let tries = job.tries + 1; if tries < JOB_RETRIES { - info!( - context, - "{} thread increases job {} tries to {}", &connection, job, tries - ); + info!(context, "increase job {} tries to {}", job, tries); job.tries = tries; let time_offset = get_backoff_time_offset(tries, job.action); job.desired_timestamp = time() + time_offset; info!( context, - "{}-job #{} not succeeded on try #{}, retry in {} seconds.", - &connection, + "job #{} not succeeded on try #{}, retry in {} seconds.", job.job_id as u32, tries, time_offset @@ -523,10 +337,7 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_ } else { info!( context, - "{} thread removes job {} as it exhausted {} retries", - &connection, - job, - JOB_RETRIES + "remove job {} as it exhausted {} retries", job, JOB_RETRIES ); job.delete(context).await.unwrap_or_else(|err| { error!(context, "failed to delete job: {}", err); @@ -537,13 +348,10 @@ pub(crate) async fn perform_job(context: &Context, mut connection: Connection<'_ if let Err(err) = res { warn!( context, - "{} removes job {} as it failed with error {:#}", &connection, job, err + "remove job {} as it failed with error {:#}", job, err ); } else { - info!( - context, - "{} removes job {} as it succeeded", &connection, job - ); + info!(context, "remove job {} as it succeeded", job); } job.delete(context).await.unwrap_or_else(|err| { @@ -559,13 +367,9 @@ async fn perform_job_action( connection: &mut Connection<'_>, tries: u32, ) -> Status { - info!( - context, - "{} begin immediate try {} of job {}", &connection, tries, job - ); + info!(context, "begin immediate try {} of job {}", tries, job); let try_res = match job.action { - Action::SendMdn => job.send_mdn(context, connection.smtp()).await, Action::ResyncFolders => job.resync_folders(context, connection.inbox()).await, Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await, Action::UpdateRecentQuota => match context.update_recent_quota(connection.inbox()).await { @@ -600,19 +404,6 @@ fn get_backoff_time_offset(tries: u32, action: Action) -> i64 { } } -pub(crate) async fn send_mdn(context: &Context, msg_id: MsgId, from_id: ContactId) -> Result<()> { - let mut param = Params::new(); - param.set(Param::MsgId, msg_id.to_u32().to_string()); - - add( - context, - Job::new(Action::SendMdn, from_id.to_u32(), param, 0), - ) - .await?; - - Ok(()) -} - pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { kill_action(context, Action::ResyncFolders).await?; add( @@ -638,10 +429,6 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { info!(context, "interrupt: imap"); context.interrupt_inbox(InterruptInfo::new(false)).await; } - Action::SendMdn => { - info!(context, "interrupt: smtp"); - context.interrupt_smtp(InterruptInfo::new(false)).await; - } } } Ok(()) @@ -649,21 +436,15 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { /// Load jobs from the database. /// -/// Load jobs for this "[Thread]", i.e. either load SMTP jobs or load -/// IMAP jobs. The `probe_network` parameter decides how to query +/// The `probe_network` parameter decides how to query /// jobs, this is tricky and probably wrong currently. Look at the /// SQL queries for details. -pub(crate) async fn load_next( - context: &Context, - thread: Thread, - info: &InterruptInfo, -) -> Result> { - info!(context, "loading job for {}-thread", thread); +pub(crate) async fn load_next(context: &Context, info: &InterruptInfo) -> Result> { + info!(context, "loading job"); let query; let params; let t = time(); - let thread_i = thread as i64; if !info.probe_network { // processing for first-try and after backoff-timeouts: @@ -671,11 +452,11 @@ pub(crate) async fn load_next( query = r#" SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries FROM jobs -WHERE thread=? AND desired_timestamp<=? +WHERE desired_timestamp<=? ORDER BY action DESC, added_timestamp LIMIT 1; "#; - params = paramsv![thread_i, t]; + params = paramsv![t]; } else { // processing after call to dc_maybe_network(): // process _all_ pending jobs that failed before @@ -683,11 +464,11 @@ LIMIT 1; query = r#" SELECT id, action, foreign_id, param, added_timestamp, desired_timestamp, tries FROM jobs -WHERE thread=? AND tries>0 +WHERE tries>0 ORDER BY desired_timestamp, action DESC LIMIT 1; "#; - params = paramsv![thread_i]; + params = paramsv![]; }; loop { @@ -742,11 +523,10 @@ mod tests { .sql .execute( "INSERT INTO jobs - (added_timestamp, thread, action, foreign_id, param, desired_timestamp) - VALUES (?, ?, ?, ?, ?, ?);", + (added_timestamp, action, foreign_id, param, desired_timestamp) + VALUES (?, ?, ?, ?, ?);", paramsv![ now, - Thread::from(Action::DownloadMsg), if valid { Action::DownloadMsg as i32 } else { @@ -768,21 +548,11 @@ mod tests { // all jobs. let t = TestContext::new().await; insert_job(&t, 1, false).await; // This can not be loaded into Job struct. - let jobs = load_next( - &t, - Thread::from(Action::DownloadMsg), - &InterruptInfo::new(false), - ) - .await?; + let jobs = load_next(&t, &InterruptInfo::new(false)).await?; assert!(jobs.is_none()); insert_job(&t, 1, true).await; - let jobs = load_next( - &t, - Thread::from(Action::DownloadMsg), - &InterruptInfo::new(false), - ) - .await?; + let jobs = load_next(&t, &InterruptInfo::new(false)).await?; assert!(jobs.is_some()); Ok(()) } @@ -793,12 +563,7 @@ mod tests { insert_job(&t, 1, true).await; - let jobs = load_next( - &t, - Thread::from(Action::DownloadMsg), - &InterruptInfo::new(false), - ) - .await?; + let jobs = load_next(&t, &InterruptInfo::new(false)).await?; assert!(jobs.is_some()); Ok(()) } diff --git a/src/message.rs b/src/message.rs index b13d5b9e1..f33c09285 100644 --- a/src/message.rs +++ b/src/message.rs @@ -23,7 +23,6 @@ use crate::download::DownloadState; use crate::ephemeral::{start_ephemeral_timers_msgids, Timer as EphemeralTimer}; use crate::events::EventType; use crate::imap::markseen_on_imap_table; -use crate::job; use crate::log::LogExt; use crate::mimeparser::{parse_message_id, FailureReport, SystemMessage}; use crate::param::{Param, Params}; @@ -1364,9 +1363,15 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> { let mdns_enabled = context.get_config_bool(Config::MdnsEnabled).await?; if mdns_enabled { - if let Err(err) = job::send_mdn(context, id, curr_from_id).await { - warn!(context, "could not send out mdn for {}: {}", id, err); - } + context + .sql + .execute( + "INSERT INTO smtp_mdns (msg_id, from_id, rfc724_mid) VALUES(?, ?, ?)", + paramsv![id, curr_from_id, curr_rfc724_mid], + ) + .await + .context("failed to insert into smtp_mdns")?; + context.interrupt_smtp(InterruptInfo::new(false)).await; } } updated_chat_ids.insert(curr_chat_id); diff --git a/src/scheduler.rs b/src/scheduler.rs index bf2ef40b4..d0195c828 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -11,7 +11,7 @@ use crate::dc_tools::maybe_add_time_based_warnings; use crate::dc_tools::time; use crate::ephemeral::{self, delete_expired_imap_messages}; use crate::imap::Imap; -use crate::job::{self, Thread}; +use crate::job; use crate::location; use crate::log::LogExt; use crate::smtp::{send_smtp_messages, Smtp}; @@ -93,7 +93,7 @@ async fn inbox_loop(ctx: Context, started: Sender<()>, inbox_handlers: ImapConne let mut info = InterruptInfo::default(); loop { - let job = match job::load_next(&ctx, Thread::Imap, &info).await { + let job = match job::load_next(&ctx, &info).await { Err(err) => { error!(ctx, "Failed loading job from the database: {:#}.", err); None @@ -303,65 +303,46 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect } let mut timeout = None; - let mut interrupt_info = Default::default(); loop { - let job = match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await { - Err(err) => { - error!(ctx, "Failed loading job from the database: {:#}.", err); - None - } - Ok(job) => job, + let res = send_smtp_messages(&ctx, &mut connection).await; + if let Err(err) = &res { + warn!(ctx, "send_smtp_messages failed: {:#}", err); + } + let success = res.unwrap_or(false); + timeout = if success { + None + } else { + Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) }; - match job { - Some(job) => { - info!(ctx, "executing smtp job"); - job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await; - interrupt_info = Default::default(); - } - None => { - let res = send_smtp_messages(&ctx, &mut connection).await; - if let Err(err) = &res { - warn!(ctx, "send_smtp_messages failed: {:#}", err); - } - let success = res.unwrap_or(false); - timeout = if success { - None - } else { - Some(timeout.map_or(30, |timeout: u64| timeout.saturating_mul(3))) - }; - - // Fake Idle - info!(ctx, "smtp fake idle - started"); - match &connection.last_send_error { - None => connection.connectivity.set_connected(&ctx).await, - Some(err) => connection.connectivity.set_err(&ctx, err).await, - } - - // If send_smtp_messages() failed, we set a timeout for the fake-idle so that - // sending is retried (at the latest) after the timeout. If sending fails - // again, we increase the timeout exponentially, in order not to do lots of - // unnecessary retries. - if let Some(timeout) = timeout { - info!( - ctx, - "smtp has messages to retry, planning to retry {} seconds later", - timeout - ); - let duration = std::time::Duration::from_secs(timeout); - interrupt_info = async_std::future::timeout(duration, async { - idle_interrupt_receiver.recv().await.unwrap_or_default() - }) - .await - .unwrap_or_default(); - } else { - info!(ctx, "smtp has no messages to retry, waiting for interrupt"); - interrupt_info = idle_interrupt_receiver.recv().await.unwrap_or_default(); - }; - - info!(ctx, "smtp fake idle - interrupted") - } + // Fake Idle + info!(ctx, "smtp fake idle - started"); + match &connection.last_send_error { + None => connection.connectivity.set_connected(&ctx).await, + Some(err) => connection.connectivity.set_err(&ctx, err).await, } + + // If send_smtp_messages() failed, we set a timeout for the fake-idle so that + // sending is retried (at the latest) after the timeout. If sending fails + // again, we increase the timeout exponentially, in order not to do lots of + // unnecessary retries. + if let Some(timeout) = timeout { + info!( + ctx, + "smtp has messages to retry, planning to retry {} seconds later", timeout + ); + let duration = std::time::Duration::from_secs(timeout); + async_std::future::timeout(duration, async { + idle_interrupt_receiver.recv().await.unwrap_or_default() + }) + .await + .unwrap_or_default(); + } else { + info!(ctx, "smtp has no messages to retry, waiting for interrupt"); + idle_interrupt_receiver.recv().await.unwrap_or_default(); + }; + + info!(ctx, "smtp fake idle - interrupted") } }; diff --git a/src/smtp.rs b/src/smtp.rs index f76d99bfb..514423228 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -10,14 +10,19 @@ use async_smtp::smtp::response::{Category, Code, Detail}; use async_smtp::{smtp, EmailAddress, ServerAddress}; use async_std::task; +use crate::config::Config; use crate::constants::DC_LP_AUTH_OAUTH2; +use crate::contact::{Contact, ContactId}; use crate::events::EventType; use crate::login_param::{ dc_build_tls, CertificateChecks, LoginParam, ServerLoginParam, Socks5Config, }; +use crate::message::Message; use crate::message::{self, MsgId}; +use crate::mimefactory::MimeFactory; use crate::oauth2::dc_get_oauth2_access_token; use crate::provider::Socket; +use crate::sql; use crate::{context::Context, scheduler::connectivity::ConnectivityStore}; /// SMTP write and read timeout in seconds. @@ -82,6 +87,11 @@ impl Smtp { /// Connect using configured parameters. pub async fn connect_configured(&mut self, context: &Context) -> Result<()> { + if self.has_maybe_stale_connection().await { + info!(context, "Closing stale connection"); + self.disconnect().await; + } + if self.is_connected().await { return Ok(()); } @@ -226,18 +236,13 @@ pub(crate) async fn smtp_send( smtp.connectivity.set_working(context).await; - if smtp.has_maybe_stale_connection().await { - info!(context, "Closing stale connection"); - smtp.disconnect().await; - - if let Err(err) = smtp - .connect_configured(context) - .await - .context("failed to reopen stale SMTP connection") - { - smtp.last_send_error = Some(format!("{:#}", err)); - return SendResult::Retry; - } + if let Err(err) = smtp + .connect_configured(context) + .await + .context("Failed to open SMTP connection") + { + smtp.last_send_error = Some(format!("{:#}", err)); + return SendResult::Retry; } let send_result = smtp @@ -479,7 +484,7 @@ pub(crate) async fn send_msg_to_smtp( } } -/// Tries to send all messages currently in `smtp` table. +/// Tries to send all messages currently in `smtp` and `smtp_mdns` tables. /// /// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent /// does not block other messages in the queue from being sent. @@ -510,5 +515,150 @@ pub(crate) async fn send_smtp_messages(context: &Context, connection: &mut Smtp) success = false; } } + + loop { + match send_mdn(context, connection).await { + Err(err) => { + info!(context, "Failed to send MDNs over SMTP: {:#}.", err); + success = false; + break; + } + Ok(false) => { + break; + } + Ok(true) => {} + } + } Ok(success) } + +/// Tries to send MDN for message `msg_id` to `contact_id`. +/// +/// Attempts to aggregate additional MDNs for `contact_id` into sent MDN. +/// +/// On failure returns an error without removing any `smtp_mdns` entries, the caller is responsible +/// for removing the corresponding entry to prevent endless loop in case the entry is invalid, e.g. +/// points to non-existent message or contact. +async fn send_mdn_msg_id( + context: &Context, + msg_id: MsgId, + contact_id: ContactId, + smtp: &mut Smtp, +) -> Result<()> { + let contact = Contact::load_from_db(context, contact_id).await?; + if contact.is_blocked() { + return Err(format_err!("Contact is blocked")); + } + + // Try to aggregate additional MDNs into this MDN. + let (additional_msg_ids, additional_rfc724_mids): (Vec, Vec) = context + .sql + .query_map( + "SELECT msg_id, rfc724_mid + FROM smtp_mdns + WHERE from_id=? AND msg_id!=?", + paramsv![contact_id, msg_id], + |row| { + let msg_id: MsgId = row.get(0)?; + let rfc724_mid: String = row.get(1)?; + Ok((msg_id, rfc724_mid)) + }, + |rows| rows.collect::, _>>().map_err(Into::into), + ) + .await? + .into_iter() + .unzip(); + + let msg = Message::load_from_db(context, msg_id).await?; + let mimefactory = MimeFactory::from_mdn(context, &msg, additional_rfc724_mids).await?; + let rendered_msg = mimefactory.render(context).await?; + let body = rendered_msg.message; + + let addr = contact.get_addr(); + let recipient = async_smtp::EmailAddress::new(addr.to_string()) + .map_err(|err| format_err!("invalid recipient: {} {:?}", addr, err))?; + let recipients = vec![recipient]; + + match smtp_send(context, &recipients, &body, smtp, msg_id, 0).await { + SendResult::Success => { + info!(context, "Successfully sent MDN for {}", msg_id); + context + .sql + .execute("DELETE FROM smtp_mdns WHERE msg_id = ?", paramsv![msg_id]) + .await?; + if !additional_msg_ids.is_empty() { + let q = format!( + "DELETE FROM smtp_mdns WHERE msg_id IN({})", + sql::repeat_vars(additional_msg_ids.len()) + ); + context + .sql + .execute(q, rusqlite::params_from_iter(additional_msg_ids)) + .await?; + } + Ok(()) + } + SendResult::Retry => { + info!( + context, + "Temporary SMTP failure while sending an MDN for {}", msg_id + ); + Ok(()) + } + SendResult::Failure(err) => Err(err), + } +} + +/// Tries to send a single MDN. Returns false if there are no MDNs to send. +async fn send_mdn(context: &Context, smtp: &mut Smtp) -> Result { + let mdns_enabled = context.get_config_bool(Config::MdnsEnabled).await?; + if !mdns_enabled { + // User has disabled MDNs. + context.sql.execute("DELETE FROM smtp_mdns", []).await?; + return Ok(false); + } + info!(context, "Sending MDNs"); + + context + .sql + .execute("DELETE FROM smtp_mdns WHERE retries > 6", []) + .await?; + let msg_row = match context + .sql + .query_row_optional( + "SELECT msg_id, from_id FROM smtp_mdns ORDER BY retries LIMIT 1", + [], + |row| { + let msg_id: MsgId = row.get(0)?; + let from_id: ContactId = row.get(1)?; + Ok((msg_id, from_id)) + }, + ) + .await? + { + Some(msg_row) => msg_row, + None => return Ok(false), + }; + let (msg_id, contact_id) = msg_row; + + context + .sql + .execute( + "UPDATE smtp_mdns SET retries=retries+1 WHERE msg_id=?", + paramsv![msg_id], + ) + .await + .context("failed to update MDN retries count")?; + + if let Err(err) = send_mdn_msg_id(context, msg_id, contact_id, smtp).await { + // If there is an error, for example there is no message corresponding to the msg_id in the + // database, do not try to send this MDN again. + context + .sql + .execute("DELETE FROM smtp_mdns WHERE msg_id = ?", paramsv![msg_id]) + .await?; + Err(err) + } else { + Ok(true) + } +} diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 7b5caf142..8cc29322d 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -578,7 +578,7 @@ rfc724_mid TEXT NOT NULL, -- Message-ID mime TEXT NOT NULL, -- SMTP payload msg_id INTEGER NOT NULL, -- ID of the message in `msgs` table recipients TEXT NOT NULL, -- List of recipients separated by space -retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the messsage +retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the message ); CREATE INDEX smtp_messageid ON imap(rfc724_mid); "#, @@ -624,6 +624,19 @@ CREATE INDEX smtp_messageid ON imap(rfc724_mid); ) .await?; } + if dbversion < 90 { + info!(context, "[migration] v90"); + sql.execute_migration( + r#"CREATE TABLE smtp_mdns ( + msg_id INTEGER NOT NULL, -- id of the message in msgs table which requested MDN + from_id INTEGER NOT NULL, -- id of the contact that sent the message, MDN destination + rfc724_mid TEXT NOT NULL, -- Message-ID header + retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send MDN + );"#, + 90, + ) + .await?; + } Ok(( recalc_fingerprints,