diff --git a/src/chat.rs b/src/chat.rs index fc490a9b5..c34c7302f 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -42,7 +42,6 @@ use crate::peerstate::Peerstate; use crate::receive_imf::ReceivedMsg; use crate::securejoin::BobState; use crate::smtp::send_msg_to_smtp; -use crate::sql; use crate::stock_str; use crate::sync::{self, Sync::*, SyncData}; use crate::tools::{ @@ -4144,7 +4143,6 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) ensure!(!msg_ids.is_empty(), "empty msgs_ids: nothing to forward"); ensure!(!chat_id.is_special(), "can not forward to special chat"); - let mut created_chats: Vec = Vec::new(); let mut created_msgs: Vec = Vec::new(); let mut curr_timestamp: i64; @@ -4156,20 +4154,17 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) bail!("cannot send to {}: {}", chat_id, reason); } curr_timestamp = create_smeared_timestamps(context, msg_ids.len()); - let ids = context - .sql - .query_map( - &format!( - "SELECT id FROM msgs WHERE id IN({}) ORDER BY timestamp,id", - sql::repeat_vars(msg_ids.len()) - ), - rusqlite::params_from_iter(msg_ids), - |row| row.get::<_, MsgId>(0), - |ids| ids.collect::, _>>().map_err(Into::into), - ) - .await?; - - for id in ids { + let mut msgs = Vec::with_capacity(msg_ids.len()); + for id in msg_ids { + let ts: i64 = context + .sql + .query_get_value("SELECT timestamp FROM msgs WHERE id=?", (id,)) + .await? + .context("No message {id}")?; + msgs.push((ts, *id)); + } + msgs.sort_unstable(); + for (_, id) in msgs { let src_msg_id: MsgId = id; let mut msg = Message::load_from_db(context, src_msg_id).await?; if msg.state == MessageState::OutDraft { @@ -4206,11 +4201,10 @@ pub async fn forward_msgs(context: &Context, msg_ids: &[MsgId], chat_id: ChatId) if !create_send_msg_jobs(context, &mut msg).await?.is_empty() { context.scheduler.interrupt_smtp().await; } - created_chats.push(chat_id); created_msgs.push(new_msg_id); } - for (chat_id, msg_id) in created_chats.iter().zip(created_msgs.iter()) { - context.emit_msgs_changed(*chat_id, *msg_id); + for msg_id in created_msgs { + context.emit_msgs_changed(chat_id, msg_id); } Ok(()) } diff --git a/src/contact.rs b/src/contact.rs index c9270fe46..0a4e8595c 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -1,7 +1,7 @@ //! Contacts module use std::cmp::{min, Reverse}; -use std::collections::BinaryHeap; +use std::collections::{BinaryHeap, HashSet}; use std::fmt; use std::path::{Path, PathBuf}; use std::time::UNIX_EPOCH; @@ -34,7 +34,6 @@ use crate::message::MessageState; use crate::mimeparser::AvatarAction; use crate::param::{Param, Params}; use crate::peerstate::Peerstate; -use crate::sql::{self, params_iter}; use crate::sync::{self, Sync::*}; use crate::tools::{duration_to_str, get_abs_path, smeared_time, time, SystemTime}; use crate::{chat, chatlist_events, stock_str}; @@ -1040,7 +1039,11 @@ impl Contact { listflags: u32, query: Option<&str>, ) -> Result> { - let self_addrs = context.get_all_self_addrs().await?; + let self_addrs = context + .get_all_self_addrs() + .await? + .into_iter() + .collect::>(); let mut add_self = false; let mut ret = Vec::new(); let flag_verified_only = (listflags & DC_GCL_VERIFIED_ONLY) != 0; @@ -1055,29 +1058,32 @@ impl Contact { context .sql .query_map( - &format!( - "SELECT c.id FROM contacts c \ + "SELECT c.id, c.addr FROM contacts c LEFT JOIN acpeerstates ps ON c.addr=ps.addr \ - WHERE c.addr NOT IN ({}) - AND c.id>? \ + WHERE c.id>? AND c.origin>=? \ AND c.blocked=0 \ AND (iif(c.name='',c.authname,c.name) LIKE ? OR c.addr LIKE ?) \ AND (1=? OR LENGTH(ps.verified_key_fingerprint)!=0) \ ORDER BY c.last_seen DESC, c.id DESC;", - sql::repeat_vars(self_addrs.len()) - ), - rusqlite::params_from_iter(params_iter(&self_addrs).chain(params_slice![ + ( ContactId::LAST_SPECIAL, minimal_origin, - s3str_like_cmd, - s3str_like_cmd, - if flag_verified_only { 0i32 } else { 1i32 } - ])), - |row| row.get::<_, ContactId>(0), - |ids| { - for id in ids { - ret.push(id?); + &s3str_like_cmd, + &s3str_like_cmd, + if flag_verified_only { 0i32 } else { 1i32 }, + ), + |row| { + let id: ContactId = row.get(0)?; + let addr: String = row.get(1)?; + Ok((id, addr)) + }, + |rows| { + for row in rows { + let (id, addr) = row?; + if !self_addrs.contains(&addr) { + ret.push(id); + } } Ok(()) }, @@ -1110,23 +1116,23 @@ impl Contact { context .sql .query_map( - &format!( - "SELECT id FROM contacts - WHERE addr NOT IN ({}) - AND id>? + "SELECT id, addr FROM contacts + WHERE id>? AND origin>=? AND blocked=0 ORDER BY last_seen DESC, id DESC;", - sql::repeat_vars(self_addrs.len()) - ), - rusqlite::params_from_iter( - params_iter(&self_addrs) - .chain(params_slice![ContactId::LAST_SPECIAL, minimal_origin]), - ), - |row| row.get::<_, ContactId>(0), - |ids| { - for id in ids { - ret.push(id?); + (ContactId::LAST_SPECIAL, minimal_origin), + |row| { + let id: ContactId = row.get(0)?; + let addr: String = row.get(1)?; + Ok((id, addr)) + }, + |rows| { + for row in rows { + let (id, addr) = row?; + if !self_addrs.contains(&addr) { + ret.push(id); + } } Ok(()) }, diff --git a/src/message.rs b/src/message.rs index ed0afffa1..b12e3d484 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1669,12 +1669,12 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> .set_config_internal(Config::LastMsgId, Some(&last_msg_id.to_u32().to_string())) .await?; - let msgs = context - .sql - .query_map( - &format!( + let mut msgs = Vec::with_capacity(msg_ids.len()); + for &id in &msg_ids { + if let Some(msg) = context + .sql + .query_row_optional( "SELECT - m.id AS id, m.chat_id AS chat_id, m.state AS state, m.download_state as download_state, @@ -1685,39 +1685,39 @@ pub async fn markseen_msgs(context: &Context, msg_ids: Vec) -> Result<()> c.archived AS archived, c.blocked AS blocked FROM msgs m LEFT JOIN chats c ON c.id=m.chat_id - WHERE m.id IN ({}) AND m.chat_id>9", - sql::repeat_vars(msg_ids.len()) - ), - rusqlite::params_from_iter(&msg_ids), - |row| { - let id: MsgId = row.get("id")?; - let chat_id: ChatId = row.get("chat_id")?; - let state: MessageState = row.get("state")?; - let download_state: DownloadState = row.get("download_state")?; - let param: Params = row.get::<_, String>("param")?.parse().unwrap_or_default(); - let from_id: ContactId = row.get("from_id")?; - let rfc724_mid: String = row.get("rfc724_mid")?; - let visibility: ChatVisibility = row.get("archived")?; - let blocked: Option = row.get("blocked")?; - let ephemeral_timer: EphemeralTimer = row.get("ephemeral_timer")?; - Ok(( - ( - id, - chat_id, - state, - download_state, - param, - from_id, - rfc724_mid, - visibility, - blocked.unwrap_or_default(), - ), - ephemeral_timer, - )) - }, - |rows| rows.collect::, _>>().map_err(Into::into), - ) - .await?; + WHERE m.id=? AND m.chat_id>9", + (id,), + |row| { + let chat_id: ChatId = row.get("chat_id")?; + let state: MessageState = row.get("state")?; + let download_state: DownloadState = row.get("download_state")?; + let param: Params = row.get::<_, String>("param")?.parse().unwrap_or_default(); + let from_id: ContactId = row.get("from_id")?; + let rfc724_mid: String = row.get("rfc724_mid")?; + let visibility: ChatVisibility = row.get("archived")?; + let blocked: Option = row.get("blocked")?; + let ephemeral_timer: EphemeralTimer = row.get("ephemeral_timer")?; + Ok(( + ( + id, + chat_id, + state, + download_state, + param, + from_id, + rfc724_mid, + visibility, + blocked.unwrap_or_default(), + ), + ephemeral_timer, + )) + }, + ) + .await? + { + msgs.push(msg); + } + } if msgs .iter() diff --git a/src/receive_imf.rs b/src/receive_imf.rs index 16f5dfbea..90b5a53aa 100644 --- a/src/receive_imf.rs +++ b/src/receive_imf.rs @@ -33,9 +33,9 @@ use crate::param::{Param, Params}; use crate::peer_channels::{add_gossip_peer_from_header, insert_topic_stub}; use crate::peerstate::Peerstate; use crate::reaction::{set_msg_reaction, Reaction}; +use crate::rusqlite::OptionalExtension; use crate::securejoin::{self, handle_securejoin_handshake, observe_securejoin_on_other_device}; use crate::simplify; -use crate::sql::{self, params_iter}; use crate::stock_str; use crate::sync::Sync::*; use crate::tools::{self, buf_compress, remove_subject_prefix}; @@ -1884,10 +1884,20 @@ async fn lookup_chat_or_create_adhoc_group( if !contact_ids.contains(&from_id) { contact_ids.push(from_id); } - if let Some((chat_id, blocked)) = context - .sql - .query_row_optional( - &format!( + let trans_fn = |t: &mut rusqlite::Transaction| { + t.pragma_update(None, "query_only", "0")?; + t.execute( + "CREATE TEMP TABLE temp.contacts ( + id INTEGER PRIMARY KEY + ) STRICT", + (), + )?; + let mut stmt = t.prepare("INSERT INTO temp.contacts(id) VALUES (?)")?; + for &id in &contact_ids { + stmt.execute((id,))?; + } + let val = t + .query_row( "SELECT c.id, c.blocked FROM chats c INNER JOIN msgs m ON c.id=m.chat_id WHERE m.hidden=0 AND c.grpid='' AND c.name=? @@ -1896,24 +1906,22 @@ async fn lookup_chat_or_create_adhoc_group( AND add_timestamp >= remove_timestamp)=? AND (SELECT COUNT(*) FROM chats_contacts WHERE chat_id=c.id - AND contact_id NOT IN ({}) + AND contact_id NOT IN (SELECT id FROM temp.contacts) AND add_timestamp >= remove_timestamp)=0 ORDER BY m.timestamp DESC", - sql::repeat_vars(contact_ids.len()), - ), - rusqlite::params_from_iter( - params_iter(&[&grpname]) - .chain(params_iter(&[contact_ids.len()])) - .chain(params_iter(&contact_ids)), - ), - |row| { - let id: ChatId = row.get(0)?; - let blocked: Blocked = row.get(1)?; - Ok((id, blocked)) - }, - ) - .await? - { + (&grpname, contact_ids.len()), + |row| { + let id: ChatId = row.get(0)?; + let blocked: Blocked = row.get(1)?; + Ok((id, blocked)) + }, + ) + .optional()?; + t.execute("DROP TABLE temp.contacts", ())?; + Ok(val) + }; + let query_only = true; + if let Some((chat_id, blocked)) = context.sql.transaction_ex(query_only, trans_fn).await? { info!( context, "Assigning message to ad-hoc group {chat_id} with matching name and members." @@ -2864,38 +2872,27 @@ async fn mark_recipients_as_verified( to_ids: Vec, mimeparser: &MimeMessage, ) -> Result<()> { - if to_ids.is_empty() { - return Ok(()); - } - if mimeparser.get_header(HeaderDef::ChatVerified).is_none() { return Ok(()); } - - let rows = context - .sql - .query_map( - &format!( - "SELECT c.addr, LENGTH(ps.verified_key_fingerprint) FROM contacts c \ - LEFT JOIN acpeerstates ps ON c.addr=ps.addr WHERE c.id IN({}) ", - sql::repeat_vars(to_ids.len()) - ), - rusqlite::params_from_iter(&to_ids), - |row| { - let to_addr: String = row.get(0)?; - let is_verified: i32 = row.get(1).unwrap_or(0); - Ok((to_addr, is_verified != 0)) - }, - |rows| { - rows.collect::, _>>() - .map_err(Into::into) - }, - ) - .await?; - let contact = Contact::get_by_id(context, from_id).await?; - - for (to_addr, is_verified) in rows { + for id in to_ids { + let Some((to_addr, is_verified)) = context + .sql + .query_row_optional( + "SELECT c.addr, LENGTH(ps.verified_key_fingerprint) FROM contacts c + LEFT JOIN acpeerstates ps ON c.addr=ps.addr WHERE c.id=?", + (id,), + |row| { + let to_addr: String = row.get(0)?; + let is_verified: i32 = row.get(1).unwrap_or(0); + Ok((to_addr, is_verified != 0)) + }, + ) + .await? + else { + continue; + }; // mark gossiped keys (if any) as verified if let Some(gossiped_key) = mimeparser.gossiped_keys.get(&to_addr.to_lowercase()) { if let Some(mut peerstate) = Peerstate::from_addr(context, &to_addr).await? { diff --git a/src/sql.rs b/src/sql.rs index 62a0b44b9..40db81cc2 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -44,12 +44,6 @@ macro_rules! params_slice { }; } -pub(crate) fn params_iter( - iter: &[impl crate::sql::ToSql], -) -> impl Iterator { - iter.iter().map(|item| item as &dyn crate::sql::ToSql) -} - mod migrations; mod pool; @@ -441,7 +435,7 @@ impl Sql { .await } - /// Execute the function inside a transaction assuming that it does write queries. + /// Execute the function inside a transaction assuming that it does writes. /// /// If the function returns an error, the transaction will be rolled back. If it does not return an /// error, the transaction will be committed. @@ -450,7 +444,28 @@ impl Sql { H: Send + 'static, G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result, { - self.call_write(move |conn| { + let query_only = false; + self.transaction_ex(query_only, callback).await + } + + /// Execute the function inside a transaction. + /// + /// * `query_only` - Whether the function only executes read statements (queries) and can be run + /// in parallel with other transactions. NB: Creating and modifying temporary tables are also + /// allowed with `query_only`, temporary tables aren't visible in other connections, but you + /// need to pass `PRAGMA query_only=0;` to SQLite before that: + /// `pragma_update(None, "query_only", "0")`. + /// Also temporary tables need to be dropped because the connection is returned to the pool + /// then. + /// + /// If the function returns an error, the transaction will be rolled back. If it does not return + /// an error, the transaction will be committed. + pub async fn transaction_ex(&self, query_only: bool, callback: G) -> Result + where + H: Send + 'static, + G: Send + FnOnce(&mut rusqlite::Transaction<'_>) -> Result, + { + self.call(query_only, move |conn| { let mut transaction = conn.transaction()?; let ret = callback(&mut transaction); @@ -1024,16 +1039,6 @@ async fn prune_tombstones(sql: &Sql) -> Result<()> { Ok(()) } -/// Helper function to return comma-separated sequence of `?` chars. -/// -/// Use this together with [`rusqlite::ParamsFromIter`] to use dynamically generated -/// parameter lists. -pub fn repeat_vars(count: usize) -> String { - let mut s = "?,".repeat(count); - s.pop(); // Remove trailing comma - s -} - #[cfg(test)] mod tests { use super::*;