diff --git a/python/src/deltachat/tracker.py b/python/src/deltachat/tracker.py index 1d35c3345..40834ee2d 100644 --- a/python/src/deltachat/tracker.py +++ b/python/src/deltachat/tracker.py @@ -20,6 +20,16 @@ class ImexTracker: elif ffi_event.name == "DC_EVENT_IMEX_FILE_WRITTEN": self._imex_events.put(ffi_event.data2) + def wait_progress(self, target_progress, progress_upper_limit=1000, progress_timeout=60): + while True: + ev = self._imex_events.get(timeout=progress_timeout) + if isinstance(ev, int) and ev >= target_progress: + assert ev <= progress_upper_limit, \ + str(ev) + " exceeded upper progress limit " + str(progress_upper_limit) + return ev + if ev == 0: + return None + def wait_finish(self, progress_timeout=60): """ Return list of written files, raise ValueError if ExportFailed. """ files_written = [] diff --git a/python/tests/test_account.py b/python/tests/test_account.py index 3d0222585..fb821764b 100644 --- a/python/tests/test_account.py +++ b/python/tests/test_account.py @@ -6,6 +6,7 @@ import queue import time from deltachat import const, Account from deltachat.message import Message +from deltachat.tracker import ImexTracker from deltachat.hookspec import account_hookimpl from datetime import datetime, timedelta @@ -1337,18 +1338,31 @@ class TestOnlineAccount: m = message_queue.get() assert m == msg_in - def test_import_export_online_all(self, acfactory, tmpdir, lp): + def test_import_export_online_all(self, acfactory, tmpdir, data, lp): ac1 = acfactory.get_one_online_account() lp.sec("create some chat content") - contact1 = ac1.create_contact("some1@example.org", name="some1") - contact1.create_chat().send_text("msg1") + chat1 = ac1.create_contact("some1@example.org", name="some1").create_chat() + chat1.send_text("msg1") assert len(ac1.get_contacts(query="some1")) == 1 + + original_image_path = data.get_path("d.png") + chat1.send_image(original_image_path) + backupdir = tmpdir.mkdir("backup") lp.sec("export all to {}".format(backupdir)) - path = ac1.export_all(backupdir.strpath) - assert os.path.exists(path) + with ac1.temp_plugin(ImexTracker()) as imex_tracker: + path = ac1.export_all(backupdir.strpath) + assert os.path.exists(path) + + # check progress events for export + assert imex_tracker.wait_progress(1, progress_upper_limit=249) + assert imex_tracker.wait_progress(250, progress_upper_limit=499) + assert imex_tracker.wait_progress(500, progress_upper_limit=749) + assert imex_tracker.wait_progress(750, progress_upper_limit=999) + assert imex_tracker.wait_progress(1000) + t = time.time() lp.sec("get fresh empty account") @@ -1359,15 +1373,25 @@ class TestOnlineAccount: assert path2 == path lp.sec("import backup and check it's proper") - ac2.import_all(path) + with ac2.temp_plugin(ImexTracker()) as imex_tracker: + ac2.import_all(path) + + # check progress events for import + assert imex_tracker.wait_progress(1, progress_upper_limit=249) + assert imex_tracker.wait_progress(500, progress_upper_limit=749) + assert imex_tracker.wait_progress(750, progress_upper_limit=999) + assert imex_tracker.wait_progress(1000) + contacts = ac2.get_contacts(query="some1") assert len(contacts) == 1 contact2 = contacts[0] assert contact2.addr == "some1@example.org" chat2 = contact2.create_chat() messages = chat2.get_messages() - assert len(messages) == 1 + assert len(messages) == 2 assert messages[0].text == "msg1" + assert messages[1].filemime == "image/png" + assert os.stat(messages[1].filename).st_size == os.stat(original_image_path).st_size # wait until a second passed since last backup # because get_latest_backupfile() shall return the latest backup diff --git a/src/dc_tools.rs b/src/dc_tools.rs index 7c64e7c9d..7af4a00f7 100644 --- a/src/dc_tools.rs +++ b/src/dc_tools.rs @@ -537,29 +537,9 @@ pub fn dc_open_file_std>( } } -pub(crate) async fn get_next_backup_path_old( - folder: impl AsRef, - backup_time: i64, -) -> Result { - let folder = PathBuf::from(folder.as_ref()); - let stem = chrono::NaiveDateTime::from_timestamp(backup_time, 0) - .format("delta-chat-%Y-%m-%d") - .to_string(); - - // 64 backup files per day should be enough for everyone - for i in 0..64 { - let mut path = folder.clone(); - path.push(format!("{}-{}.bak", stem, i)); - if !path.exists().await { - return Ok(path); - } - } - bail!("could not create backup file, disk full?"); -} - /// Returns Ok((temp_path, dest_path)) on success. The backup can then be written to temp_path. If the backup succeeded, /// it can be renamed to dest_path. This guarantees that the backup is complete. -pub(crate) async fn get_next_backup_path_new( +pub(crate) async fn get_next_backup_path( folder: impl AsRef, backup_time: i64, ) -> Result<(PathBuf, PathBuf), Error> { diff --git a/src/imex.rs b/src/imex.rs index ba3bde24e..a978388df 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -1,10 +1,7 @@ //! # Import/export module use std::any::Any; -use std::{ - cmp::{max, min}, - ffi::OsStr, -}; +use std::ffi::OsStr; use anyhow::Context as _; use async_std::path::{Path, PathBuf}; @@ -456,9 +453,7 @@ async fn imex_inner(context: &Context, what: ImexMode, path: impl AsRef) - ImexMode::ExportSelfKeys => export_self_keys(context, path).await, ImexMode::ImportSelfKeys => import_self_keys(context, path).await, - // TODO In some months we can change the export_backup_old() call to export_backup() and delete export_backup_old(). - // (now is 07/2020) - ImexMode::ExportBackup => export_backup_old(context, path).await, + ImexMode::ExportBackup => export_backup(context, path).await, // import_backup() will call import_backup_old() if this is an old backup. ImexMode::ImportBackup => import_backup(context, path).await, } @@ -494,10 +489,20 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef) -> ); let backup_file = File::open(backup_to_import).await?; + let file_size = backup_file.metadata().await?.len(); let archive = Archive::new(backup_file); + let mut entries = archive.entries()?; while let Some(file) = entries.next().await { let f = &mut file?; + + let current_pos = f.raw_file_position(); + let progress = 1000 * current_pos / file_size; + if progress > 10 && progress < 1000 { + // We already emitted ImexProgress(10) above + context.emit_event(EventType::ImexProgress(progress as usize)); + } + if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) { // async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file. f.unpack_in(context.get_blobdir()).await?; @@ -506,7 +511,6 @@ async fn import_backup(context: &Context, backup_to_import: impl AsRef) -> context.get_dbfile(), ) .await?; - context.emit_event(EventType::ImexProgress(400)); // Just guess the progress, we at least have the dbfile by now } else { // async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards. f.unpack_in(context.get_blobdir()).await?; @@ -642,7 +646,7 @@ async fn import_backup_old(context: &Context, backup_to_import: impl AsRef async fn export_backup(context: &Context, dir: impl AsRef) -> Result<()> { // get a fine backup file name (the name includes the date so that multiple backup instances are possible) let now = time(); - let (temp_path, dest_path) = get_next_backup_path_new(dir, now).await?; + let (temp_path, dest_path) = get_next_backup_path(dir, now).await?; let _d = DeleteOnDrop(temp_path.clone()); context @@ -706,131 +710,37 @@ async fn export_backup_inner(context: &Context, temp_path: &PathBuf) -> Result<( .append_path_with_name(context.get_dbfile(), DBFILE_BACKUP_NAME) .await?; - context.emit_event(EventType::ImexProgress(500)); + let read_dir: Vec<_> = fs::read_dir(context.get_blobdir()).await?.collect().await; + let count = read_dir.len(); + let mut written_files = 0; - builder - .append_dir_all(BLOBS_BACKUP_NAME, context.get_blobdir()) - .await?; + for entry in read_dir.into_iter() { + let entry = entry?; + let name = entry.file_name(); + if !entry.file_type().await?.is_file() { + warn!( + context, + "Export: Found dir entry {} that is not a file, ignoring", + name.to_string_lossy() + ); + continue; + } + let mut file = File::open(entry.path()).await?; + let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(name); + builder.append_file(path_in_archive, &mut file).await?; + + written_files += 1; + let progress = 1000 * written_files / count; + if progress > 10 && progress < 1000 { + // We already emitted ImexProgress(10) above + emit_event!(context, EventType::ImexProgress(progress)); + } + } builder.finish().await?; Ok(()) } -async fn export_backup_old(context: &Context, dir: impl AsRef) -> Result<()> { - // get a fine backup file name (the name includes the date so that multiple backup instances are possible) - // FIXME: we should write to a temporary file first and rename it on success. this would guarantee the backup is complete. - // let dest_path_filename = dc_get_next_backup_file(context, dir, res); - let now = time(); - let dest_path_filename = get_next_backup_path_old(dir, now).await?; - let dest_path_string = dest_path_filename.to_string_lossy().to_string(); - - sql::housekeeping(context).await; - - context.sql.execute("VACUUM;", paramsv![]).await.ok(); - - // we close the database during the copy of the dbfile - context.sql.close().await; - info!( - context, - "Backup '{}' to '{}'.", - context.get_dbfile().display(), - dest_path_filename.display(), - ); - let copied = dc_copy_file(context, context.get_dbfile(), &dest_path_filename).await; - context - .sql - .open(&context, &context.get_dbfile(), false) - .await?; - - if !copied { - bail!( - "could not copy file from '{}' to '{}'", - context.get_dbfile().display(), - dest_path_string - ); - } - let dest_sql = Sql::new(); - dest_sql - .open(context, &dest_path_filename, false) - .await - .with_context(|| format!("could not open exported database {}", dest_path_string))?; - - let res = match add_files_to_export(context, &dest_sql).await { - Err(err) => { - dc_delete_file(context, &dest_path_filename).await; - error!(context, "backup failed: {}", err); - Err(err) - } - Ok(()) => { - dest_sql - .set_raw_config_int(context, "backup_time", now as i32) - .await?; - context.emit_event(EventType::ImexFileWritten(dest_path_filename)); - Ok(()) - } - }; - dest_sql.close().await; - - Ok(res?) -} - -async fn add_files_to_export(context: &Context, sql: &Sql) -> Result<()> { - // add all files as blobs to the database copy (this does not require - // the source to be locked, neigher the destination as it is used only here) - if !sql.table_exists("backup_blobs").await? { - sql.execute( - "CREATE TABLE backup_blobs (id INTEGER PRIMARY KEY, file_name, file_content);", - paramsv![], - ) - .await?; - } - // copy all files from BLOBDIR into backup-db - let mut total_files_cnt = 0; - let dir = context.get_blobdir(); - let dir_handle = async_std::fs::read_dir(&dir).await?; - total_files_cnt += dir_handle.filter(|r| r.is_ok()).count().await; - - info!(context, "EXPORT: total_files_cnt={}", total_files_cnt); - - sql.with_conn_async(|conn| async move { - // scan directory, pass 2: copy files - let mut dir_handle = async_std::fs::read_dir(&dir).await?; - - let mut processed_files_cnt = 0; - while let Some(entry) = dir_handle.next().await { - let entry = entry?; - if context.shall_stop_ongoing().await { - return Ok(()); - } - processed_files_cnt += 1; - let permille = max(min(processed_files_cnt * 1000 / total_files_cnt, 990), 10); - context.emit_event(EventType::ImexProgress(permille)); - - let name_f = entry.file_name(); - let name = name_f.to_string_lossy(); - if name.starts_with("delta-chat") && name.ends_with(".bak") { - continue; - } - info!(context, "EXPORT: copying filename={}", name); - let curr_path_filename = context.get_blobdir().join(entry.file_name()); - if let Ok(buf) = dc_read_file(context, &curr_path_filename).await { - if buf.is_empty() { - continue; - } - // bail out if we can't insert - let mut stmt = conn.prepare_cached( - "INSERT INTO backup_blobs (file_name, file_content) VALUES (?, ?);", - )?; - stmt.execute(paramsv![name, buf])?; - } - } - Ok(()) - }) - .await?; - - Ok(()) -} - /******************************************************************************* * Classic key import ******************************************************************************/