diff --git a/src/constants.rs b/src/constants.rs index ae7cb7fa7..ed4d5d732 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -257,6 +257,9 @@ pub(crate) const ASM_BODY: &str = "This is the Autocrypt Setup Message \ If you see this message in a chatmail client (Delta Chat, Arcane Chat, Delta Touch ...), \ use \"Settings / Add Second Device\" instead."; +/// Period between `sql::housekeeping()` runs. +pub(crate) const HOUSEKEEPING_PERIOD: i64 = 24 * 60 * 60; + #[cfg(test)] mod tests { use num_traits::FromPrimitive; diff --git a/src/context.rs b/src/context.rs index d5cfe1458..1f70efd57 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1041,6 +1041,13 @@ impl Context { .await? .to_string(), ); + res.insert( + "first_key_contacts_msg_id", + self.sql + .get_raw_config("first_key_contacts_msg_id") + .await? + .unwrap_or_default(), + ); let elapsed = time_elapsed(&self.creation_time); res.insert("uptime", duration_to_str(elapsed)); diff --git a/src/scheduler.rs b/src/scheduler.rs index d4fc5c3b5..491840daf 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -15,6 +15,7 @@ use tokio_util::task::TaskTracker; use self::connectivity::ConnectivityStore; use crate::config::{self, Config}; +use crate::constants; use crate::contact::{ContactId, RecentlySeenLoop}; use crate::context::Context; use crate::download::{DownloadState, download_msg}; @@ -497,7 +498,8 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session) match ctx.get_config_i64(Config::LastHousekeeping).await { Ok(last_housekeeping_time) => { - let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24); + let next_housekeeping_time = + last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD); if next_housekeeping_time <= time() { sql::housekeeping(ctx).await.log_err(ctx).ok(); } diff --git a/src/sql.rs b/src/sql.rs index db0a367db..57fcefa55 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -581,6 +581,12 @@ impl Sql { Ok(value) } + /// Removes the `key`'s value from the cache. + pub(crate) async fn uncache_raw_config(&self, key: &str) { + let mut lock = self.config_cache.write().await; + lock.remove(key); + } + /// Sets configuration for the given key to 32-bit signed integer value. pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> { self.set_raw_config(key, Some(&format!("{value}"))).await @@ -724,6 +730,11 @@ pub async fn housekeeping(context: &Context) -> Result<()> { .context("Failed to cleanup HTTP cache") .log_err(context) .ok(); + migrations::msgs_to_key_contacts(context) + .await + .context("migrations::msgs_to_key_contacts") + .log_err(context) + .ok(); if let Err(err) = remove_unused_files(context).await { warn!( diff --git a/src/sql/migrations.rs b/src/sql/migrations.rs index 6f8f27d27..fc16f6541 100644 --- a/src/sql/migrations.rs +++ b/src/sql/migrations.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::{Context as _, Result, ensure}; use deltachat_contact_tools::EmailAddress; @@ -21,7 +21,7 @@ use crate::login_param::ConfiguredLoginParam; use crate::message::MsgId; use crate::provider::get_provider_by_domain; use crate::sql::Sql; -use crate::tools::inc_and_check; +use crate::tools::{Time, inc_and_check, time_elapsed}; const DBVERSION: i32 = 68; const VERSION_CFG: &str = "dbversion"; @@ -1245,6 +1245,10 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint); "key-contacts migration took {:?} in total.", start.elapsed() ); + // Schedule `msgs_to_key_contacts()`. + context + .set_config_internal(Config::LastHousekeeping, None) + .await?; } let new_version = sql @@ -1830,65 +1834,113 @@ fn migrate_key_contacts( } // ======================= Step 5: ======================= - // Rewrite `from_id` in messages + // Prepare for rewriting `from_id`, `to_id` in messages { - let start = Instant::now(); - - let mut encrypted_msgs_stmt = transaction - .prepare( - "SELECT id, from_id, to_id - FROM msgs - WHERE chat_id>9 - AND (param GLOB '*\nc=1*' OR param GLOB 'c=1*') - ORDER BY id DESC LIMIT 10000", + let mut contacts_map = autocrypt_key_contacts_with_reset_peerstate; + for (old, new) in autocrypt_key_contacts { + contacts_map.insert(old, new); + } + transaction + .execute( + "CREATE TABLE key_contacts_map ( + old_id INTEGER PRIMARY KEY NOT NULL, + new_id INTEGER NOT NULL + ) STRICT", + (), ) .context("Step 32")?; - let mut rewrite_msg_stmt = transaction - .prepare("UPDATE msgs SET from_id=?, to_id=? WHERE id=?") - .context("Step 32.1")?; - - struct LoadedMsg { - id: u32, - from_id: u32, - to_id: u32, + { + let mut stmt = transaction + .prepare("INSERT INTO key_contacts_map (old_id, new_id) VALUES (?, ?)") + .context("Step 33")?; + for ids in contacts_map { + stmt.execute(ids).context("Step 34")?; + } } - - let encrypted_msgs = encrypted_msgs_stmt - .query_map((), |row| { - let id: u32 = row.get(0)?; - let from_id: u32 = row.get(1)?; - let to_id: u32 = row.get(2)?; - Ok(LoadedMsg { id, from_id, to_id }) - }) - .context("Step 33")?; - - for msg in encrypted_msgs { - let msg = msg.context("Step 34")?; - - let new_from_id = *autocrypt_key_contacts - .get(&msg.from_id) - .or_else(|| autocrypt_key_contacts_with_reset_peerstate.get(&msg.from_id)) - .unwrap_or(&msg.from_id); - - let new_to_id = *autocrypt_key_contacts - .get(&msg.to_id) - .or_else(|| autocrypt_key_contacts_with_reset_peerstate.get(&msg.to_id)) - .unwrap_or(&msg.to_id); - - rewrite_msg_stmt - .execute((new_from_id, new_to_id, msg.id)) - .context("Step 35")?; - } - info!( - context, - "Rewriting msgs to key-contacts took {:?}.", - start.elapsed() - ); + transaction + .execute( + "INSERT INTO config (keyname, value) VALUES ( + 'first_key_contacts_msg_id', + IFNULL((SELECT MAX(id)+1 FROM msgs), 0) + )", + (), + ) + .context("Step 35")?; } Ok(()) } +/// Rewrite `from_id`, `to_id` in >= 1000 messages starting from the newest ones, to key-contacts. +pub(crate) async fn msgs_to_key_contacts(context: &Context) -> Result<()> { + let sql = &context.sql; + if sql + .get_raw_config_int64("first_key_contacts_msg_id") + .await? + <= Some(0) + { + return Ok(()); + } + let trans_fn = |t: &mut rusqlite::Transaction| { + let mut first_key_contacts_msg_id: u64 = t + .query_one( + "SELECT CAST(value AS INTEGER) FROM config WHERE keyname='first_key_contacts_msg_id'", + (), + |row| row.get(0), + ) + .context("Get first_key_contacts_msg_id")?; + let mut stmt = t + .prepare( + "UPDATE msgs SET + from_id=IFNULL( + (SELECT new_id FROM key_contacts_map WHERE old_id=msgs.from_id), + from_id + ), + to_id=IFNULL( + (SELECT new_id FROM key_contacts_map WHERE old_id=msgs.to_id), + to_id + ) + WHERE id>=? AND id9 + AND (param GLOB '*\nc=1*' OR param GLOB 'c=1*')", + ) + .context("Prepare stmt")?; + let msgs_to_migrate = 1000; + let mut msgs_migrated: u64 = 0; + while first_key_contacts_msg_id > 0 && msgs_migrated < msgs_to_migrate { + let start_msg_id = first_key_contacts_msg_id.saturating_sub(msgs_to_migrate); + let cnt: u64 = stmt + .execute((start_msg_id, first_key_contacts_msg_id)) + .context("UPDATE msgs")? + .try_into()?; + msgs_migrated += cnt; + first_key_contacts_msg_id = start_msg_id; + } + t.execute( + "UPDATE config SET value=? WHERE keyname='first_key_contacts_msg_id'", + (first_key_contacts_msg_id,), + ) + .context("Update first_key_contacts_msg_id")?; + Ok((msgs_migrated, first_key_contacts_msg_id)) + }; + let start = Time::now(); + let mut msgs_migrated = 0; + loop { + let (n, first_key_contacts_msg_id) = sql.transaction(trans_fn).await?; + msgs_migrated += n; + if first_key_contacts_msg_id == 0 || time_elapsed(&start) >= Duration::from_millis(500) { + break; + } + } + sql.uncache_raw_config("first_key_contacts_msg_id").await; + info!( + context, + "Rewriting {msgs_migrated} msgs to key-contacts took {:?}.", + time_elapsed(&start), + ); + Ok(()) +} + impl Sql { async fn set_db_version(&self, version: i32) -> Result<()> { self.set_raw_config_int(VERSION_CFG, version).await?;