try to speed up needs_move(), introduce LazyMsg

This commit is contained in:
Hocuri
2022-03-16 10:50:11 +01:00
parent ac51939c08
commit db0f5ed5a6
6 changed files with 192 additions and 82 deletions

View File

@@ -3,7 +3,7 @@ use std::convert::TryInto;
use async_std::{path::PathBuf, task::block_on}; use async_std::{path::PathBuf, task::block_on};
use criterion::{ use criterion::{
async_executor::AsyncStdExecutor, black_box, criterion_group, criterion_main, BatchSize, async_executor::AsyncStdExecutor, black_box, criterion_group, criterion_main, BatchSize,
Criterion, BenchmarkId, Criterion,
}; };
use deltachat::{ use deltachat::{
config::Config, config::Config,
@@ -28,7 +28,8 @@ async fn recv_emails(context: Context, emails: &[&[u8]]) -> Context {
context context
} }
async fn recv_all_emails(context: Context) -> Context { async fn recv_all_emails(mut context: Context, needs_move_enabled: bool) -> Context {
context.disable_needs_move = !needs_move_enabled;
let emails = [ let emails = [
include_bytes!("../test-data/message/allinkl-quote.eml").as_ref(), include_bytes!("../test-data/message/allinkl-quote.eml").as_ref(),
include_bytes!("../test-data/message/apple_cid_jpg.eml").as_ref(), include_bytes!("../test-data/message/apple_cid_jpg.eml").as_ref(),
@@ -125,13 +126,21 @@ async fn create_context() -> Context {
} }
fn criterion_benchmark(c: &mut Criterion) { fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("Receive many messages", |b| { let mut group = c.benchmark_group("from_elem");
b.to_async(AsyncStdExecutor).iter_batched( for needs_move_enabled in [false, true] {
|| block_on(create_context()), group.bench_with_input(
|context| recv_all_emails(black_box(context)), BenchmarkId::new("Receive many messages", needs_move_enabled),
BatchSize::LargeInput, &needs_move_enabled,
|b, needs_move_enabled| {
b.to_async(AsyncStdExecutor).iter_batched(
|| block_on(create_context()),
|context| recv_all_emails(black_box(context), *needs_move_enabled),
BatchSize::LargeInput,
);
},
); );
}); }
group.finish();
} }
criterion_group!(benches, criterion_benchmark); criterion_group!(benches, criterion_benchmark);

View File

@@ -30,6 +30,7 @@ use crate::sql::Sql;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Context { pub struct Context {
pub(crate) inner: Arc<InnerContext>, pub(crate) inner: Arc<InnerContext>,
pub disable_needs_move: bool, // TODO just added for some extra profiling info
} }
impl Deref for Context { impl Deref for Context {
@@ -157,6 +158,7 @@ impl Context {
let ctx = Context { let ctx = Context {
inner: Arc::new(inner), inner: Arc::new(inner),
disable_needs_move: false,
}; };
ctx.sql.open(&ctx, &ctx.dbfile, false).await?; ctx.sql.open(&ctx, &ctx.dbfile, false).await?;

View File

@@ -193,7 +193,7 @@ pub(crate) async fn dc_receive_imf_inner(
} }
// Add parts // Add parts
let chat_id = add_parts( let (chat_id, msgs) = add_parts(
context, context,
&mut mime_parser, &mut mime_parser,
imf_raw, imf_raw,
@@ -323,7 +323,11 @@ pub(crate) async fn dc_receive_imf_inner(
) )
.await?; .await?;
} }
} else if insert_msg_id } else if msgs
.first()
.unwrap() // TODO unwrap() (should be safe though)
.clone() // TODO unnecessary clone
.into_lazy()
.needs_move(context, server_folder.as_ref()) .needs_move(context, server_folder.as_ref())
.await .await
.unwrap_or_default() .unwrap_or_default()
@@ -439,7 +443,7 @@ async fn add_parts(
create_event_to_send: &mut Option<CreateEvent>, create_event_to_send: &mut Option<CreateEvent>,
fetching_existing_messages: bool, fetching_existing_messages: bool,
prevent_rename: bool, prevent_rename: bool,
) -> Result<ChatId> { ) -> Result<(ChatId, Vec<Message>)> {
let mut chat_id = None; let mut chat_id = None;
let mut chat_id_blocked = Blocked::Not; let mut chat_id_blocked = Blocked::Not;
let mut incoming_origin = incoming_origin; let mut incoming_origin = incoming_origin;
@@ -513,7 +517,7 @@ async fn add_parts(
} }
Err(err) => { Err(err) => {
warn!(context, "Error in Secure-Join message handling: {}", err); warn!(context, "Error in Secure-Join message handling: {}", err);
return Ok(DC_CHAT_ID_TRASH); return Ok((DC_CHAT_ID_TRASH, Vec::new()));
} }
} }
} else { } else {
@@ -740,7 +744,7 @@ async fn add_parts(
} }
Err(err) => { Err(err) => {
warn!(context, "Error in Secure-Join watching: {}", err); warn!(context, "Error in Secure-Join watching: {}", err);
return Ok(DC_CHAT_ID_TRASH); return Ok((DC_CHAT_ID_TRASH, Vec::new()));
} }
} }
} else if mime_parser.sync_items.is_some() && self_sent { } else if mime_parser.sync_items.is_some() && self_sent {
@@ -1005,7 +1009,7 @@ async fn add_parts(
sort_timestamp, sort_timestamp,
) )
.await?; .await?;
return Ok(chat_id); // do not return an error as this would result in retrying the message return Ok((chat_id, Vec::new())); // do not return an error as this would result in retrying the message
} }
} }
set_better_msg( set_better_msg(
@@ -1076,6 +1080,7 @@ async fn add_parts(
let mut ids = Vec::with_capacity(parts.len()); let mut ids = Vec::with_capacity(parts.len());
let conn = context.sql.get_conn().await?; let conn = context.sql.get_conn().await?;
let mut msgs = Vec::new();
for part in &mut parts { for part in &mut parts {
let mut txt_raw = "".to_string(); let mut txt_raw = "".to_string();
@@ -1133,6 +1138,53 @@ INSERT INTO msgs
// also change `MsgId::trash()` and `delete_expired_messages()` // also change `MsgId::trash()` and `delete_expired_messages()`
let trash = chat_id.is_trash(); let trash = chat_id.is_trash();
let msg = Message {
id: MsgId::new(0),
rfc724_mid: rfc724_mid.to_string(),
server_uid: server_uid,
chat_id,
from_id: if trash { 0 } else { from_id },
to_id: if trash { 0 } else { to_id },
timestamp_sort: sort_timestamp,
timestamp_sent: sent_timestamp,
timestamp_rcvd: rcvd_timestamp,
viewtype: part.typ,
state,
is_dc_message,
text: if trash {
None
} else {
Some(part.msg.to_string())
},
subject: if trash {
"".to_string()
} else {
subject.to_string()
},
param: if trash {
Params::new()
} else {
part.param.clone()
},
in_reply_to: Some(mime_in_reply_to.to_string()), // TODO be careful with Some("") and None
mime_modified,
error: part.error.clone(),
ephemeral_timer,
ephemeral_timestamp,
download_state: if is_partial_download.is_some() {
DownloadState::Available
} else {
DownloadState::Done
},
hidden: false,
chat_blocked: chat_id_blocked, // TODO not sure if correct
location_id: 0,
server_folder: Some(server_folder.to_string()),
};
// TODO mabye reuse some of the fields from above in the query below to avoid duplicate code
msgs.push(msg);
stmt.execute(paramsv![ stmt.execute(paramsv![
rfc724_mid, rfc724_mid,
server_folder, server_folder,
@@ -1223,7 +1275,7 @@ INSERT INTO msgs
} }
} }
Ok(chat_id) Ok((chat_id, msgs))
} }
/// Saves attached locations to the database. /// Saves attached locations to the database.

View File

@@ -1598,6 +1598,7 @@ async fn precheck_imf(
if delete_server_after != Some(0) { if delete_server_after != Some(0) {
if msg_id if msg_id
.to_lazy()
.needs_move(context, server_folder) .needs_move(context, server_folder)
.await .await
.unwrap_or_default() .unwrap_or_default()

View File

@@ -561,11 +561,16 @@ impl Job {
} }
let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await); let msg = job_try!(Message::load_from_db(context, MsgId::new(self.foreign_id)).await);
let server_folder = &job_try!(msg let server_folder = job_try!(msg
.server_folder .server_folder
.as_ref()
.context("Can't move message out of folder if we don't know the current folder")); .context("Can't move message out of folder if we don't know the current folder"));
let move_res = msg.id.needs_move(context, server_folder).await; let move_res = msg
.clone() // TODO avoid clone()?
.into_lazy()
.needs_move(context, &server_folder)
.await;
let dest_folder = match move_res { let dest_folder = match move_res {
Err(e) => { Err(e) => {
warn!(context, "could not load dest folder: {}", e); warn!(context, "could not load dest folder: {}", e);
@@ -589,7 +594,7 @@ impl Job {
if let Some(dest_folder) = dest_folder { if let Some(dest_folder) = dest_folder {
match imap match imap
.mv(context, server_folder, msg.server_uid, &dest_folder) .mv(context, &server_folder, msg.server_uid, &dest_folder)
.await .await
{ {
ImapActionResult::RetryLater => Status::RetryLater, ImapActionResult::RetryLater => Status::RetryLater,

View File

@@ -6,6 +6,7 @@ use std::convert::TryInto;
use anyhow::{ensure, format_err, Context as _, Result}; use anyhow::{ensure, format_err, Context as _, Result};
use async_std::path::{Path, PathBuf}; use async_std::path::{Path, PathBuf};
use deltachat_derive::{FromSql, ToSql}; use deltachat_derive::{FromSql, ToSql};
use once_cell::sync::OnceCell;
use rusqlite::types::ValueRef; use rusqlite::types::ValueRef;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -53,6 +54,13 @@ impl MsgId {
MsgId(0) MsgId(0)
} }
pub fn to_lazy(&self) -> LazyMsg {
LazyMsg {
id: *self,
message: OnceCell::new(),
}
}
/// Whether the message ID signifies a special message. /// Whether the message ID signifies a special message.
/// ///
/// This kind of message ID can not be used for real messages. /// This kind of message ID can not be used for real messages.
@@ -83,65 +91,6 @@ impl MsgId {
Ok(result) Ok(result)
} }
/// Returns Some if the message needs to be moved from `folder`.
/// If yes, returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder`,
/// depending on where the message should be moved
pub async fn needs_move(self, context: &Context, folder: &str) -> Result<Option<Config>> {
use Config::*;
if context.is_mvbox(folder).await? {
return Ok(None);
}
let msg = Message::load_from_db(context, self).await?;
if context.is_spam_folder(folder).await? {
let msg_unblocked = msg.chat_id != DC_CHAT_ID_TRASH && msg.chat_blocked == Blocked::Not;
return if msg_unblocked {
if self.needs_move_to_mvbox(context, &msg).await? {
Ok(Some(ConfiguredMvboxFolder))
} else {
Ok(Some(ConfiguredInboxFolder))
}
} else {
// Blocked or contact request message in the spam folder, leave it there
Ok(None)
};
}
if self.needs_move_to_mvbox(context, &msg).await? {
Ok(Some(ConfiguredMvboxFolder))
} else if msg.state.is_outgoing()
&& msg.is_dc_message == MessengerMessage::Yes
&& !msg.is_setupmessage()
&& msg.to_id != DC_CONTACT_ID_SELF // Leave self-chat-messages in the inbox, not sure about this
&& context.is_inbox(folder).await?
&& context.get_config_bool(SentboxMove).await?
&& context.get_config(ConfiguredSentboxFolder).await?.is_some()
{
Ok(Some(ConfiguredSentboxFolder))
} else {
Ok(None)
}
}
async fn needs_move_to_mvbox(self, context: &Context, msg: &Message) -> Result<bool> {
if !context.get_config_bool(Config::MvboxMove).await? {
return Ok(false);
}
if msg.is_setupmessage() {
// do not move setup messages;
// there may be a non-delta device that wants to handle it
return Ok(false);
}
match msg.is_dc_message {
MessengerMessage::No => Ok(false),
MessengerMessage::Yes | MessengerMessage::Reply => Ok(true),
}
}
/// Put message into trash chat and delete message text. /// Put message into trash chat and delete message text.
/// ///
/// It means the message is deleted locally, but not on the server. /// It means the message is deleted locally, but not on the server.
@@ -282,6 +231,90 @@ impl Default for MessengerMessage {
} }
} }
#[derive(Debug)]
pub struct LazyMsg {
id: MsgId,
message: OnceCell<Message>,
}
impl LazyMsg {
// TODO should get() require &mut self? Or lock a mutex during computation?
async fn get(&self, context: &Context) -> Result<&Message> {
if let Some(m) = self.message.get() {
Ok(m)
} else {
let m = Message::load_from_db(context, self.id).await?;
self.message.set(m).ok();
self.message.get().context("message.set() failed???")
}
}
/// Returns Some if the message needs to be moved from `folder`.
/// If yes, returns `ConfiguredInboxFolder`, `ConfiguredMvboxFolder` or `ConfiguredSentboxFolder`,
/// depending on where the message should be moved
pub async fn needs_move(&self, context: &Context, folder: &str) -> Result<Option<Config>> {
use Config::*;
if context.disable_needs_move {
return Ok(None);
}
if context.is_mvbox(folder).await? {
return Ok(None);
}
let msg = self.get(context).await?;
if context.is_spam_folder(folder).await? {
let msg_unblocked = msg.chat_id != DC_CHAT_ID_TRASH && msg.chat_blocked == Blocked::Not;
return if msg_unblocked {
if self.needs_move_to_mvbox(context).await? {
Ok(Some(ConfiguredMvboxFolder))
} else {
Ok(Some(ConfiguredInboxFolder))
}
} else {
// Blocked or contact request message in the spam folder, leave it there
Ok(None)
};
}
if self.needs_move_to_mvbox(context).await? {
Ok(Some(ConfiguredMvboxFolder))
} else if msg.state.is_outgoing()
&& msg.is_dc_message == MessengerMessage::Yes
&& !msg.is_setupmessage()
&& msg.to_id != DC_CONTACT_ID_SELF // Leave self-chat-messages in the inbox, not sure about this
&& context.is_inbox(folder).await?
&& context.get_config_bool(SentboxMove).await?
&& context.get_config(ConfiguredSentboxFolder).await?.is_some()
{
Ok(Some(ConfiguredSentboxFolder))
} else {
Ok(None)
}
}
async fn needs_move_to_mvbox(&self, context: &Context) -> Result<bool> {
if !context.get_config_bool(Config::MvboxMove).await? {
return Ok(false);
}
let msg = self.get(context).await?;
if msg.is_setupmessage() {
// do not move setup messages;
// there may be a non-delta device that wants to handle it
return Ok(false);
}
match msg.is_dc_message {
MessengerMessage::No => Ok(false),
MessengerMessage::Yes | MessengerMessage::Reply => Ok(true),
}
}
}
/// An object representing a single message in memory. /// An object representing a single message in memory.
/// The message object is not updated. /// The message object is not updated.
/// If you want an update, you have to recreate the object. /// If you want an update, you have to recreate the object.
@@ -426,6 +459,13 @@ impl Message {
Ok(msg) Ok(msg)
} }
pub fn into_lazy(self) -> LazyMsg {
LazyMsg {
id: self.id,
message: self.into(),
}
}
pub fn get_filemime(&self) -> Option<String> { pub fn get_filemime(&self) -> Option<String> {
if let Some(m) = self.param.get(Param::MimeType) { if let Some(m) = self.param.get(Param::MimeType) {
return Some(m.to_string()); return Some(m.to_string());
@@ -1990,11 +2030,12 @@ mod tests {
let exists = rfc724_mid_exists(&t, "abc@example.com").await.unwrap(); let exists = rfc724_mid_exists(&t, "abc@example.com").await.unwrap();
let (folder_1, _, msg_id) = exists.unwrap(); let (folder_1, _, msg_id) = exists.unwrap();
assert_eq!(folder, folder_1); assert_eq!(folder, folder_1);
let actual = if let Some(config) = msg_id.needs_move(&t.ctx, folder).await.unwrap() { let actual =
t.ctx.get_config(config).await.unwrap() if let Some(config) = msg_id.to_lazy().needs_move(&t.ctx, folder).await.unwrap() {
} else { t.ctx.get_config(config).await.unwrap()
None } else {
}; None
};
let expected = if expected_destination == folder { let expected = if expected_destination == folder {
None None
} else { } else {