Fix imex race condition (#2255)

Fix #2254: if the DB was closed without calling stop_io() and an interrupt then arrived (e.g. an incoming message), the DB was corrupted.

* Add result.log() for logging with less boilerplate code

* Bugfix: Resultify housekeeping() to make it abort if the db is closed instead of just deleting everything

* Require the UI to call dc_stop_io() before backup export

* Prepare a bit better for closed-db: Resultify get_uidvalidity and get_uid_next and let job::load_next() wait until the db is open

About the bug (before this PR):
if the DB was closed without calling stop_io() and then an interrupt arrives (e.g. incoming message):
- I don't know if it downloads the message, but of course at some point the process of receiving the message will fail
- In my test, DC is just in the process of moving a message when the imex starts, but then can't delete the job or update the msg server_uid
- Then, when job::load_next() is called, no job can be loaded. That's why it calls `load_housekeeping_job()`. As `load_housekeeping_job()` can't load the time of the last housekeeping, it assumes we never ran housekeeping and returns a new Housekeeping job, which is immediately executed.
- housekeeping can't find any blobs referenced in the db and therefore deletes almost all blobs.
This commit is contained in:
Hocuri
2021-03-02 10:25:02 +01:00
committed by GitHub
parent a698a8dd84
commit 2a39dc06e9
13 changed files with 231 additions and 70 deletions

View File

@@ -501,7 +501,7 @@ mod tests {
#[async_std::test]
async fn test_stock_ephemeral_messages() {
let context = TestContext::new().await.ctx;
let context = TestContext::new().await;
assert_eq!(
stock_ephemeral_timer_changed(&context, Timer::Disabled, DC_CONTACT_ID_SELF).await,

View File

@@ -564,8 +564,8 @@ impl Imap {
.uid_validity
.with_context(|| format!("No UIDVALIDITY for folder {}", folder))?;
let old_uid_validity = get_uidvalidity(context, folder).await;
let old_uid_next = get_uid_next(context, folder).await;
let old_uid_validity = get_uidvalidity(context, folder).await?;
let old_uid_next = get_uid_next(context, folder).await?;
if new_uid_validity == old_uid_validity {
let new_emails = if newly_selected == NewlySelected::No {
@@ -658,7 +658,7 @@ impl Imap {
return Ok(false);
}
let old_uid_next = get_uid_next(context, folder.as_ref()).await;
let old_uid_next = get_uid_next(context, folder.as_ref()).await?;
let msgs = if fetch_existing_msgs {
self.prefetch_existing_msgs().await?
@@ -1733,16 +1733,15 @@ pub(crate) async fn set_uid_next(context: &Context, folder: &str, uid_next: u32)
/// This method returns the uid_next from the last time we fetched messages.
/// We can compare this to the current uid_next to find out whether there are new messages
/// and fetch from this value on to get all new messages.
async fn get_uid_next(context: &Context, folder: &str) -> u32 {
context
async fn get_uid_next(context: &Context, folder: &str) -> Result<u32> {
Ok(context
.sql
.query_get_value(
context,
.query_get_value_result(
"SELECT uid_next FROM imap_sync WHERE folder=?;",
paramsv![folder],
)
.await
.unwrap_or(0)
.await?
.unwrap_or(0))
}
pub(crate) async fn set_uidvalidity(
@@ -1761,16 +1760,15 @@ pub(crate) async fn set_uidvalidity(
Ok(())
}
async fn get_uidvalidity(context: &Context, folder: &str) -> u32 {
context
async fn get_uidvalidity(context: &Context, folder: &str) -> Result<u32> {
Ok(context
.sql
.query_get_value(
context,
.query_get_value_result(
"SELECT uidvalidity FROM imap_sync WHERE folder=?;",
paramsv![folder],
)
.await
.unwrap_or(0)
.await?
.unwrap_or(0))
}
/// Deprecated, use get_uid_next() and get_uidvalidity()
@@ -1879,17 +1877,17 @@ mod tests {
#[async_std::test]
async fn test_set_uid_next_validity() {
let t = TestContext::new_alice().await;
assert_eq!(get_uid_next(&t.ctx, "Inbox").await, 0);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await, 0);
assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 0);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 0);
set_uidvalidity(&t.ctx, "Inbox", 7).await.unwrap();
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await, 7);
assert_eq!(get_uid_next(&t.ctx, "Inbox").await, 0);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 7);
assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 0);
set_uid_next(&t.ctx, "Inbox", 5).await.unwrap();
set_uidvalidity(&t.ctx, "Inbox", 6).await.unwrap();
assert_eq!(get_uid_next(&t.ctx, "Inbox").await, 5);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await, 6);
assert_eq!(get_uid_next(&t.ctx, "Inbox").await.unwrap(), 5);
assert_eq!(get_uidvalidity(&t.ctx, "Inbox").await.unwrap(), 6);
}
#[test]

View File

@@ -11,7 +11,6 @@ use async_std::{
};
use rand::{thread_rng, Rng};
use crate::blob::BlobObject;
use crate::chat;
use crate::chat::delete_and_reset_all_device_msgs;
use crate::config::Config;
@@ -30,6 +29,7 @@ use crate::param::Param;
use crate::pgp;
use crate::sql::{self, Sql};
use crate::stock_str;
use crate::{blob::BlobObject, log::LogExt};
use ::pgp::types::KeyTrait;
use async_tar::Archive;
@@ -496,6 +496,10 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef<Path>) ->
!context.is_configured().await,
"Cannot import backups to accounts in use."
);
ensure!(
!context.scheduler.read().await.is_running(),
"cannot import backup, IO already running"
);
context.sql.close().await;
dc_delete_file(context, context.get_dbfile()).await;
ensure!(
@@ -563,6 +567,10 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef<Path>
!context.is_configured().await,
"Cannot import backups to accounts in use."
);
ensure!(
!context.scheduler.read().await.is_running(),
"cannot import backup, IO already running"
);
context.sql.close().await;
dc_delete_file(context, context.get_dbfile()).await;
ensure!(
@@ -668,7 +676,7 @@ async fn export_backup(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
.sql
.set_raw_config_int(context, "backup_time", now as i32)
.await?;
sql::housekeeping(context).await;
sql::housekeeping(context).await.log(context);
context
.sql
@@ -676,6 +684,11 @@ async fn export_backup(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
.await
.map_err(|e| warn!(context, "Vacuum failed, exporting anyway {}", e));
ensure!(
!context.scheduler.read().await.is_running(),
"cannot export backup, IO already running"
);
// we close the database during the export
context.sql.close().await;

View File

@@ -2,16 +2,16 @@
//!
//! This module implements a job queue maintained in the SQLite database
//! and job types.
use std::fmt;
use std::future::Future;
use std::{fmt, time::Duration};
use anyhow::{bail, ensure, format_err, Context as _, Error, Result};
use async_smtp::smtp::response::{Category, Code, Detail};
use async_std::task::sleep;
use deltachat_derive::{FromSql, ToSql};
use itertools::Itertools;
use rand::{thread_rng, Rng};
use crate::context::Context;
use crate::dc_tools::{dc_delete_file, dc_read_file, time};
use crate::ephemeral::load_imap_deletion_msgid;
use crate::events::EventType;
@@ -29,6 +29,7 @@ use crate::{
};
use crate::{config::Config, constants::Blocked};
use crate::{constants::Chattype, contact::Contact};
use crate::{context::Context, log::LogExt};
use crate::{scheduler::InterruptInfo, sql};
// results in ~3 weeks for the last backoff timespan
@@ -1156,7 +1157,7 @@ async fn perform_job_action(
Action::MoveMsg => job.move_msg(context, connection.inbox()).await,
Action::FetchExistingMsgs => job.fetch_existing_msgs(context, connection.inbox()).await,
Action::Housekeeping => {
sql::housekeeping(context).await;
sql::housekeeping(context).await.log(context);
Status::Finished(Ok(()))
}
};
@@ -1265,6 +1266,17 @@ pub(crate) async fn load_next(
) -> Option<Job> {
info!(context, "loading job for {}-thread", thread);
while !context.sql.is_open().await {
// The db is closed, which means that this thread should not be running.
// Wait until the db is re-opened (if we returned None, this thread might do further damage)
warn!(
context,
"{}: load_next() was called but the db was not opened, THIS SHOULD NOT HAPPEN. Waiting...",
thread
);
sleep(Duration::from_millis(500)).await;
}
let query;
let params;
let t = time();

View File

@@ -1,4 +1,6 @@
//! # Logging macros
//! # Logging
use crate::context::Context;
#[macro_export]
macro_rules! info {
@@ -58,3 +60,39 @@ macro_rules! emit_event {
$ctx.emit_event($event);
};
}
/// Extension trait for logging `Result` failures without boilerplate.
pub trait LogExt<T> {
/// Emits a warning if the receiver contains an Err value.
///
/// Returns an [`Option<T>`] with the `Ok(_)` value, if any:
/// - You won't get any warnings about unused results but can still use the value if you need it
/// - This prevents the same warning from being printed to the log multiple times
///
/// Thanks to the [track_caller](https://blog.rust-lang.org/2020/08/27/Rust-1.46.0.html#track_caller)
/// feature, the location of the caller is printed to the log, just like with the warn!() macro.
#[track_caller]
fn log(self, context: &Context) -> Option<T>;
}
impl<T> LogExt<T> for anyhow::Result<T> {
    /// Logs an `Err` as a warning event and converts the result into an `Option`.
    #[track_caller]
    fn log(self, context: &Context) -> Option<T> {
        match self {
            Ok(value) => Some(value),
            Err(err) => {
                // `#[track_caller]` makes Location::caller() point at the call
                // site of `.log()`, not at this function.
                let caller = std::panic::Location::caller();
                // The `{:#}` alternate form prints anyhow's whole context chain,
                // so inner errors added via .context() show up too.
                let message = format!(
                    "{file}:{line}: {e:#}",
                    file = caller.file(),
                    line = caller.line(),
                    e = err
                );
                // The warn!() macro uses file!()/line!(), which expand at the
                // macro site and would defeat #[track_caller]; emit directly.
                emit_event!(context, crate::EventType::Warning(message));
                None
            }
        }
    }
}

View File

@@ -8,6 +8,7 @@ use std::path::Path;
use std::time::Duration;
use anyhow::format_err;
use anyhow::Context as _;
use rusqlite::{Connection, Error as SqlError, OpenFlags};
use crate::chat::{add_device_msg, update_device_icon, update_saved_messages_icon};
@@ -495,7 +496,7 @@ pub fn get_rowid2(
)
}
pub async fn housekeeping(context: &Context) {
pub async fn housekeeping(context: &Context) -> anyhow::Result<()> {
if let Err(err) = crate::ephemeral::delete_expired_messages(context).await {
warn!(context, "Failed to delete expired messages: {}", err);
}
@@ -510,28 +511,28 @@ pub async fn housekeeping(context: &Context) {
"SELECT param FROM msgs WHERE chat_id!=3 AND type!=10;",
Param::File,
)
.await;
.await?;
maybe_add_from_param(
context,
&mut files_in_use,
"SELECT param FROM jobs;",
Param::File,
)
.await;
.await?;
maybe_add_from_param(
context,
&mut files_in_use,
"SELECT param FROM chats;",
Param::ProfileImage,
)
.await;
.await?;
maybe_add_from_param(
context,
&mut files_in_use,
"SELECT param FROM contacts;",
Param::ProfileImage,
)
.await;
.await?;
context
.sql
@@ -547,9 +548,7 @@ pub async fn housekeeping(context: &Context) {
},
)
.await
.unwrap_or_else(|err| {
warn!(context, "sql: failed query: {}", err);
});
.context("housekeeping: failed to SELECT value FROM config")?;
info!(context, "{} files in use.", files_in_use.len(),);
/* go through directory and delete unused files */
@@ -637,6 +636,7 @@ pub async fn housekeeping(context: &Context) {
warn!(context, "Can't set config: {}", e);
}
info!(context, "Housekeeping done.");
Ok(())
}
#[allow(clippy::indexing_slicing)]
@@ -665,7 +665,7 @@ async fn maybe_add_from_param(
files_in_use: &mut HashSet<String>,
query: &str,
param_id: Param,
) {
) -> anyhow::Result<()> {
context
.sql
.query_map(
@@ -683,9 +683,7 @@ async fn maybe_add_from_param(
},
)
.await
.unwrap_or_else(|err| {
warn!(context, "sql: failed to add_from_param: {}", err);
});
.context(format!("housekeeping: failed to add_from_param {}", query))
}
#[allow(clippy::cognitive_complexity)]
@@ -1570,8 +1568,10 @@ async fn prune_tombstones(context: &Context) -> Result<()> {
#[cfg(test)]
mod test {
use async_std::fs::File;
use super::*;
use crate::test_utils::TestContext;
use crate::{test_utils::TestContext, Event, EventType};
#[test]
fn test_maybe_add_file() {
@@ -1613,4 +1613,44 @@ mod test {
assert!(!t.ctx.sql.col_exists("msgs", "foobar").await.unwrap());
assert!(!t.ctx.sql.col_exists("foobar", "foobar").await.unwrap());
}
/// Regression test for the imex race (#2254): `housekeeping()` must return an
/// error when the database is closed, instead of treating every blob (here the
/// self-avatar) as unreferenced and deleting it.
#[async_std::test]
async fn test_housekeeping_db_closed() {
let t = TestContext::new().await;
// Create an avatar blob that housekeeping would consider "referenced"
// only if it can read the db.
let avatar_src = t.dir.path().join("avatar.png");
let avatar_bytes = include_bytes!("../test-data/image/avatar64x64.png");
File::create(&avatar_src)
.await
.unwrap()
.write_all(avatar_bytes)
.await
.unwrap();
t.set_config(Config::Selfavatar, Some(avatar_src.to_str().unwrap()))
.await
.unwrap();
// Fail the test if housekeeping even gets close to deleting the blob:
// "Keeping new unreferenced file" means it was spared only by its mtime.
t.add_event_sink(move |event: Event| async move {
match event.typ {
EventType::Info(s) => assert!(
!s.contains("Keeping new unreferenced file"),
"File {} was almost deleted, only reason it was kept is that it was created recently (as the tests don't run for a long time)",
s
),
EventType::Error(s) => panic!(s),
_ => {}
}
})
.await;
// Sanity check: the avatar blob exists before housekeeping runs.
let a = t.get_config(Config::Selfavatar).await.unwrap();
assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]);
t.sql.close().await;
housekeeping(&t).await.unwrap_err(); // housekeeping should fail as the db is closed
t.sql.open(&t, &t.get_dbfile(), false).await.unwrap();
// The avatar must have survived the closed-db housekeeping attempt.
let a = t.get_config(Config::Selfavatar).await.unwrap();
assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]);
}
}

View File

@@ -2,17 +2,17 @@
//!
//! This private module is only compiled for test runs.
use std::collections::BTreeMap;
use std::fmt;
use std::ops::Deref;
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::{collections::BTreeMap, panic};
use ansi_term::Color;
use async_std::future::Future;
use async_std::path::PathBuf;
use async_std::pin::Pin;
use async_std::sync::{Arc, RwLock};
use async_std::{channel, pin::Pin};
use chat::ChatItem;
use once_cell::sync::Lazy;
use tempfile::{tempdir, TempDir};
@@ -50,6 +50,8 @@ pub(crate) struct TestContext {
recv_idx: RwLock<u32>,
/// Functions to call for events received.
event_sinks: Arc<RwLock<Vec<Box<EventSink>>>>,
/// Receives panics from sinks ("sink" means "event handler" here)
poison_receiver: channel::Receiver<String>,
}
impl fmt::Debug for TestContext {
@@ -97,7 +99,15 @@ impl TestContext {
let events = ctx.get_event_emitter();
let event_sinks: Arc<RwLock<Vec<Box<EventSink>>>> = Arc::new(RwLock::new(Vec::new()));
let sinks = Arc::clone(&event_sinks);
let (poison_sender, poison_receiver) = channel::bounded(1);
async_std::task::spawn(async move {
// Make sure that the test fails if there is a panic on this thread here:
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
poison_sender.try_send(panic_info.to_string()).ok();
orig_hook(panic_info);
}));
while let Some(event) = events.recv().await {
{
let sinks = sinks.read().await;
@@ -114,6 +124,7 @@ impl TestContext {
dir,
recv_idx: RwLock::new(0),
event_sinks,
poison_receiver,
}
}
@@ -410,6 +421,14 @@ impl Deref for TestContext {
}
}
impl Drop for TestContext {
fn drop(&mut self) {
if let Ok(p) = self.poison_receiver.try_recv() {
panic!(p);
}
}
}
/// A raw message as it was scheduled to be sent.
///
/// This is a raw message, probably in the shape DC was planning to send it but not having