fix: Save msgs to key-contacts migration state and run migration periodically (#6956)

Save:
- (old contact id) -> (new contact id) mapping.
- The message id starting from which all messages are already migrated.
Run the migration from `housekeeping()` for at least 500 ms and for >= 1000 messages per run.
This commit is contained in:
iequidoo
2025-07-06 16:40:16 -03:00
committed by iequidoo
parent a87ee030fc
commit 389649ea8a
5 changed files with 128 additions and 53 deletions

View File

@@ -257,6 +257,9 @@ pub(crate) const ASM_BODY: &str = "This is the Autocrypt Setup Message \
If you see this message in a chatmail client (Delta Chat, Arcane Chat, Delta Touch ...), \ If you see this message in a chatmail client (Delta Chat, Arcane Chat, Delta Touch ...), \
use \"Settings / Add Second Device\" instead."; use \"Settings / Add Second Device\" instead.";
/// Period between `sql::housekeeping()` runs.
pub(crate) const HOUSEKEEPING_PERIOD: i64 = 24 * 60 * 60;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use num_traits::FromPrimitive; use num_traits::FromPrimitive;

View File

@@ -1041,6 +1041,13 @@ impl Context {
.await? .await?
.to_string(), .to_string(),
); );
res.insert(
"first_key_contacts_msg_id",
self.sql
.get_raw_config("first_key_contacts_msg_id")
.await?
.unwrap_or_default(),
);
let elapsed = time_elapsed(&self.creation_time); let elapsed = time_elapsed(&self.creation_time);
res.insert("uptime", duration_to_str(elapsed)); res.insert("uptime", duration_to_str(elapsed));

View File

