diff --git a/src/chat.rs b/src/chat.rs index 8db0ee601..4a3f8f9b4 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -19,8 +19,8 @@ use crate::chatlist::Chatlist; use crate::color::str_to_color; use crate::config::Config; use crate::constants::{ - Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, DC_CHAT_ID_LAST_SPECIAL, - DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, + self, Blocked, Chattype, DC_CHAT_ID_ALLDONE_HINT, DC_CHAT_ID_ARCHIVED_LINK, + DC_CHAT_ID_LAST_SPECIAL, DC_CHAT_ID_TRASH, DC_RESEND_USER_AVATAR_DAYS, }; use crate::contact::{self, Contact, ContactAddress, ContactId, Origin}; use crate::context::Context; @@ -2662,17 +2662,20 @@ pub async fn send_msg(context: &Context, chat_id: ChatId, msg: &mut Message) -> /// Tries to send a message synchronously. /// -/// Creates a new message in `smtp` table, then drectly opens an SMTP connection and sends the -/// message. If this fails, the message remains in the database to be sent later. +/// Creates jobs in the `smtp` table, then drectly opens an SMTP connection and sends the +/// message. If this fails, the jobs remain in the database for later sending. pub async fn send_msg_sync(context: &Context, chat_id: ChatId, msg: &mut Message) -> Result { - if let Some(rowid) = prepare_send_msg(context, chat_id, msg).await? { - let mut smtp = crate::smtp::Smtp::new(); + let rowids = prepare_send_msg(context, chat_id, msg).await?; + if rowids.is_empty() { + return Ok(msg.id); + } + let mut smtp = crate::smtp::Smtp::new(); + for rowid in rowids { send_msg_to_smtp(context, &mut smtp, rowid) .await .context("failed to send message, queued for later sending")?; - - context.emit_msgs_changed(msg.chat_id, msg.id); } + context.emit_msgs_changed(msg.chat_id, msg.id); Ok(msg.id) } @@ -2682,7 +2685,7 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - msg.text = strip_rtlo_characters(&msg.text); } - if prepare_send_msg(context, chat_id, msg).await?.is_some() { + if !prepare_send_msg(context, chat_id, msg).await?.is_empty() { context.emit_msgs_changed(msg.chat_id, msg.id); if msg.param.exists(Param::SetLatitude) { @@ -2695,12 +2698,12 @@ async fn send_msg_inner(context: &Context, chat_id: ChatId, msg: &mut Message) - Ok(msg.id) } -/// Returns rowid from `smtp` table. +/// Returns row ids of the `smtp` table. async fn prepare_send_msg( context: &Context, chat_id: ChatId, msg: &mut Message, -) -> Result> { +) -> Result> { // prepare_msg() leaves the message state to OutPreparing, we // only have to change the state to OutPending in this case. // Otherwise we still have to prepare the message, which will set @@ -2716,20 +2719,16 @@ async fn prepare_send_msg( ); message::update_msg_state(context, msg.id, MessageState::OutPending).await?; } - let row_id = create_send_msg_job(context, msg).await?; - Ok(row_id) + create_send_msg_jobs(context, msg).await } -/// Constructs a job for sending a message and inserts into `smtp` table. +/// Constructs jobs for sending a message and inserts them into the `smtp` table. /// -/// Returns rowid if job was created or `None` if SMTP job is not needed, e.g. when sending to a +/// Returns row ids if jobs were created or an empty `Vec` otherwise, e.g. when sending to a /// group with only self and no BCC-to-self configured. /// -/// The caller has to interrupt SMTP loop or otherwise process a new row. -pub(crate) async fn create_send_msg_job( - context: &Context, - msg: &mut Message, -) -> Result> { +/// The caller has to interrupt SMTP loop or otherwise process new rows. +pub(crate) async fn create_send_msg_jobs(context: &Context, msg: &mut Message) -> Result> { let needs_encryption = msg.param.get_bool(Param::GuaranteeE2ee).unwrap_or_default(); let attach_selfavatar = match shall_attach_selfavatar(context, msg.chat_id).await { @@ -2748,7 +2747,7 @@ pub(crate) async fn create_send_msg_job( let lowercase_from = from.to_lowercase(); // Send BCC to self if it is enabled and we are not going to - // delete it immediately. + // delete it immediately. `from` must be the last addr, see `receive_imf_inner()` why. if context.get_config_bool(Config::BccSelf).await? && context.get_config_delete_server_after().await? != Some(0) && !recipients @@ -2766,7 +2765,7 @@ pub(crate) async fn create_send_msg_job( ); msg.id.set_delivered(context).await?; msg.state = MessageState::OutDelivered; - return Ok(None); + return Ok(Vec::new()); } let rendered_msg = match mimefactory.render(context).await { @@ -2826,27 +2825,32 @@ pub(crate) async fn create_send_msg_job( msg.update_param(context).await?; } - ensure!(!recipients.is_empty(), "no recipients for smtp job set"); - - let recipients = recipients.join(" "); - msg.subject = rendered_msg.subject.clone(); msg.update_subject(context).await?; - - let row_id = context - .sql - .insert( - "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) - VALUES (?1, ?2, ?3, ?4)", - ( - &rendered_msg.rfc724_mid, - recipients, - &rendered_msg.message, - msg.id, - ), - ) - .await?; - Ok(Some(row_id)) + let chunk_size = context + .get_configured_provider() + .await? + .and_then(|provider| provider.opt.max_smtp_rcpt_to) + .map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from); + let trans_fn = |t: &mut rusqlite::Transaction| { + let mut row_ids = Vec::::new(); + for recipients_chunk in recipients.chunks(chunk_size) { + let recipients_chunk = recipients_chunk.join(" "); + let row_id = t.execute( + "INSERT INTO smtp (rfc724_mid, recipients, mime, msg_id) \ + VALUES (?1, ?2, ?3, ?4)", + ( + &rendered_msg.rfc724_mid, + recipients_chunk, + &rendered_msg.message, + msg.id, + ), + )?; + row_ids.push(row_id.try_into()?); + } + Ok(row_ids) + }; + context.sql.transaction(trans_fn).await } /// Sends a text message to the given chat. @@ -4002,7 +4006,7 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) .prepare_msg_raw(context, &mut msg, None, curr_timestamp) .await?; curr_timestamp += 1; - if create_send_msg_job(context, &mut msg).await?.is_some() { + if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { context.scheduler.interrupt_smtp().await; } } @@ -4059,7 +4063,7 @@ pub async fn resend_msgs(context: &Context, msg_ids: &[MsgId]) -> Result<()> { chat_id: msg.chat_id, msg_id: msg.id, }); - if create_send_msg_job(context, &mut msg).await?.is_some() { + if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { context.scheduler.interrupt_smtp().await; } } diff --git a/src/constants.rs b/src/constants.rs index 165de5391..ac3c281ed 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -209,6 +209,11 @@ pub const WORSE_IMAGE_SIZE: u32 = 640; // this value can be increased if the folder configuration is changed and must be redone on next program start pub(crate) const DC_FOLDERS_CONFIGURED_VERSION: i32 = 4; +// If more recipients are needed in SMTP's `RCPT TO:` header, the recipient list is split into +// chunks. This does not affect MIME's `To:` header. Can be overwritten by setting +// `max_smtp_rcpt_to` in the provider db. +pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50; + #[cfg(test)] mod tests { use num_traits::FromPrimitive; diff --git a/src/message.rs b/src/message.rs index e31fb554d..58eef2bdd 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1665,7 +1665,10 @@ pub(crate) async fn update_msg_state( ) -> Result<()> { context .sql - .execute("UPDATE msgs SET state=? WHERE id=?;", (state, msg_id)) + .execute( + "UPDATE msgs SET state=?1 WHERE id=?2 AND (?1!=?3 OR state Result> { + if delivered { + msg_id.set_delivered(context).await?; + } + Ok(None) } // check, if the mail is already in our database. @@ -210,7 +234,7 @@ pub(crate) async fn receive_imf_inner( context, "Got a partial download and message is already in DB." ); - return Ok(None); + return on_msg_in_db(context, old_msg_id, delivered).await; } let msg = Message::load_from_db(context, old_msg_id).await?; replace_msg_id = Some(old_msg_id); @@ -232,9 +256,12 @@ pub(crate) async fn receive_imf_inner( }; replace_chat_id = None; } - if replace_msg_id.is_some() && replace_chat_id.is_none() { + + if replace_chat_id.is_some() { + // Need to update chat id in the db. + } else if let Some(msg_id) = replace_msg_id { info!(context, "Message is already downloaded."); - return Ok(None); + return on_msg_in_db(context, msg_id, delivered).await; }; let prevent_rename = diff --git a/src/smtp.rs b/src/smtp.rs index b687c3c40..7ec94c467 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -595,7 +595,13 @@ pub(crate) async fn send_msg_to_smtp( match status { SendResult::Retry => Err(format_err!("Retry")), SendResult::Success => { - msg_id.set_delivered(context).await?; + if !context + .sql + .exists("SELECT COUNT(*) FROM smtp WHERE msg_id=?", (msg_id,)) + .await? + { + msg_id.set_delivered(context).await?; + } Ok(()) } SendResult::Failure(err) => Err(format_err!("{}", err)), diff --git a/src/smtp/send.rs b/src/smtp/send.rs index c66e3f844..7adb59765 100644 --- a/src/smtp/send.rs +++ b/src/smtp/send.rs @@ -9,11 +9,6 @@ use crate::events::EventType; pub type Result = std::result::Result; -// if more recipients are needed in SMTP's `RCPT TO:` header, recipient-list is split to chunks. -// this does not affect MIME'e `To:` header. -// can be overwritten by the setting `max_smtp_rcpt_to` in provider-db. -pub(crate) const DEFAULT_MAX_SMTP_RCPT_TO: usize = 50; - #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Envelope error: {}", _0)] @@ -43,40 +38,30 @@ impl Smtp { } let message_len_bytes = message.len(); + let recipients_display = recipients + .iter() + .map(|x| x.as_ref()) + .collect::>() + .join(","); - let chunk_size = context - .get_configured_provider() - .await? - .and_then(|provider| provider.opt.max_smtp_rcpt_to) - .map_or(DEFAULT_MAX_SMTP_RCPT_TO, usize::from); + let envelope = + Envelope::new(self.from.clone(), recipients.to_vec()).map_err(Error::Envelope)?; + let mail = SendableEmail::new(envelope, message); - for recipients_chunk in recipients.chunks(chunk_size) { - let recipients_display = recipients_chunk - .iter() - .map(|x| x.as_ref()) - .collect::>() - .join(","); + if let Some(ref mut transport) = self.transport { + transport.send(mail).await.map_err(Error::SmtpSend)?; - let envelope = Envelope::new(self.from.clone(), recipients_chunk.to_vec()) - .map_err(Error::Envelope)?; - let mail = SendableEmail::new(envelope, message); - - if let Some(ref mut transport) = self.transport { - transport.send(mail).await.map_err(Error::SmtpSend)?; - - let info_msg = format!( - "Message len={message_len_bytes} was SMTP-sent to {recipients_display}" - ); - info!(context, "{info_msg}."); - context.emit_event(EventType::SmtpMessageSent(info_msg)); - self.last_success = Some(std::time::SystemTime::now()); - } else { - warn!( - context, - "uh? SMTP has no transport, failed to send to {}", recipients_display - ); - return Err(Error::NoTransport); - } + let info_msg = + format!("Message len={message_len_bytes} was SMTP-sent to {recipients_display}"); + info!(context, "{info_msg}."); + context.emit_event(EventType::SmtpMessageSent(info_msg)); + self.last_success = Some(std::time::SystemTime::now()); + } else { + warn!( + context, + "uh? SMTP has no transport, failed to send to {}", recipients_display + ); + return Err(Error::NoTransport); } Ok(()) } diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index c0ab7074c..658a243b9 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -1,11 +1,13 @@ //! Migrations module. use anyhow::{Context as _, Result}; +use rusqlite::OptionalExtension; use crate::config::Config; -use crate::constants::ShowEmails; +use crate::constants::{self, ShowEmails}; use crate::context::Context; use crate::imap; +use crate::message::MsgId; use crate::provider::get_provider_by_domain; use crate::sql::Sql; use crate::tools::EmailAddress; @@ -834,6 +836,56 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid); .await?; } + if dbversion < 108 { + let version = 108; + let chunk_size = context + .get_configured_provider() + .await? + .and_then(|provider| provider.opt.max_smtp_rcpt_to) + .map_or(constants::DEFAULT_MAX_SMTP_RCPT_TO, usize::from); + sql.transaction(move |trans| { + Sql::set_db_version_trans(trans, version)?; + let id_max = + trans.query_row("SELECT IFNULL((SELECT MAX(id) FROM smtp), 0)", (), |row| { + let id_max: i64 = row.get(0)?; + Ok(id_max) + })?; + while let Some((id, rfc724_mid, mime, msg_id, recipients, retries)) = trans + .query_row( + "SELECT id, rfc724_mid, mime, msg_id, recipients, retries FROM smtp \ + WHERE id<=? LIMIT 1", + (id_max,), + |row| { + let id: i64 = row.get(0)?; + let rfc724_mid: String = row.get(1)?; + let mime: String = row.get(2)?; + let msg_id: MsgId = row.get(3)?; + let recipients: String = row.get(4)?; + let retries: i64 = row.get(5)?; + Ok((id, rfc724_mid, mime, msg_id, recipients, retries)) + }, + ) + .optional()? + { + trans.execute("DELETE FROM smtp WHERE id=?", (id,))?; + let recipients = recipients.split(' ').collect::>(); + for recipients in recipients.chunks(chunk_size) { + let recipients = recipients.join(" "); + trans.execute( + "INSERT INTO smtp (rfc724_mid, mime, msg_id, recipients, retries) \ + VALUES (?, ?, ?, ?, ?)", + (&rfc724_mid, &mime, msg_id, recipients, retries), + )?; + } + } + Ok(()) + }) + .await + .with_context(|| format!("migration failed for version {version}"))?; + + sql.set_db_version_in_cache(version).await?; + } + let new_version = sql .get_raw_config_int(VERSION_CFG) .await? @@ -873,6 +925,12 @@ impl Sql { Ok(()) } + async fn set_db_version_in_cache(&self, version: i32) -> Result<()> { + let mut lock = self.config_cache.write().await; + lock.insert(VERSION_CFG.to_string(), Some(format!("{version}"))); + Ok(()) + } + async fn execute_migration(&self, query: &str, version: i32) -> Result<()> { self.transaction(move |transaction| { Self::set_db_version_trans(transaction, version)?; @@ -883,10 +941,6 @@ impl Sql { .await .with_context(|| format!("execute_migration failed for version {version}"))?; - let mut lock = self.config_cache.write().await; - lock.insert(VERSION_CFG.to_string(), Some(format!("{version}"))); - drop(lock); - - Ok(()) + self.set_db_version_in_cache(version).await } }