mirror of
https://github.com/chatmail/core.git
synced 2026-04-19 14:36:29 +03:00
Re-enable Export to the new backup format, add backup progress, add a test for the backup progress (#2023)
* Add progress for backup import/export * Export to the new backup format * Add tests for the new imex progress
This commit is contained in:
@@ -20,6 +20,16 @@ class ImexTracker:
|
|||||||
elif ffi_event.name == "DC_EVENT_IMEX_FILE_WRITTEN":
|
elif ffi_event.name == "DC_EVENT_IMEX_FILE_WRITTEN":
|
||||||
self._imex_events.put(ffi_event.data2)
|
self._imex_events.put(ffi_event.data2)
|
||||||
|
|
||||||
|
def wait_progress(self, target_progress, progress_upper_limit=1000, progress_timeout=60):
|
||||||
|
while True:
|
||||||
|
ev = self._imex_events.get(timeout=progress_timeout)
|
||||||
|
if isinstance(ev, int) and ev >= target_progress:
|
||||||
|
assert ev <= progress_upper_limit, \
|
||||||
|
str(ev) + " exceeded upper progress limit " + str(progress_upper_limit)
|
||||||
|
return ev
|
||||||
|
if ev == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
def wait_finish(self, progress_timeout=60):
|
def wait_finish(self, progress_timeout=60):
|
||||||
""" Return list of written files, raise ValueError if ExportFailed. """
|
""" Return list of written files, raise ValueError if ExportFailed. """
|
||||||
files_written = []
|
files_written = []
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import queue
|
|||||||
import time
|
import time
|
||||||
from deltachat import const, Account
|
from deltachat import const, Account
|
||||||
from deltachat.message import Message
|
from deltachat.message import Message
|
||||||
|
from deltachat.tracker import ImexTracker
|
||||||
from deltachat.hookspec import account_hookimpl
|
from deltachat.hookspec import account_hookimpl
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
@@ -1337,18 +1338,31 @@ class TestOnlineAccount:
|
|||||||
m = message_queue.get()
|
m = message_queue.get()
|
||||||
assert m == msg_in
|
assert m == msg_in
|
||||||
|
|
||||||
def test_import_export_online_all(self, acfactory, tmpdir, lp):
|
def test_import_export_online_all(self, acfactory, tmpdir, data, lp):
|
||||||
ac1 = acfactory.get_one_online_account()
|
ac1 = acfactory.get_one_online_account()
|
||||||
|
|
||||||
lp.sec("create some chat content")
|
lp.sec("create some chat content")
|
||||||
contact1 = ac1.create_contact("some1@example.org", name="some1")
|
chat1 = ac1.create_contact("some1@example.org", name="some1").create_chat()
|
||||||
contact1.create_chat().send_text("msg1")
|
chat1.send_text("msg1")
|
||||||
assert len(ac1.get_contacts(query="some1")) == 1
|
assert len(ac1.get_contacts(query="some1")) == 1
|
||||||
|
|
||||||
|
original_image_path = data.get_path("d.png")
|
||||||
|
chat1.send_image(original_image_path)
|
||||||
|
|
||||||
backupdir = tmpdir.mkdir("backup")
|
backupdir = tmpdir.mkdir("backup")
|
||||||
|
|
||||||
lp.sec("export all to {}".format(backupdir))
|
lp.sec("export all to {}".format(backupdir))
|
||||||
path = ac1.export_all(backupdir.strpath)
|
with ac1.temp_plugin(ImexTracker()) as imex_tracker:
|
||||||
assert os.path.exists(path)
|
path = ac1.export_all(backupdir.strpath)
|
||||||
|
assert os.path.exists(path)
|
||||||
|
|
||||||
|
# check progress events for export
|
||||||
|
assert imex_tracker.wait_progress(1, progress_upper_limit=249)
|
||||||
|
assert imex_tracker.wait_progress(250, progress_upper_limit=499)
|
||||||
|
assert imex_tracker.wait_progress(500, progress_upper_limit=749)
|
||||||
|
assert imex_tracker.wait_progress(750, progress_upper_limit=999)
|
||||||
|
assert imex_tracker.wait_progress(1000)
|
||||||
|
|
||||||
t = time.time()
|
t = time.time()
|
||||||
|
|
||||||
lp.sec("get fresh empty account")
|
lp.sec("get fresh empty account")
|
||||||
@@ -1359,15 +1373,25 @@ class TestOnlineAccount:
|
|||||||
assert path2 == path
|
assert path2 == path
|
||||||
|
|
||||||
lp.sec("import backup and check it's proper")
|
lp.sec("import backup and check it's proper")
|
||||||
ac2.import_all(path)
|
with ac2.temp_plugin(ImexTracker()) as imex_tracker:
|
||||||
|
ac2.import_all(path)
|
||||||
|
|
||||||
|
# check progress events for import
|
||||||
|
assert imex_tracker.wait_progress(1, progress_upper_limit=249)
|
||||||
|
assert imex_tracker.wait_progress(500, progress_upper_limit=749)
|
||||||
|
assert imex_tracker.wait_progress(750, progress_upper_limit=999)
|
||||||
|
assert imex_tracker.wait_progress(1000)
|
||||||
|
|
||||||
contacts = ac2.get_contacts(query="some1")
|
contacts = ac2.get_contacts(query="some1")
|
||||||
assert len(contacts) == 1
|
assert len(contacts) == 1
|
||||||
contact2 = contacts[0]
|
contact2 = contacts[0]
|
||||||
assert contact2.addr == "some1@example.org"
|
assert contact2.addr == "some1@example.org"
|
||||||
chat2 = contact2.create_chat()
|
chat2 = contact2.create_chat()
|
||||||
messages = chat2.get_messages()
|
messages = chat2.get_messages()
|
||||||
assert len(messages) == 1
|
assert len(messages) == 2
|
||||||
assert messages[0].text == "msg1"
|
assert messages[0].text == "msg1"
|
||||||
|
assert messages[1].filemime == "image/png"
|
||||||
|
assert os.stat(messages[1].filename).st_size == os.stat(original_image_path).st_size
|
||||||
|
|
||||||
# wait until a second passed since last backup
|
# wait until a second passed since last backup
|
||||||
# because get_latest_backupfile() shall return the latest backup
|
# because get_latest_backupfile() shall return the latest backup
|
||||||
|
|||||||
@@ -537,29 +537,9 @@ pub fn dc_open_file_std<P: AsRef<std::path::Path>>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn get_next_backup_path_old(
|
|
||||||
folder: impl AsRef<Path>,
|
|
||||||
backup_time: i64,
|
|
||||||
) -> Result<PathBuf, Error> {
|
|
||||||
let folder = PathBuf::from(folder.as_ref());
|
|
||||||
let stem = chrono::NaiveDateTime::from_timestamp(backup_time, 0)
|
|
||||||
.format("delta-chat-%Y-%m-%d")
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// 64 backup files per day should be enough for everyone
|
|
||||||
for i in 0..64 {
|
|
||||||
let mut path = folder.clone();
|
|
||||||
path.push(format!("{}-{}.bak", stem, i));
|
|
||||||
if !path.exists().await {
|
|
||||||
return Ok(path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bail!("could not create backup file, disk full?");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns Ok((temp_path, dest_path)) on success. The backup can then be written to temp_path. If the backup succeeded,
|
/// Returns Ok((temp_path, dest_path)) on success. The backup can then be written to temp_path. If the backup succeeded,
|
||||||
/// it can be renamed to dest_path. This guarantees that the backup is complete.
|
/// it can be renamed to dest_path. This guarantees that the backup is complete.
|
||||||
pub(crate) async fn get_next_backup_path_new(
|
pub(crate) async fn get_next_backup_path(
|
||||||
folder: impl AsRef<Path>,
|
folder: impl AsRef<Path>,
|
||||||
backup_time: i64,
|
backup_time: i64,
|
||||||
) -> Result<(PathBuf, PathBuf), Error> {
|
) -> Result<(PathBuf, PathBuf), Error> {
|
||||||
|
|||||||
166
src/imex.rs
166
src/imex.rs
@@ -1,10 +1,7 @@
|
|||||||
//! # Import/export module
|
//! # Import/export module
|
||||||
|
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::{
|
use std::ffi::OsStr;
|
||||||
cmp::{max, min},
|
|
||||||
ffi::OsStr,
|
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::Context as _;
|
use anyhow::Context as _;
|
||||||
use async_std::path::{Path, PathBuf};
|
use async_std::path::{Path, PathBuf};
|
||||||
@@ -456,9 +453,7 @@ async fn imex_inner(context: &Context, what: ImexMode, path: impl AsRef<Path>) -
|
|||||||
ImexMode::ExportSelfKeys => export_self_keys(context, path).await,
|
ImexMode::ExportSelfKeys => export_self_keys(context, path).await,
|
||||||
ImexMode::ImportSelfKeys => import_self_keys(context, path).await,
|
ImexMode::ImportSelfKeys => import_self_keys(context, path).await,
|
||||||
|
|
||||||
// TODO In some months we can change the export_backup_old() call to export_backup() and delete export_backup_old().
|
ImexMode::ExportBackup => export_backup(context, path).await,
|
||||||
// (now is 07/2020)
|
|
||||||
ImexMode::ExportBackup => export_backup_old(context, path).await,
|
|
||||||
// import_backup() will call import_backup_old() if this is an old backup.
|
// import_backup() will call import_backup_old() if this is an old backup.
|
||||||
ImexMode::ImportBackup => import_backup(context, path).await,
|
ImexMode::ImportBackup => import_backup(context, path).await,
|
||||||
}
|
}
|
||||||
@@ -494,10 +489,20 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef<Path>) ->
|
|||||||
);
|
);
|
||||||
|
|
||||||
let backup_file = File::open(backup_to_import).await?;
|
let backup_file = File::open(backup_to_import).await?;
|
||||||
|
let file_size = backup_file.metadata().await?.len();
|
||||||
let archive = Archive::new(backup_file);
|
let archive = Archive::new(backup_file);
|
||||||
|
|
||||||
let mut entries = archive.entries()?;
|
let mut entries = archive.entries()?;
|
||||||
while let Some(file) = entries.next().await {
|
while let Some(file) = entries.next().await {
|
||||||
let f = &mut file?;
|
let f = &mut file?;
|
||||||
|
|
||||||
|
let current_pos = f.raw_file_position();
|
||||||
|
let progress = 1000 * current_pos / file_size;
|
||||||
|
if progress > 10 && progress < 1000 {
|
||||||
|
// We already emitted ImexProgress(10) above
|
||||||
|
context.emit_event(EventType::ImexProgress(progress as usize));
|
||||||
|
}
|
||||||
|
|
||||||
if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
|
if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
|
||||||
// async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file.
|
// async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file.
|
||||||
f.unpack_in(context.get_blobdir()).await?;
|
f.unpack_in(context.get_blobdir()).await?;
|
||||||
@@ -506,7 +511,6 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef<Path>) ->
|
|||||||
context.get_dbfile(),
|
context.get_dbfile(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
context.emit_event(EventType::ImexProgress(400)); // Just guess the progress, we at least have the dbfile by now
|
|
||||||
} else {
|
} else {
|
||||||
// async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards.
|
// async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards.
|
||||||
f.unpack_in(context.get_blobdir()).await?;
|
f.unpack_in(context.get_blobdir()).await?;
|
||||||
@@ -642,7 +646,7 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef<Path>
|
|||||||
async fn export_backup(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
|
async fn export_backup(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
|
||||||
// get a fine backup file name (the name includes the date so that multiple backup instances are possible)
|
// get a fine backup file name (the name includes the date so that multiple backup instances are possible)
|
||||||
let now = time();
|
let now = time();
|
||||||
let (temp_path, dest_path) = get_next_backup_path_new(dir, now).await?;
|
let (temp_path, dest_path) = get_next_backup_path(dir, now).await?;
|
||||||
let _d = DeleteOnDrop(temp_path.clone());
|
let _d = DeleteOnDrop(temp_path.clone());
|
||||||
|
|
||||||
context
|
context
|
||||||
@@ -706,131 +710,37 @@ async fn export_backup_inner(context: &Context, temp_path: &PathBuf) -> Result<(
|
|||||||
.append_path_with_name(context.get_dbfile(), DBFILE_BACKUP_NAME)
|
.append_path_with_name(context.get_dbfile(), DBFILE_BACKUP_NAME)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
context.emit_event(EventType::ImexProgress(500));
|
let read_dir: Vec<_> = fs::read_dir(context.get_blobdir()).await?.collect().await;
|
||||||
|
let count = read_dir.len();
|
||||||
|
let mut written_files = 0;
|
||||||
|
|
||||||
builder
|
for entry in read_dir.into_iter() {
|
||||||
.append_dir_all(BLOBS_BACKUP_NAME, context.get_blobdir())
|
let entry = entry?;
|
||||||
.await?;
|
let name = entry.file_name();
|
||||||
|
if !entry.file_type().await?.is_file() {
|
||||||
|
warn!(
|
||||||
|
context,
|
||||||
|
"Export: Found dir entry {} that is not a file, ignoring",
|
||||||
|
name.to_string_lossy()
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let mut file = File::open(entry.path()).await?;
|
||||||
|
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(name);
|
||||||
|
builder.append_file(path_in_archive, &mut file).await?;
|
||||||
|
|
||||||
|
written_files += 1;
|
||||||
|
let progress = 1000 * written_files / count;
|
||||||
|
if progress > 10 && progress < 1000 {
|
||||||
|
// We already emitted ImexProgress(10) above
|
||||||
|
emit_event!(context, EventType::ImexProgress(progress));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
builder.finish().await?;
|
builder.finish().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn export_backup_old(context: &Context, dir: impl AsRef<Path>) -> Result<()> {
|
|
||||||
// get a fine backup file name (the name includes the date so that multiple backup instances are possible)
|
|
||||||
// FIXME: we should write to a temporary file first and rename it on success. this would guarantee the backup is complete.
|
|
||||||
// let dest_path_filename = dc_get_next_backup_file(context, dir, res);
|
|
||||||
let now = time();
|
|
||||||
let dest_path_filename = get_next_backup_path_old(dir, now).await?;
|
|
||||||
let dest_path_string = dest_path_filename.to_string_lossy().to_string();
|
|
||||||
|
|
||||||
sql::housekeeping(context).await;
|
|
||||||
|
|
||||||
context.sql.execute("VACUUM;", paramsv![]).await.ok();
|
|
||||||
|
|
||||||
// we close the database during the copy of the dbfile
|
|
||||||
context.sql.close().await;
|
|
||||||
info!(
|
|
||||||
context,
|
|
||||||
"Backup '{}' to '{}'.",
|
|
||||||
context.get_dbfile().display(),
|
|
||||||
dest_path_filename.display(),
|
|
||||||
);
|
|
||||||
let copied = dc_copy_file(context, context.get_dbfile(), &dest_path_filename).await;
|
|
||||||
context
|
|
||||||
.sql
|
|
||||||
.open(&context, &context.get_dbfile(), false)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if !copied {
|
|
||||||
bail!(
|
|
||||||
"could not copy file from '{}' to '{}'",
|
|
||||||
context.get_dbfile().display(),
|
|
||||||
dest_path_string
|
|
||||||
);
|
|
||||||
}
|
|
||||||
let dest_sql = Sql::new();
|
|
||||||
dest_sql
|
|
||||||
.open(context, &dest_path_filename, false)
|
|
||||||
.await
|
|
||||||
.with_context(|| format!("could not open exported database {}", dest_path_string))?;
|
|
||||||
|
|
||||||
let res = match add_files_to_export(context, &dest_sql).await {
|
|
||||||
Err(err) => {
|
|
||||||
dc_delete_file(context, &dest_path_filename).await;
|
|
||||||
error!(context, "backup failed: {}", err);
|
|
||||||
Err(err)
|
|
||||||
}
|
|
||||||
Ok(()) => {
|
|
||||||
dest_sql
|
|
||||||
.set_raw_config_int(context, "backup_time", now as i32)
|
|
||||||
.await?;
|
|
||||||
context.emit_event(EventType::ImexFileWritten(dest_path_filename));
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
dest_sql.close().await;
|
|
||||||
|
|
||||||
Ok(res?)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn add_files_to_export(context: &Context, sql: &Sql) -> Result<()> {
|
|
||||||
// add all files as blobs to the database copy (this does not require
|
|
||||||
// the source to be locked, neigher the destination as it is used only here)
|
|
||||||
if !sql.table_exists("backup_blobs").await? {
|
|
||||||
sql.execute(
|
|
||||||
"CREATE TABLE backup_blobs (id INTEGER PRIMARY KEY, file_name, file_content);",
|
|
||||||
paramsv![],
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
// copy all files from BLOBDIR into backup-db
|
|
||||||
let mut total_files_cnt = 0;
|
|
||||||
let dir = context.get_blobdir();
|
|
||||||
let dir_handle = async_std::fs::read_dir(&dir).await?;
|
|
||||||
total_files_cnt += dir_handle.filter(|r| r.is_ok()).count().await;
|
|
||||||
|
|
||||||
info!(context, "EXPORT: total_files_cnt={}", total_files_cnt);
|
|
||||||
|
|
||||||
sql.with_conn_async(|conn| async move {
|
|
||||||
// scan directory, pass 2: copy files
|
|
||||||
let mut dir_handle = async_std::fs::read_dir(&dir).await?;
|
|
||||||
|
|
||||||
let mut processed_files_cnt = 0;
|
|
||||||
while let Some(entry) = dir_handle.next().await {
|
|
||||||
let entry = entry?;
|
|
||||||
if context.shall_stop_ongoing().await {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
processed_files_cnt += 1;
|
|
||||||
let permille = max(min(processed_files_cnt * 1000 / total_files_cnt, 990), 10);
|
|
||||||
context.emit_event(EventType::ImexProgress(permille));
|
|
||||||
|
|
||||||
let name_f = entry.file_name();
|
|
||||||
let name = name_f.to_string_lossy();
|
|
||||||
if name.starts_with("delta-chat") && name.ends_with(".bak") {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
info!(context, "EXPORT: copying filename={}", name);
|
|
||||||
let curr_path_filename = context.get_blobdir().join(entry.file_name());
|
|
||||||
if let Ok(buf) = dc_read_file(context, &curr_path_filename).await {
|
|
||||||
if buf.is_empty() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// bail out if we can't insert
|
|
||||||
let mut stmt = conn.prepare_cached(
|
|
||||||
"INSERT INTO backup_blobs (file_name, file_content) VALUES (?, ?);",
|
|
||||||
)?;
|
|
||||||
stmt.execute(paramsv![name, buf])?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
* Classic key import
|
* Classic key import
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|||||||
Reference in New Issue
Block a user