@@ -15,6 +15,7 @@ use tokio_util::task::TaskTracker;
use self::connectivity::ConnectivityStore; use self::connectivity::ConnectivityStore;
use crate::config::{self, Config}; use crate::config::{self, Config};
use crate::constants;
use crate::contact::{ContactId, RecentlySeenLoop}; use crate::contact::{ContactId, RecentlySeenLoop};
use crate::context::Context; use crate::context::Context;
use crate::download::{DownloadState, download_msg}; use crate::download::{DownloadState, download_msg};
@@ -497,7 +498,8 @@ async fn inbox_fetch_idle(ctx: &Context, imap: &mut Imap, mut session: Session)
match ctx.get_config_i64(Config::LastHousekeeping).await { match ctx.get_config_i64(Config::LastHousekeeping).await {
Ok(last_housekeeping_time) => { Ok(last_housekeeping_time) => {
let next_housekeeping_time = last_housekeeping_time.saturating_add(60 * 60 * 24); let next_housekeeping_time =
last_housekeeping_time.saturating_add(constants::HOUSEKEEPING_PERIOD);
if next_housekeeping_time <= time() { if next_housekeeping_time <= time() {
sql::housekeeping(ctx).await.log_err(ctx).ok(); sql::housekeeping(ctx).await.log_err(ctx).ok();
} }

View File

@@ -581,6 +581,12 @@ impl Sql {
Ok(value) Ok(value)
} }
/// Removes the `key`'s value from the cache.
pub(crate) async fn uncache_raw_config(&self, key: &str) {
let mut lock = self.config_cache.write().await;
lock.remove(key);
}
/// Sets configuration for the given key to 32-bit signed integer value. /// Sets configuration for the given key to 32-bit signed integer value.
pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> { pub async fn set_raw_config_int(&self, key: &str, value: i32) -> Result<()> {
self.set_raw_config(key, Some(&format!("{value}"))).await self.set_raw_config(key, Some(&format!("{value}"))).await
@@ -724,6 +730,11 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
.context("Failed to cleanup HTTP cache") .context("Failed to cleanup HTTP cache")
.log_err(context) .log_err(context)
.ok(); .ok();
migrations::msgs_to_key_contacts(context)
.await
.context("migrations::msgs_to_key_contacts")
.log_err(context)
.ok();
if let Err(err) = remove_unused_files(context).await { if let Err(err) = remove_unused_files(context).await {
warn!( warn!(

View File

@@ -2,7 +2,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::time::Instant; use std::time::{Duration, Instant};
use anyhow::{Context as _, Result, ensure}; use anyhow::{Context as _, Result, ensure};
use deltachat_contact_tools::EmailAddress; use deltachat_contact_tools::EmailAddress;
@@ -21,7 +21,7 @@ use crate::login_param::ConfiguredLoginParam;
use crate::message::MsgId; use crate::message::MsgId;
use crate::provider::get_provider_by_domain; use crate::provider::get_provider_by_domain;
use crate::sql::Sql; use crate::sql::Sql;
use crate::tools::inc_and_check; use crate::tools::{Time, inc_and_check, time_elapsed};
const DBVERSION: i32 = 68; const DBVERSION: i32 = 68;
const VERSION_CFG: &str = "dbversion"; const VERSION_CFG: &str = "dbversion";
@@ -1245,6 +1245,10 @@ CREATE INDEX gossip_timestamp_index ON gossip_timestamp (chat_id, fingerprint);
"key-contacts migration took {:?} in total.", "key-contacts migration took {:?} in total.",
start.elapsed() start.elapsed()
); );
// Schedule `msgs_to_key_contacts()`.
context
.set_config_internal(Config::LastHousekeeping, None)
.await?;
} }
let new_version = sql let new_version = sql
@@ -1830,65 +1834,113 @@ fn migrate_key_contacts(
} }
// ======================= Step 5: ======================= // ======================= Step 5: =======================
// Rewrite `from_id` in messages // Prepare for rewriting `from_id`, `to_id` in messages
{ {
let start = Instant::now(); let mut contacts_map = autocrypt_key_contacts_with_reset_peerstate;
for (old, new) in autocrypt_key_contacts {
let mut encrypted_msgs_stmt = transaction contacts_map.insert(old, new);
.prepare( }
"SELECT id, from_id, to_id transaction
FROM msgs .execute(
WHERE chat_id>9 "CREATE TABLE key_contacts_map (
AND (param GLOB '*\nc=1*' OR param GLOB 'c=1*') old_id INTEGER PRIMARY KEY NOT NULL,
ORDER BY id DESC LIMIT 10000", new_id INTEGER NOT NULL
) STRICT",
(),
) )
.context("Step 32")?; .context("Step 32")?;
let mut rewrite_msg_stmt = transaction {
.prepare("UPDATE msgs SET from_id=?, to_id=? WHERE id=?") let mut stmt = transaction
.context("Step 32.1")?; .prepare("INSERT INTO key_contacts_map (old_id, new_id) VALUES (?, ?)")
.context("Step 33")?;
struct LoadedMsg { for ids in contacts_map {
id: u32, stmt.execute(ids).context("Step 34")?;
from_id: u32, }
to_id: u32,
} }
transaction
let encrypted_msgs = encrypted_msgs_stmt .execute(
.query_map((), |row| { "INSERT INTO config (keyname, value) VALUES (
let id: u32 = row.get(0)?; 'first_key_contacts_msg_id',
let from_id: u32 = row.get(1)?; IFNULL((SELECT MAX(id)+1 FROM msgs), 0)
let to_id: u32 = row.get(2)?; )",
Ok(LoadedMsg { id, from_id, to_id }) (),
}) )
.context("Step 33")?; .context("Step 35")?;
for msg in encrypted_msgs {
let msg = msg.context("Step 34")?;
let new_from_id = *autocrypt_key_contacts
.get(&msg.from_id)
.or_else(|| autocrypt_key_contacts_with_reset_peerstate.get(&msg.from_id))
.unwrap_or(&msg.from_id);
let new_to_id = *autocrypt_key_contacts
.get(&msg.to_id)
.or_else(|| autocrypt_key_contacts_with_reset_peerstate.get(&msg.to_id))
.unwrap_or(&msg.to_id);
rewrite_msg_stmt
.execute((new_from_id, new_to_id, msg.id))
.context("Step 35")?;
}
info!(
context,
"Rewriting msgs to key-contacts took {:?}.",
start.elapsed()
);
} }
Ok(()) Ok(())
} }
/// Rewrite `from_id`, `to_id` in >= 1000 messages starting from the newest ones, to key-contacts.
pub(crate) async fn msgs_to_key_contacts(context: &Context) -> Result<()> {
let sql = &context.sql;
if sql
.get_raw_config_int64("first_key_contacts_msg_id")
.await?
<= Some(0)
{
return Ok(());
}
let trans_fn = |t: &mut rusqlite::Transaction| {
let mut first_key_contacts_msg_id: u64 = t
.query_one(
"SELECT CAST(value AS INTEGER) FROM config WHERE keyname='first_key_contacts_msg_id'",
(),
|row| row.get(0),
)
.context("Get first_key_contacts_msg_id")?;
let mut stmt = t
.prepare(
"UPDATE msgs SET
from_id=IFNULL(
(SELECT new_id FROM key_contacts_map WHERE old_id=msgs.from_id),
from_id
),
to_id=IFNULL(
(SELECT new_id FROM key_contacts_map WHERE old_id=msgs.to_id),
to_id
)
WHERE id>=? AND id<?
AND chat_id>9
AND (param GLOB '*\nc=1*' OR param GLOB 'c=1*')",
)
.context("Prepare stmt")?;
let msgs_to_migrate = 1000;
let mut msgs_migrated: u64 = 0;
while first_key_contacts_msg_id > 0 && msgs_migrated < msgs_to_migrate {
let start_msg_id = first_key_contacts_msg_id.saturating_sub(msgs_to_migrate);
let cnt: u64 = stmt
.execute((start_msg_id, first_key_contacts_msg_id))
.context("UPDATE msgs")?
.try_into()?;
msgs_migrated += cnt;
first_key_contacts_msg_id = start_msg_id;
}
t.execute(
"UPDATE config SET value=? WHERE keyname='first_key_contacts_msg_id'",
(first_key_contacts_msg_id,),
)
.context("Update first_key_contacts_msg_id")?;
Ok((msgs_migrated, first_key_contacts_msg_id))
};
let start = Time::now();
let mut msgs_migrated = 0;
loop {
let (n, first_key_contacts_msg_id) = sql.transaction(trans_fn).await?;
msgs_migrated += n;
if first_key_contacts_msg_id == 0 || time_elapsed(&start) >= Duration::from_millis(500) {
break;
}
}
sql.uncache_raw_config("first_key_contacts_msg_id").await;
info!(
context,
"Rewriting {msgs_migrated} msgs to key-contacts took {:?}.",
time_elapsed(&start),
);
Ok(())
}
impl Sql { impl Sql {
async fn set_db_version(&self, version: i32) -> Result<()> { async fn set_db_version(&self, version: i32) -> Result<()> {
self.set_raw_config_int(VERSION_CFG, version).await?; self.set_raw_config_int(VERSION_CFG, version).await?;