diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 6b1b52a41..248b71761 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -737,7 +737,6 @@ class TestOnlineAccount: # make sure we are not sending message to ourselves assert self_addr not in ev.data2 assert other_addr in ev.data2 - ev = ac1._evtracker.get_matching("DC_EVENT_DELETED_BLOB_FILE") lp.sec("ac1: setting bcc_self=1") ac1.set_config("bcc_self", "1") @@ -753,7 +752,6 @@ class TestOnlineAccount: # now make sure we are sending message to ourselves too assert self_addr in ev.data2 assert other_addr in ev.data2 - ev = ac1._evtracker.get_matching("DC_EVENT_DELETED_BLOB_FILE") assert ac1.direct_imap.idle_wait_for_seen() # Second client receives only second message, but not the first diff --git a/src/chat.rs b/src/chat.rs index 7fad34fea..9671de0e1 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -5,7 +5,7 @@ use std::convert::{TryFrom, TryInto}; use std::str::FromStr; use std::time::{Duration, SystemTime}; -use anyhow::{bail, ensure, format_err, Context as _, Result}; +use anyhow::{bail, ensure, Context as _, Result}; use async_std::path::{Path, PathBuf}; use deltachat_derive::{FromSql, ToSql}; use serde::{Deserialize, Serialize}; @@ -32,10 +32,14 @@ use crate::ephemeral::{delete_expired_messages, schedule_ephemeral_task, Timer a use crate::events::EventType; use crate::html::new_html_mimepart; use crate::job::{self, Action}; +use crate::location; use crate::message::{self, Message, MessageState, MsgId}; +use crate::mimefactory::MimeFactory; use crate::mimeparser::SystemMessage; use crate::param::{Param, Params}; use crate::peerstate::{Peerstate, PeerstateVerifiedStatus}; +use crate::scheduler::InterruptInfo; +use crate::smtp::send_msg_to_smtp; use crate::stock_str; use crate::webxdc::{WEBXDC_SENDING_LIMIT, WEBXDC_SUFFIX}; @@ -1283,7 +1287,7 @@ impl Chat { msg.param.set_int(Param::AttachGroupImage, 1); self.param.remove(Param::Unpromoted); self.update_param(context).await?; - // send_sync_msg() is called (usually) a moment later at Job::send_msg_to_smtp() + // send_sync_msg() is called (usually) a moment later at send_msg_to_smtp() // when the group-creation message is actually sent though SMTP - // this makes sure, the other devices are aware of grpid that is used in the sync-message. context.sync_qr_code_tokens(Some(self.id)).await?; @@ -1963,41 +1967,25 @@ pub async fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) -> /// Tries to send a message synchronously. /// -/// Directly opens an smtp -/// connection and sends the message, bypassing the job system. If this fails, it writes a send job to -/// the database. +/// Creates a new message in `smtp` table, then drectly opens an SMTP connection and sends the +/// message. If this fails, the message remains in the database to be sent later. pub async fn send_msg_sync(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result { - if let Some(mut job) = prepare_send_msg(context, chat_id, msg).await? { + if let Some(rowid) = prepare_send_msg(context, chat_id, msg).await? { let mut smtp = crate::smtp::Smtp::new(); + send_msg_to_smtp(context, &mut smtp, rowid) + .await + .context("failed to send message, queued for later sending")?; - let status = job.send_msg_to_smtp(context, &mut smtp).await; - - match status { - job::Status::Finished(Ok(_)) => { - context.emit_event(EventType::MsgsChanged { - chat_id: msg.chat_id, - msg_id: msg.id, - }); - - Ok(msg.id) - } - _ => { - job.save(context).await?; - Err(format_err!( - "failed to send message, queued for later sending" - )) - } - } - } else { - // Nothing to do - Ok(msg.id) + context.emit_event(EventType::MsgsChanged { + chat_id: msg.chat_id, + msg_id: msg.id, + }); } + Ok(msg.id) } async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result { - if let Some(send_job) = prepare_send_msg(context, chat_id, msg).await? { - job::add(context, send_job).await?; - + if prepare_send_msg(context, chat_id, msg).await?.is_some() { context.emit_event(EventType::MsgsChanged { chat_id: msg.chat_id, msg_id: msg.id, @@ -2006,16 +1994,19 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - if msg.param.exists(Param::SetLatitude) { context.emit_event(EventType::LocationChanged(Some(DC_CONTACT_ID_SELF))); } + + context.interrupt_smtp(InterruptInfo::new(false)).await; } Ok(msg.id) } +/// Returns rowid from `smtp` table. async fn prepare_send_msg( context: &Context, chat_id: ChatId, msg: &mut Message, -) -> Result> { +) -> Result> { // dc_prepare_msg() leaves the message state to OutPreparing, we // only have to change the state to OutPending in this case. // Otherwise we still have to prepare the message, which will set @@ -2031,9 +2022,143 @@ async fn prepare_send_msg( ); message::update_msg_state(context, msg.id, MessageState::OutPending).await?; } - let job = job::send_msg_job(context, msg.id).await?; + let row_id = create_send_msg_job(context, msg.id).await?; + Ok(row_id) +} - Ok(job) +/// Constructs a job for sending a message and inserts into `smtp` table. +/// +/// Returns rowid if job was created or `None` if SMTP job is not needed, e.g. when sending to a +/// group with only self and no BCC-to-self configured. +/// +/// The caller has to interrupt SMTP loop or otherwise process a new row. +async fn create_send_msg_job(context: &Context, msg_id: MsgId) -> Result> { + let mut msg = Message::load_from_db(context, msg_id).await?; + msg.try_calc_and_set_dimensions(context) + .await + .context("failed to calculate media dimensions")?; + + /* create message */ + let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default(); + + let attach_selfavatar = match shall_attach_selfavatar(context, msg.chat_id).await { + Ok(attach_selfavatar) => attach_selfavatar, + Err(err) => { + warn!(context, "job: cannot get selfavatar-state: {}", err); + false + } + }; + + let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?; + + let mut recipients = mimefactory.recipients(); + + let from = context + .get_config(Config::ConfiguredAddr) + .await? + .unwrap_or_default(); + let lowercase_from = from.to_lowercase(); + + // Send BCC to self if it is enabled and we are not going to + // delete it immediately. + if context.get_config_bool(Config::BccSelf).await? + && context.get_config_delete_server_after().await? != Some(0) + && !recipients + .iter() + .any(|x| x.to_lowercase() == lowercase_from) + { + recipients.push(from); + } + + if recipients.is_empty() { + // may happen eg. for groups with only SELF and bcc_self disabled + info!( + context, + "message {} has no recipient, skipping smtp-send", msg_id + ); + msg_id.set_delivered(context).await?; + return Ok(None); + } + + let rendered_msg = match mimefactory.render(context).await { + Ok(res) => Ok(res), + Err(err) => { + message::set_msg_failed(context, msg_id, Some(err.to_string())).await; + Err(err) + } + }?; + + if needs_encryption && !rendered_msg.is_encrypted { + /* unrecoverable */ + message::set_msg_failed( + context, + msg_id, + Some("End-to-end-encryption unavailable unexpectedly."), + ) + .await; + bail!( + "e2e encryption unavailable {} - {:?}", + msg_id, + needs_encryption + ); + } + + if rendered_msg.is_gossiped { + msg.chat_id.set_gossiped_timestamp(context, time()).await?; + } + + if 0 != rendered_msg.last_added_location_id { + if let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, time()).await { + error!(context, "Failed to set kml sent_timestamp: {:?}", err); + } + if !msg.hidden { + if let Err(err) = + location::set_msg_location_id(context, msg.id, rendered_msg.last_added_location_id) + .await + { + error!(context, "Failed to set msg_location_id: {:?}", err); + } + } + } + + if let Some(sync_ids) = rendered_msg.sync_ids_to_delete { + if let Err(err) = context.delete_sync_ids(sync_ids).await { + error!(context, "Failed to delete sync ids: {:?}", err); + } + } + + if attach_selfavatar { + if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, time()).await { + error!(context, "Failed to set selfavatar timestamp: {:?}", err); + } + } + + if rendered_msg.is_encrypted && !needs_encryption { + msg.param.set_int(Param::GuaranteeE2ee, 1); + msg.update_param(context).await; + } + + ensure!(!recipients.is_empty(), "no recipients for smtp job set"); + + let recipients = recipients.join(" "); + + msg.subject = rendered_msg.subject.clone(); + msg.update_subject(context).await; + + let row_id = context + .sql + .insert( + "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) + VALUES (?1, ?2, ?3, ?4)", + paramsv![ + &rendered_msg.rfc724_mid, + recipients, + &rendered_msg.message, + msg_id + ], + ) + .await?; + Ok(Some(row_id)) } pub async fn send_text_msg( @@ -3049,8 +3174,8 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .prepare_msg_raw(context, &mut msg, None, curr_timestamp) .await?; curr_timestamp += 1; - if let Some(send_job) = job::send_msg_job(context, new_msg_id).await? { - job::add(context, send_job).await?; + if create_send_msg_job(context, new_msg_id).await?.is_some() { + context.interrupt_smtp(InterruptInfo::new(false)).await; } } created_chats.push(chat_id); @@ -4467,7 +4592,8 @@ mod tests { ); // Alice has an SMTP-server replacing the `Message-ID:`-header (as done eg. by outlook.com). - let msg = alice.pop_sent_msg().await.payload(); + let sent_msg = alice.pop_sent_msg().await; + let msg = sent_msg.payload(); assert_eq!(msg.match_indices("Gr.").count(), 2); let msg = msg.replace("Message-ID: for Thread { MaybeSendLocations => Thread::Smtp, MaybeSendLocationsEnded => Thread::Smtp, SendMdn => Thread::Smtp, - SendMsgToSmtp => Thread::Smtp, } } } @@ -226,217 +220,6 @@ impl Job { Ok(()) } - async fn smtp_send( - &mut self, - context: &Context, - recipients: Vec, - message: Vec, - job_id: u32, - smtp: &mut Smtp, - success_cb: F, - ) -> Status - where - F: FnOnce() -> Fut, - Fut: Future>, - { - // hold the smtp lock during sending of a job and - // its ok/error response processing. Note that if a message - // was sent we need to mark it in the database ASAP as we - // otherwise might send it twice. - if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { - info!(context, "smtp-sending out mime message:"); - println!("{}", String::from_utf8_lossy(&message)); - } - - smtp.connectivity.set_working(context).await; - - let send_result = smtp.send(context, recipients, message, job_id).await; - smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string()); - - let status = match send_result { - Err(crate::smtp::send::Error::SmtpSend(err)) => { - // Remote error, retry later. - warn!(context, "SMTP failed to send: {:?}", &err); - - let res = match err { - async_smtp::smtp::error::Error::Permanent(ref response) => { - // Workaround for incorrectly configured servers returning permanent errors - // instead of temporary ones. - let maybe_transient = match response.code { - // Sometimes servers send a permanent error when actually it is a temporary error - // For documentation see - Code { - category: Category::MailSystem, - detail: Detail::Zero, - .. - } => { - // Ignore status code 5.5.0, see - // Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns - // "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later". - // - // Other enhanced status codes, such as Postfix - // "550 5.1.1 : Recipient address rejected: User unknown in local recipient table" - // are not ignored. - response.first_word() == Some(&"5.5.0".to_string()) - } - _ => false, - }; - - if maybe_transient { - Status::RetryLater - } else { - // If we do not retry, add an info message to the chat. - // Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..." - // should definitely go here, because user has to open the link to - // resume message sending. - Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) - } - } - async_smtp::smtp::error::Error::Transient(ref response) => { - // We got a transient 4xx response from SMTP server. - // Give some time until the server-side error maybe goes away. - - if let Some(first_word) = response.first_word() { - if first_word.ends_with(".1.1") - || first_word.ends_with(".1.2") - || first_word.ends_with(".1.3") - { - // Sometimes we receive transient errors that should be permanent. - // Any extended smtp status codes like x.1.1, x.1.2 or x.1.3 that we - // receive as a transient error are misconfigurations of the smtp server. - // See - info!(context, "Smtp-job #{} Received extended status code {} for a transient error. This looks like a misconfigured smtp server, let's fail immediatly", self.job_id, first_word); - Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) - } else { - Status::RetryLater - } - } else { - Status::RetryLater - } - } - _ => { - if smtp.has_maybe_stale_connection().await { - info!(context, "stale connection? immediately reconnecting"); - Status::RetryNow - } else { - Status::RetryLater - } - } - }; - - // this clears last_success info - smtp.disconnect().await; - - res - } - Err(crate::smtp::send::Error::Envelope(err)) => { - // Local error, job is invalid, do not retry. - smtp.disconnect().await; - warn!(context, "SMTP job is invalid: {}", err); - Status::Finished(Err(err.into())) - } - Err(crate::smtp::send::Error::NoTransport) => { - // Should never happen. - // It does not even make sense to disconnect here. - error!(context, "SMTP job failed because SMTP has no transport"); - Status::Finished(Err(format_err!("SMTP has not transport"))) - } - Err(crate::smtp::send::Error::Other(err)) => { - // Local error, job is invalid, do not retry. - smtp.disconnect().await; - warn!(context, "unable to load job: {}", err); - Status::Finished(Err(err)) - } - Ok(()) => { - job_try!(success_cb().await); - Status::Finished(Ok(())) - } - }; - - if let Status::Finished(Err(err)) = &status { - // We couldn't send the message, so mark it as failed - let msg_id = MsgId::new(self.foreign_id); - message::set_msg_failed(context, msg_id, Some(err.to_string())).await; - } - status - } - - pub(crate) async fn send_msg_to_smtp(&mut self, context: &Context, smtp: &mut Smtp) -> Status { - // SMTP server, if not yet done - if let Err(err) = smtp.connect_configured(context).await { - warn!(context, "SMTP connection failure: {:?}", err); - smtp.last_send_error = Some(format!("SMTP connection failure: {:#}", err)); - return Status::RetryLater; - } - - let filename = job_try!(job_try!(self - .param - .get_path(Param::File, context) - .context("can't get filename")) - .context("Can't get filename")); - let body = job_try!(dc_read_file(context, &filename).await); - let recipients = job_try!(self - .param - .get(Param::Recipients) - .context("missing recipients")); - - let recipients_list = recipients - .split('\x1e') - .filter_map( - |addr| match async_smtp::EmailAddress::new(addr.to_string()) { - Ok(addr) => Some(addr), - Err(err) => { - warn!(context, "invalid recipient: {} {:?}", addr, err); - None - } - }, - ) - .collect::>(); - - /* if there is a msg-id and it does not exist in the db, cancel sending. - this happends if dc_delete_msgs() was called - before the generated mime was sent out */ - if 0 != self.foreign_id { - match message::exists(context, MsgId::new(self.foreign_id)).await { - Ok(exists) => { - if !exists { - return Status::Finished(Err(format_err!( - "Not sending Message {} as it was deleted", - self.foreign_id - ))); - } - } - Err(err) => { - warn!(context, "failed to check message existence: {:?}", err); - smtp.last_send_error = - Some(format!("failed to check message existence: {:#}", err)); - return Status::RetryLater; - } - } - }; - - let foreign_id = self.foreign_id; - self.smtp_send(context, recipients_list, body, self.job_id, smtp, || { - async move { - // smtp success, update db ASAP, then delete smtp file - if 0 != foreign_id { - set_delivered(context, MsgId::new(foreign_id)).await?; - } - // now also delete the generated file - dc_delete_file(context, filename).await; - - // finally, create another send-job if there are items to be synced. - // triggering sync-job after msg-send-job guarantees, the recipient has grpid etc. - // once the sync message arrives. - // if there are no items to sync, this function returns fast. - context.send_sync_msg().await?; - - Ok(()) - } - }) - .await - } - /// Get `SendMdn` jobs with foreign_id equal to `contact_id` excluding the `job_id` job. async fn get_additional_mdn_jobs( &self, @@ -535,14 +318,13 @@ impl Job { return Status::RetryLater; } - self.smtp_send(context, recipients, body, self.job_id, smtp, || { - async move { - // Remove additional SendMdn jobs we have aggregated into this one. - kill_ids(context, &additional_job_ids).await?; - Ok(()) - } - }) - .await + let status = smtp_send(context, recipients, body, smtp, msg_id, 0).await; + if matches!(status, Status::Finished(Ok(_))) { + // Remove additional SendMdn jobs we have aggregated into this one. + job_try!(kill_ids(context, &additional_job_ids).await); + } + + status } /// Read the recipients from old emails sent by the user and add them as contacts. @@ -703,17 +485,6 @@ pub async fn action_exists(context: &Context, action: Action) -> Result { Ok(exists) } -async fn set_delivered(context: &Context, msg_id: MsgId) -> Result<()> { - message::update_msg_state(context, msg_id, MessageState::OutDelivered).await?; - let chat_id: ChatId = context - .sql - .query_get_value("SELECT chat_id FROM msgs WHERE id=?", paramsv![msg_id]) - .await? - .unwrap_or_default(); - context.emit_event(EventType::MsgDelivered { chat_id, msg_id }); - Ok(()) -} - async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, folder: Config) { let mailbox = if let Ok(Some(m)) = context.get_config(folder).await { m @@ -759,132 +530,6 @@ async fn add_all_recipients_as_contacts(context: &Context, imap: &mut Imap, fold }; } -/// Constructs a job for sending a message. -/// -/// Returns `None` if no messages need to be sent out. -/// -/// In order to be processed, must be `add`ded. -pub async fn send_msg_job(context: &Context, msg_id: MsgId) -> Result> { - let mut msg = Message::load_from_db(context, msg_id).await?; - msg.try_calc_and_set_dimensions(context).await.ok(); - - /* create message */ - let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default(); - - let attach_selfavatar = match chat::shall_attach_selfavatar(context, msg.chat_id).await { - Ok(attach_selfavatar) => attach_selfavatar, - Err(err) => { - warn!(context, "job: cannot get selfavatar-state: {}", err); - false - } - }; - - let mimefactory = MimeFactory::from_msg(context, &msg, attach_selfavatar).await?; - - let mut recipients = mimefactory.recipients(); - - let from = context - .get_config(Config::ConfiguredAddr) - .await? - .unwrap_or_default(); - let lowercase_from = from.to_lowercase(); - - // Send BCC to self if it is enabled and we are not going to - // delete it immediately. - if context.get_config_bool(Config::BccSelf).await? - && context.get_config_delete_server_after().await? != Some(0) - && !recipients - .iter() - .any(|x| x.to_lowercase() == lowercase_from) - { - recipients.push(from); - } - - if recipients.is_empty() { - // may happen eg. for groups with only SELF and bcc_self disabled - info!( - context, - "message {} has no recipient, skipping smtp-send", msg_id - ); - set_delivered(context, msg_id).await?; - return Ok(None); - } - - let rendered_msg = match mimefactory.render(context).await { - Ok(res) => Ok(res), - Err(err) => { - message::set_msg_failed(context, msg_id, Some(err.to_string())).await; - Err(err) - } - }?; - - if needs_encryption && !rendered_msg.is_encrypted { - /* unrecoverable */ - message::set_msg_failed( - context, - msg_id, - Some("End-to-end-encryption unavailable unexpectedly."), - ) - .await; - bail!( - "e2e encryption unavailable {} - {:?}", - msg_id, - needs_encryption - ); - } - - if rendered_msg.is_gossiped { - msg.chat_id.set_gossiped_timestamp(context, time()).await?; - } - - if 0 != rendered_msg.last_added_location_id { - if let Err(err) = location::set_kml_sent_timestamp(context, msg.chat_id, time()).await { - error!(context, "Failed to set kml sent_timestamp: {:?}", err); - } - if !msg.hidden { - if let Err(err) = - location::set_msg_location_id(context, msg.id, rendered_msg.last_added_location_id) - .await - { - error!(context, "Failed to set msg_location_id: {:?}", err); - } - } - } - - if let Some(sync_ids) = rendered_msg.sync_ids_to_delete { - if let Err(err) = context.delete_sync_ids(sync_ids).await { - error!(context, "Failed to delete sync ids: {:?}", err); - } - } - - if attach_selfavatar { - if let Err(err) = msg.chat_id.set_selfavatar_timestamp(context, time()).await { - error!(context, "Failed to set selfavatar timestamp: {:?}", err); - } - } - - if rendered_msg.is_encrypted && !needs_encryption { - msg.param.set_int(Param::GuaranteeE2ee, 1); - msg.update_param(context).await; - } - - ensure!(!recipients.is_empty(), "no recipients for smtp job set"); - let mut param = Params::new(); - let bytes = &rendered_msg.message; - let blob = BlobObject::create(context, &rendered_msg.rfc724_mid, bytes).await?; - - let recipients = recipients.join("\x1e"); - param.set(Param::File, blob.as_name()); - param.set(Param::Recipients, &recipients); - - msg.subject = rendered_msg.subject.clone(); - msg.update_subject(context).await; - - let job = create(Action::SendMsgToSmtp, msg_id.to_u32(), param, 0)?; - - Ok(Some(job)) -} - pub(crate) enum Connection<'a> { Inbox(&'a mut Imap), Smtp(&'a mut Smtp), @@ -992,7 +637,6 @@ async fn perform_job_action( let try_res = match job.action { Action::Unknown => Status::Finished(Err(format_err!("Unknown job id found"))), - Action::SendMsgToSmtp => job.send_msg_to_smtp(context, connection.smtp()).await, Action::SendMdn => job.send_mdn(context, connection.smtp()).await, Action::MaybeSendLocations => location::job_maybe_send_locations(context, job).await, Action::MaybeSendLocationsEnded => { @@ -1056,16 +700,6 @@ pub(crate) async fn schedule_resync(context: &Context) -> Result<()> { Ok(()) } -/// Creates a job. -pub fn create(action: Action, foreign_id: u32, param: Params, delay_seconds: i64) -> Result { - ensure!( - action != Action::Unknown, - "Invalid action passed to job_add" - ); - - Ok(Job::new(action, foreign_id, param, delay_seconds)) -} - /// Adds a job to the database, scheduling it. pub async fn add(context: &Context, job: Job) -> Result<()> { let action = job.action; @@ -1084,10 +718,7 @@ pub async fn add(context: &Context, job: Job) -> Result<()> { info!(context, "interrupt: imap"); context.interrupt_inbox(InterruptInfo::new(false)).await; } - Action::MaybeSendLocations - | Action::MaybeSendLocationsEnded - | Action::SendMdn - | Action::SendMsgToSmtp => { + Action::MaybeSendLocations | Action::MaybeSendLocationsEnded | Action::SendMdn => { info!(context, "interrupt: smtp"); context.interrupt_smtp(InterruptInfo::new(false)).await; } diff --git a/src/message.rs b/src/message.rs index 2fb04f121..a274c8a44 100644 --- a/src/message.rs +++ b/src/message.rs @@ -113,10 +113,14 @@ WHERE id=?; Ok(()) } - /// Deletes a message and corresponding MDNs from the database. + /// Deletes a message, corresponding MDNs and unsent SMTP messages from the database. pub async fn delete_from_db(self, context: &Context) -> Result<()> { // We don't use transactions yet, so remove MDNs first to make // sure they are not left while the message is deleted. + context + .sql + .execute("DELETE FROM smtp WHERE msg_id=?", paramsv![self]) + .await?; context .sql .execute("DELETE FROM msgs_mdns WHERE msg_id=?;", paramsv![self]) @@ -135,6 +139,20 @@ WHERE id=?; Ok(()) } + pub(crate) async fn set_delivered(self, context: &Context) -> Result<()> { + update_msg_state(context, self, MessageState::OutDelivered).await?; + let chat_id: ChatId = context + .sql + .query_get_value("SELECT chat_id FROM msgs WHERE id=?", paramsv![self]) + .await? + .unwrap_or_default(); + context.emit_event(EventType::MsgDelivered { + chat_id, + msg_id: self, + }); + Ok(()) + } + /// Bad evil escape hatch. /// /// Avoid using this, eventually types should be cleaned up enough diff --git a/src/mimefactory.rs b/src/mimefactory.rs index 1cae242fa..693e23e3f 100644 --- a/src/mimefactory.rs +++ b/src/mimefactory.rs @@ -81,7 +81,7 @@ pub struct MimeFactory<'a> { /// Result of rendering a message, ready to be submitted to a send job. #[derive(Debug, Clone)] pub struct RenderedEmail { - pub message: Vec, + pub message: String, // pub envelope: Envelope, pub is_encrypted: bool, pub is_gossiped: bool, @@ -770,7 +770,7 @@ impl<'a> MimeFactory<'a> { } = self; Ok(RenderedEmail { - message: outer_message.build().as_string().into_bytes(), + message: outer_message.build().as_string(), // envelope: Envelope::new, is_encrypted, is_gossiped, @@ -1931,7 +1931,7 @@ mod tests { let rendered_msg = mimefactory.render(context).await.unwrap(); - let mail = mailparse::parse_mail(&rendered_msg.message).unwrap(); + let mail = mailparse::parse_mail(rendered_msg.message.as_bytes()).unwrap(); assert_eq!( mail.headers .iter() @@ -1941,7 +1941,7 @@ mod tests { "1.0" ); - let _mime_msg = MimeMessage::from_bytes(context, &rendered_msg.message) + let _mime_msg = MimeMessage::from_bytes(context, rendered_msg.message.as_bytes()) .await .unwrap(); } @@ -2015,8 +2015,9 @@ mod tests { let mut msg = Message::new(Viewtype::Text); msg.set_text(Some("this is the text!".to_string())); - let payload = t.send_msg(chat.id, &mut msg).await.payload(); - let mut payload = payload.splitn(3, "\r\n\r\n"); + let sent_msg = t.send_msg(chat.id, &mut msg).await; + let mut payload = sent_msg.payload().splitn(3, "\r\n\r\n"); + let outer = payload.next().unwrap(); let inner = payload.next().unwrap(); let body = payload.next().unwrap(); @@ -2034,8 +2035,8 @@ mod tests { // if another message is sent, that one must not contain the avatar // and no artificial multipart/mixed nesting - let payload = t.send_msg(chat.id, &mut msg).await.payload(); - let mut payload = payload.splitn(2, "\r\n\r\n"); + let sent_msg = t.send_msg(chat.id, &mut msg).await; + let mut payload = sent_msg.payload().splitn(2, "\r\n\r\n"); let outer = payload.next().unwrap(); let body = payload.next().unwrap(); diff --git a/src/param.rs b/src/param.rs index 9afca38f0..f538dc478 100644 --- a/src/param.rs +++ b/src/param.rs @@ -110,9 +110,6 @@ pub enum Param { /// For Jobs AlsoMove = b'M', - /// For Jobs: space-separated list of message recipients - Recipients = b'R', - /// For MDN-sending job MsgId = b'I', diff --git a/src/scheduler.rs b/src/scheduler.rs index aabdbd601..7489961bc 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -10,7 +10,7 @@ use crate::context::Context; use crate::dc_tools::maybe_add_time_based_warnings; use crate::imap::Imap; use crate::job::{self, Thread}; -use crate::smtp::Smtp; +use crate::smtp::{send_smtp_messages, Smtp}; use self::connectivity::ConnectivityStore; @@ -273,17 +273,25 @@ async fn smtp_loop(ctx: Context, started: Sender<()>, smtp_handlers: SmtpConnect let mut interrupt_info = Default::default(); loop { - match job::load_next(&ctx, Thread::Smtp, &interrupt_info) - .await - .ok() - .flatten() - { + let job = match job::load_next(&ctx, Thread::Smtp, &interrupt_info).await { + Err(err) => { + error!(ctx, "Failed loading job from the database: {:#}.", err); + None + } + Ok(job) => job, + }; + + match job { Some(job) => { info!(ctx, "executing smtp job"); job::perform_job(&ctx, job::Connection::Smtp(&mut connection), job).await; interrupt_info = Default::default(); } None => { + if let Err(err) = send_smtp_messages(&ctx, &mut connection).await { + warn!(ctx, "send_smtp_messages failed: {:#}", err); + } + // Fake Idle info!(ctx, "smtp fake idle - started"); match &connection.last_send_error { diff --git a/src/smtp.rs b/src/smtp.rs index 5f3c129d2..25bc90add 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -4,14 +4,18 @@ pub mod send; use std::time::{Duration, SystemTime}; +use anyhow::{format_err, Context as _}; use async_smtp::smtp::client::net::ClientTlsParameters; +use async_smtp::smtp::response::{Category, Code, Detail}; use async_smtp::{error, smtp, EmailAddress, ServerAddress}; use crate::constants::DC_LP_AUTH_OAUTH2; use crate::events::EventType; +use crate::job::Status; use crate::login_param::{ dc_build_tls, CertificateChecks, LoginParam, ServerLoginParam, Socks5Config, }; +use crate::message::{self, MsgId}; use crate::oauth2::dc_get_oauth2_access_token; use crate::provider::Socket; use crate::{context::Context, scheduler::connectivity::ConnectivityStore}; @@ -220,3 +224,234 @@ impl Smtp { Ok(()) } } + +pub(crate) async fn smtp_send( + context: &Context, + recipients: Vec, + message: String, + smtp: &mut Smtp, + msg_id: MsgId, + rowid: i64, +) -> Status { + if std::env::var(crate::DCC_MIME_DEBUG).is_ok() { + info!(context, "smtp-sending out mime message:"); + println!("{}", message); + } + + smtp.connectivity.set_working(context).await; + + let send_result = smtp + .send(context, recipients, message.into_bytes(), rowid) + .await; + smtp.last_send_error = send_result.as_ref().err().map(|e| e.to_string()); + + let status = match send_result { + Err(crate::smtp::send::Error::SmtpSend(err)) => { + // Remote error, retry later. + warn!(context, "SMTP failed to send: {:?}", &err); + + let res = match err { + async_smtp::smtp::error::Error::Permanent(ref response) => { + // Workaround for incorrectly configured servers returning permanent errors + // instead of temporary ones. + let maybe_transient = match response.code { + // Sometimes servers send a permanent error when actually it is a temporary error + // For documentation see + Code { + category: Category::MailSystem, + detail: Detail::Zero, + .. + } => { + // Ignore status code 5.5.0, see + // Maybe incorrectly configured Postfix milter with "reject" instead of "tempfail", which returns + // "550 5.5.0 Service unavailable" instead of "451 4.7.1 Service unavailable - try again later". + // + // Other enhanced status codes, such as Postfix + // "550 5.1.1 : Recipient address rejected: User unknown in local recipient table" + // are not ignored. + response.first_word() == Some(&"5.5.0".to_string()) + } + _ => false, + }; + + if maybe_transient { + Status::RetryLater + } else { + // If we do not retry, add an info message to the chat. + // Yandex error "554 5.7.1 [2] Message rejected under suspicion of SPAM; https://ya.cc/..." + // should definitely go here, because user has to open the link to + // resume message sending. + Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) + } + } + async_smtp::smtp::error::Error::Transient(ref response) => { + // We got a transient 4xx response from SMTP server. + // Give some time until the server-side error maybe goes away. + + if let Some(first_word) = response.first_word() { + if first_word.ends_with(".1.1") + || first_word.ends_with(".1.2") + || first_word.ends_with(".1.3") + { + // Sometimes we receive transient errors that should be permanent. + // Any extended smtp status codes like x.1.1, x.1.2 or x.1.3 that we + // receive as a transient error are misconfigurations of the smtp server. + // See + info!(context, "Received extended status code {} for a transient error. This looks like a misconfigured smtp server, let's fail immediatly", first_word); + Status::Finished(Err(format_err!("Permanent SMTP error: {}", err))) + } else { + Status::RetryLater + } + } else { + Status::RetryLater + } + } + _ => { + if smtp.has_maybe_stale_connection().await { + info!(context, "stale connection? immediately reconnecting"); + Status::RetryNow + } else { + Status::RetryLater + } + } + }; + + // this clears last_success info + smtp.disconnect().await; + + res + } + Err(crate::smtp::send::Error::Envelope(err)) => { + // Local error, job is invalid, do not retry. + smtp.disconnect().await; + warn!(context, "SMTP job is invalid: {}", err); + Status::Finished(Err(err.into())) + } + Err(crate::smtp::send::Error::NoTransport) => { + // Should never happen. + // It does not even make sense to disconnect here. + error!(context, "SMTP job failed because SMTP has no transport"); + Status::Finished(Err(format_err!("SMTP has not transport"))) + } + Err(crate::smtp::send::Error::Other(err)) => { + // Local error, job is invalid, do not retry. + smtp.disconnect().await; + warn!(context, "unable to load job: {}", err); + Status::Finished(Err(err)) + } + Ok(()) => Status::Finished(Ok(())), + }; + + if let Status::Finished(Err(err)) = &status { + // We couldn't send the message, so mark it as failed + message::set_msg_failed(context, msg_id, Some(err.to_string())).await; + } + status +} + +/// Sends message identified by `smtp` table rowid over SMTP connection. +/// +/// Removes row if the message should not be retried, otherwise increments retry count. +pub(crate) async fn send_msg_to_smtp( + context: &Context, + smtp: &mut Smtp, + rowid: i64, +) -> anyhow::Result<()> { + if let Err(err) = smtp + .connect_configured(context) + .await + .context("SMTP connection failure") + { + smtp.last_send_error = Some(format!("SMTP connection failure: {:#}", err)); + return Err(err); + } + + let (body, recipients, msg_id) = context + .sql + .query_row( + "SELECT mime, recipients, msg_id FROM smtp WHERE id=?", + paramsv![rowid], + |row| { + let mime: String = row.get(0)?; + let recipients: String = row.get(1)?; + let msg_id: MsgId = row.get(2)?; + Ok((mime, recipients, msg_id)) + }, + ) + .await?; + let recipients_list = recipients + .split(' ') + .filter_map( + |addr| match async_smtp::EmailAddress::new(addr.to_string()) { + Ok(addr) => Some(addr), + Err(err) => { + warn!(context, "invalid recipient: {} {:?}", addr, err); + None + } + }, + ) + .collect::>(); + + let status = smtp_send(context, recipients_list, body, smtp, msg_id, rowid).await; + match status { + Status::Finished(res) => { + if res.is_ok() { + msg_id.set_delivered(context).await?; + + context + .sql + .execute("DELETE FROM smtp WHERE id=?", paramsv![rowid]) + .await?; + } + res + } + Status::RetryNow | Status::RetryLater => { + context + .sql + .execute( + "UPDATE smtp SET retries=retries+1 WHERE id=?", + paramsv![rowid], + ) + .await + .context("failed to update retries count")?; + Err(format_err!("Retry")) + } + } +} + +/// Tries to send all messages currently in `smtp` table. +/// +/// Logs and ignores SMTP errors to ensure that a single SMTP message constantly failing to be sent +/// does not block other messages in the queue from being sent. +pub(crate) async fn send_smtp_messages( + context: &Context, + connection: &mut Smtp, +) -> anyhow::Result<()> { + context.send_sync_msg().await?; // Add sync message to the end of the queue if needed. + context + .sql + .execute("DELETE FROM smtp WHERE retries > 5", paramsv![]) + .await?; + let rowids = context + .sql + .query_map( + "SELECT id FROM smtp ORDER BY id ASC", + paramsv![], + |row| { + let rowid: i64 = row.get(0)?; + Ok(rowid) + }, + |rowids| { + rowids + .collect::, _>>() + .map_err(Into::into) + }, + ) + .await?; + for rowid in rowids { + if let Err(err) = send_msg_to_smtp(context, connection, rowid).await { + info!(context, "Failed to send message over SMTP: {:#}.", err); + } + } + Ok(()) +} diff --git a/src/smtp/send.rs b/src/smtp/send.rs index a9b42d6ef..716e11362 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -30,7 +30,7 @@ impl Smtp { context: &Context, recipients: Vec, message: Vec, - job_id: u32, + rowid: i64, ) -> Result<()> { let message_len_bytes = message.len(); @@ -52,7 +52,7 @@ impl Smtp { .map_err(Error::Envelope)?; let mail = SendableEmail::new( envelope, - format!("{}", job_id), // only used for internal logging + rowid.to_string(), // only used for internal logging &message, ); diff --git a/src/sql.rs b/src/sql.rs index acc20f2c9..e1290072c 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -271,10 +271,10 @@ impl Sql { &self, query: impl AsRef, params: impl rusqlite::Params, - ) -> anyhow::Result { + ) -> Result { let conn = self.get_conn().await?; conn.execute(query.as_ref(), params)?; - Ok(usize::try_from(conn.last_insert_rowid())?) + Ok(conn.last_insert_rowid()) } /// Prepares and executes the statement and maps a function over the resulting rows. diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index a45cd786d..f9fc471ba 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -562,6 +562,23 @@ CREATE INDEX msgs_status_updates_index1 ON msgs_status_updates (msg_id);"#, ) .await?; } + if dbversion < 85 { + info!(context, "[migration] v85"); + sql.execute_migration( + r#"CREATE TABLE smtp ( +id INTEGER PRIMARY KEY, +rfc724_mid TEXT NOT NULL, -- Message-ID +mime TEXT NOT NULL, -- SMTP payload +msg_id INTEGER NOT NULL, -- ID of the message in `msgs` table +recipients TEXT NOT NULL, -- List of recipients separated by space +retries INTEGER NOT NULL DEFAULT 0 -- Number of failed attempts to send the messsage +); +CREATE INDEX smtp_messageid ON imap(rfc724_mid); +"#, + 85, + ) + .await?; + } Ok(( recalc_fingerprints, diff --git a/src/test_utils.rs b/src/test_utils.rs index 4be0cab49..a2b4b87c4 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -5,13 +5,11 @@ use std::collections::BTreeMap; use std::ops::Deref; use std::panic; -use std::str::FromStr; use std::thread; use std::time::{Duration, Instant}; use ansi_term::Color; use async_std::channel::{self, Receiver, Sender}; -use async_std::path::PathBuf; use async_std::prelude::*; use async_std::sync::{Arc, RwLock}; use async_std::task; @@ -30,11 +28,9 @@ use crate::context::Context; use crate::dc_receive_imf::dc_receive_imf; use crate::dc_tools::EmailAddress; use crate::events::{Event, EventType}; -use crate::job::Action; use crate::key::{self, DcKey, KeyPair, KeyPairUse}; use crate::message::{update_msg_state, Message, MessageState, MsgId}; use crate::mimeparser::MimeMessage; -use crate::param::{Param, Params}; #[allow(non_upper_case_globals)] pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png"); @@ -275,27 +271,27 @@ impl TestContext { /// Panics if there is no message or on any error. pub async fn pop_sent_msg(&self) -> SentMessage { let start = Instant::now(); - let (rowid, foreign_id, raw_params) = loop { + let (rowid, msg_id, payload, recipients) = loop { let row = self .ctx .sql - .query_row( + .query_row_optional( r#" - SELECT id, foreign_id, param - FROM jobs - WHERE action=? - ORDER BY desired_timestamp DESC; - "#, - paramsv![Action::SendMsgToSmtp], + SELECT id, msg_id, mime, recipients + FROM smtp + ORDER BY id DESC"#, + paramsv![], |row| { - let id: u32 = row.get(0)?; - let foreign_id: u32 = row.get(1)?; - let param: String = row.get(2)?; - Ok((id, foreign_id, param)) + let rowid: i64 = row.get(0)?; + let msg_id: MsgId = row.get(1)?; + let mime: String = row.get(2)?; + let recipients: String = row.get(3)?; + Ok((rowid, msg_id, mime, recipients)) }, ) - .await; - if let Ok(row) = row { + .await + .expect("query_row_optional failed"); + if let Some(row) = row { break row; } if start.elapsed() < Duration::from_secs(3) { @@ -304,26 +300,18 @@ impl TestContext { panic!("no sent message found in jobs table"); } }; - let id = MsgId::new(foreign_id); - let params = Params::from_str(&raw_params).unwrap(); - let blob_path = params - .get_blob(Param::File, &self.ctx, false) - .await - .expect("failed to parse blob from param") - .expect("no Param::File found in Params") - .to_abs_path(); self.ctx .sql .execute("DELETE FROM jobs WHERE id=?;", paramsv![rowid]) .await .expect("failed to remove job"); - update_msg_state(&self.ctx, id, MessageState::OutDelivered) + update_msg_state(&self.ctx, msg_id, MessageState::OutDelivered) .await .expect("failed to update message state"); SentMessage { - params, - blob_path, - sender_msg_id: id, + payload, + sender_msg_id: msg_id, + recipients, } } @@ -347,7 +335,7 @@ impl TestContext { let received_msg = "Received: (Postfix, from userid 1000); Mon, 4 Dec 2006 14:51:39 +0100 (CET)\n" .to_owned() - + &msg.payload(); + + msg.payload(); dc_receive_imf(&self.ctx, received_msg.as_bytes(), "INBOX", false) .await .unwrap(); @@ -592,8 +580,8 @@ impl Drop for LogSink { /// passed through a SMTP-IMAP pipeline. #[derive(Debug, Clone)] pub struct SentMessage { - params: Params, - blob_path: PathBuf, + payload: String, + recipients: String, pub sender_msg_id: MsgId, } @@ -602,17 +590,17 @@ impl SentMessage { /// /// If there are multiple recipients this is just a random one, so is not very useful. pub fn recipient(&self) -> EmailAddress { - let raw = self - .params - .get(Param::Recipients) - .expect("no recipients in params"); - let rcpt = raw.split(' ').next().expect("no recipient found"); + let rcpt = self + .recipients + .split(' ') + .next() + .expect("no recipient found"); rcpt.parse().expect("failed to parse email address") } /// The raw message payload. - pub fn payload(&self) -> String { - std::fs::read_to_string(&self.blob_path).unwrap() + pub fn payload(&self) -> &str { + &self.payload } }