From db0f5ed5a6a5169531f39e8799b845e209813cba Mon Sep 17 00:00:00 2001 From: Hocuri Date: Wed, 16 Mar 2022 10:50:11 +0100 Subject: [PATCH] try to speed up needs_move(), introduce LazyMsg --- benches/receive_emails.rs | 25 ++++-- src/context.rs | 2 + src/dc_receive_imf.rs | 66 +++++++++++++-- src/imap.rs | 1 + src/job.rs | 11 ++- src/message.rs | 169 +++++++++++++++++++++++--------------- 6 files changed, 192 insertions(+), 82 deletions(-) diff --git a/benches/receive_emails.rs b/benches/receive_emails.rs index 18343c3d2..dc28f1700 100644 --- a/benches/receive_emails.rs +++ b/benches/receive_emails.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use async_std::{path::PathBuf, task::block_on}; use criterion::{ async_executor::AsyncStdExecutor, black_box, criterion_group, criterion_main, BatchSize, - Criterion, + BenchmarkId, Criterion, }; use deltachat::{ config::Config, @@ -28,7 +28,8 @@ async fn recv_emails(context: Context, emails: &[&[u8]]) -> Context { context } -async fn recv_all_emails(context: Context) -> Context { +async fn recv_all_emails(mut context: Context, needs_move_enabled: bool) -> Context { + context.disable_needs_move = !needs_move_enabled; let emails = [ include_bytes!("../test-data/message/allinkl-quote.eml").as_ref(), include_bytes!("../test-data/message/apple_cid_jpg.eml").as_ref(), @@ -125,13 +126,21 @@ async fn create_context() -> Context { } fn criterion_benchmark(c: &mut Criterion) { - c.bench_function("Receive many messages", |b| { - b.to_async(AsyncStdExecutor).iter_batched( - || block_on(create_context()), - |context| recv_all_emails(black_box(context)), - BatchSize::LargeInput, + let mut group = c.benchmark_group("from_elem"); + for needs_move_enabled in [false, true] { + group.bench_with_input( + BenchmarkId::new("Receive many messages", needs_move_enabled), + &needs_move_enabled, + |b, needs_move_enabled| { + b.to_async(AsyncStdExecutor).iter_batched( + || block_on(create_context()), + |context| recv_all_emails(black_box(context), 
*needs_move_enabled), + BatchSize::LargeInput, + ); + }, ); - }); + } + group.finish(); } criterion_group!(benches, criterion_benchmark); diff --git a/src/context.rs b/src/context.rs index ce3ddec38..c4492c923 100644 --- a/src/context.rs +++ b/src/context.rs @@ -30,6 +30,7 @@ use crate::sql::Sql; #[derive(Clone, Debug)] pub struct Context { pub(crate) inner: Arc, + pub disable_needs_move: bool, // TODO just added for some extra profiling info } impl Deref for Context { @@ -157,6 +158,7 @@ impl Context { let ctx = Context { inner: Arc::new(inner), + disable_needs_move: false, }; ctx.sql.open(&ctx, &ctx.dbfile, false).await?; diff --git a/src/dc_receive_imf.rs b/src/dc_receive_imf.rs index c0ab3ef2a..8790a49e1 100644 --- a/src/dc_receive_imf.rs +++ b/src/dc_receive_imf.rs @@ -193,7 +193,7 @@ pub(crate) async fn dc_receive_imf_inner( } // Add parts - let chat_id = add_parts( + let (chat_id, msgs) = add_parts( context, &mut mime_parser, imf_raw, @@ -323,7 +323,11 @@ pub(crate) async fn dc_receive_imf_inner( ) .await?; } - } else if insert_msg_id + } else if msgs + .first() + .unwrap() // TODO unwrap() (should be safe though) + .clone() // TODO unnecessary clone + .into_lazy() .needs_move(context, server_folder.as_ref()) .await .unwrap_or_default() @@ -439,7 +443,7 @@ async fn add_parts( create_event_to_send: &mut Option, fetching_existing_messages: bool, prevent_rename: bool, -) -> Result { +) -> Result<(ChatId, Vec)> { let mut chat_id = None; let mut chat_id_blocked = Blocked::Not; let mut incoming_origin = incoming_origin; @@ -513,7 +517,7 @@ async fn add_parts( } Err(err) => { warn!(context, "Error in Secure-Join message handling: {}", err); - return Ok(DC_CHAT_ID_TRASH); + return Ok((DC_CHAT_ID_TRASH, Vec::new())); } } } else { @@ -740,7 +744,7 @@ async fn add_parts( } Err(err) => { warn!(context, "Error in Secure-Join watching: {}", err); - return Ok(DC_CHAT_ID_TRASH); + return Ok((DC_CHAT_ID_TRASH, Vec::new())); } } } else if mime_parser.sync_items.is_some() && 
self_sent { @@ -1005,7 +1009,7 @@ async fn add_parts( sort_timestamp, ) .await?; - return Ok(chat_id); // do not return an error as this would result in retrying the message + return Ok((chat_id, Vec::new())); // do not return an error as this would result in retrying the message } } set_better_msg( @@ -1076,6 +1080,7 @@ async fn add_parts( let mut ids = Vec::with_capacity(parts.len()); let conn = context.sql.get_conn().await?; + let mut msgs = Vec::new(); for part in &mut parts { let mut txt_raw = "".to_string(); @@ -1133,6 +1138,53 @@ INSERT INTO msgs // also change `MsgId::trash()` and `delete_expired_messages()` let trash = chat_id.is_trash(); + let msg = Message { + id: MsgId::new(0), + rfc724_mid: rfc724_mid.to_string(), + server_uid: server_uid, + chat_id, + from_id: if trash { 0 } else { from_id }, + to_id: if trash { 0 } else { to_id }, + timestamp_sort: sort_timestamp, + timestamp_sent: sent_timestamp, + timestamp_rcvd: rcvd_timestamp, + viewtype: part.typ, + state, + is_dc_message, + text: if trash { + None + } else { + Some(part.msg.to_string()) + }, + subject: if trash { + "".to_string() + } else { + subject.to_string() + }, + param: if trash { + Params::new() + } else { + part.param.clone() + }, + in_reply_to: Some(mime_in_reply_to.to_string()), // TODO be careful with Some("") and None + mime_modified, + error: part.error.clone(), + ephemeral_timer, + ephemeral_timestamp, + download_state: if is_partial_download.is_some() { + DownloadState::Available + } else { + DownloadState::Done + }, + hidden: false, + chat_blocked: chat_id_blocked, // TODO not sure if correct + location_id: 0, + server_folder: Some(server_folder.to_string()), + }; + // TODO maybe reuse some of the fields from above in the query below to avoid duplicate code + + msgs.push(msg); + stmt.execute(paramsv![ rfc724_mid, server_folder, @@ -1223,7 +1275,7 @@ INSERT INTO msgs } } - Ok(chat_id) + Ok((chat_id, msgs)) } /// Saves attached locations to the database. 
diff --git a/src/imap.rs b/src/imap.rs index 79343e28e..3a26a17e9 100644 --- a/src/imap.rs +++ b/src/imap.rs @@ -1598,6 +1598,7 @@ async fn precheck_imf( if delete_server_after != Some(0) { if msg_id + .to_lazy() .needs_move(context, server_folder) .await .unwrap_or_default() diff --git a/src/job.rs b/src/job.rs index 0e579813e..94a87dc66 100644 --- a/src/job.rs +++ b/src/job.rs @@ -561,11 +561,16 @@ impl Job { } let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); - let server_folder = &job_try!(msg + let server_folder = job_try!(msg .server_folder + .as_ref() .context("Can't move message out of folder if we don't know the current folder")); - let move_res = msg.id.needs_move(context, server_folder).await; + let move_res = msg + .clone() // TODO avoid clone()? + .into_lazy() + .needs_move(context, &server_folder) + .await; let dest_folder = match move_res { Err(e) => { warn!(context, "could not load dest folder: {}", e); @@ -589,7 +594,7 @@ impl Job { if let Some(dest_folder) = dest_folder { match imap - .mv(context, server_folder, msg.server_uid, &dest_folder) + .mv(context, &server_folder, msg.server_uid, &dest_folder) .await { ImapActionResult::RetryLater => Status::RetryLater, diff --git a/src/message.rs b/src/message.rs index 46a9cbe9f..4952d3ba5 100644 --- a/src/message.rs +++ b/src/message.rs @@ -6,6 +6,7 @@ use std::convert::TryInto; use anyhow::{ensure, format_err, Context as _, Result}; use async_std::path::{Path, PathBuf}; use deltachat_derive::{FromSql, ToSql}; +use once_cell::sync::OnceCell; use rusqlite::types::ValueRef; use serde::{Deserialize, Serialize}; @@ -53,6 +54,13 @@ impl MsgId { MsgId(0) } + pub fn to_lazy(&self) -> LazyMsg { + LazyMsg { + id: *self, + message: OnceCell::new(), + } + } + /// Whether the message ID signifies a special message. /// /// This kind of message ID can not be used for real messages. 
@@ -83,65 +91,6 @@ impl MsgId { Ok(result) } - /// Returns Some if the message needs to be moved from `folder`. - /// If yes, returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder`, - /// depending on where the message should be moved - pub async fn needs_move(self, context: &Context, folder: &str) -> Result> { - use Config::*; - if context.is_mvbox(folder).await? { - return Ok(None); - } - - let msg = Message::load_from_db(context, self).await?; - - if context.is_spam_folder(folder).await? { - let msg_unblocked = msg.chat_id != DC_CHAT_ID_TRASH && msg.chat_blocked == Blocked::Not; - - return if msg_unblocked { - if self.needs_move_to_mvbox(context, &msg).await? { - Ok(Some(ConfiguredMvboxFolder)) - } else { - Ok(Some(ConfiguredInboxFolder)) - } - } else { - // Blocked or contact request message in the spam folder, leave it there - Ok(None) - }; - } - - if self.needs_move_to_mvbox(context, &msg).await? { - Ok(Some(ConfiguredMvboxFolder)) - } else if msg.state.is_outgoing() - && msg.is_dc_message == MessengerMessage::Yes - && !msg.is_setupmessage() - && msg.to_id != DC_CONTACT_ID_SELF // Leave self-chat-messages in the inbox, not sure about this - && context.is_inbox(folder).await? - && context.get_config_bool(SentboxMove).await? - && context.get_config(ConfiguredSentboxFolder).await?.is_some() - { - Ok(Some(ConfiguredSentboxFolder)) - } else { - Ok(None) - } - } - - async fn needs_move_to_mvbox(self, context: &Context, msg: &Message) -> Result { - if !context.get_config_bool(Config::MvboxMove).await? { - return Ok(false); - } - - if msg.is_setupmessage() { - // do not move setup messages; - // there may be a non-delta device that wants to handle it - return Ok(false); - } - - match msg.is_dc_message { - MessengerMessage::No => Ok(false), - MessengerMessage::Yes | MessengerMessage::Reply => Ok(true), - } - } - /// Put message into trash chat and delete message text. 
/// /// It means the message is deleted locally, but not on the server. @@ -282,6 +231,90 @@ impl Default for MessengerMessage { } } +#[derive(Debug)] +pub struct LazyMsg { + id: MsgId, + message: OnceCell, +} + +impl LazyMsg { + // TODO should get() require &mut self? Or lock a mutex during computation? + async fn get(&self, context: &Context) -> Result<&Message> { + if let Some(m) = self.message.get() { + Ok(m) + } else { + let m = Message::load_from_db(context, self.id).await?; + self.message.set(m).ok(); + self.message.get().context("message.set() failed???") + } + } + + /// Returns Some if the message needs to be moved from `folder`. + /// If yes, returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder`, + /// depending on where the message should be moved + pub async fn needs_move(&self, context: &Context, folder: &str) -> Result> { + use Config::*; + if context.disable_needs_move { + return Ok(None); + } + + if context.is_mvbox(folder).await? { + return Ok(None); + } + + let msg = self.get(context).await?; + + if context.is_spam_folder(folder).await? { + let msg_unblocked = msg.chat_id != DC_CHAT_ID_TRASH && msg.chat_blocked == Blocked::Not; + + return if msg_unblocked { + if self.needs_move_to_mvbox(context).await? { + Ok(Some(ConfiguredMvboxFolder)) + } else { + Ok(Some(ConfiguredInboxFolder)) + } + } else { + // Blocked or contact request message in the spam folder, leave it there + Ok(None) + }; + } + + if self.needs_move_to_mvbox(context).await? { + Ok(Some(ConfiguredMvboxFolder)) + } else if msg.state.is_outgoing() + && msg.is_dc_message == MessengerMessage::Yes + && !msg.is_setupmessage() + && msg.to_id != DC_CONTACT_ID_SELF // Leave self-chat-messages in the inbox, not sure about this + && context.is_inbox(folder).await? + && context.get_config_bool(SentboxMove).await? 
+ && context.get_config(ConfiguredSentboxFolder).await?.is_some() + { + Ok(Some(ConfiguredSentboxFolder)) + } else { + Ok(None) + } + } + + async fn needs_move_to_mvbox(&self, context: &Context) -> Result { + if !context.get_config_bool(Config::MvboxMove).await? { + return Ok(false); + } + + let msg = self.get(context).await?; + + if msg.is_setupmessage() { + // do not move setup messages; + // there may be a non-delta device that wants to handle it + return Ok(false); + } + + match msg.is_dc_message { + MessengerMessage::No => Ok(false), + MessengerMessage::Yes | MessengerMessage::Reply => Ok(true), + } + } +} + /// An object representing a single message in memory. /// The message object is not updated. /// If you want an update, you have to recreate the object. @@ -426,6 +459,13 @@ impl Message { Ok(msg) } + pub fn into_lazy(self) -> LazyMsg { + LazyMsg { + id: self.id, + message: self.into(), + } + } + pub fn get_filemime(&self) -> Option { if let Some(m) = self.param.get(Param::MimeType) { return Some(m.to_string()); @@ -1990,11 +2030,12 @@ mod tests { let exists = rfc724_mid_exists(&t, "abc@example.com").await.unwrap(); let (folder_1, _, msg_id) = exists.unwrap(); assert_eq!(folder, folder_1); - let actual = if let Some(config) = msg_id.needs_move(&t.ctx, folder).await.unwrap() { - t.ctx.get_config(config).await.unwrap() - } else { - None - }; + let actual = + if let Some(config) = msg_id.to_lazy().needs_move(&t.ctx, folder).await.unwrap() { + t.ctx.get_config(config).await.unwrap() + } else { + None + }; let expected = if expected_destination == folder { None } else {