mirror of
https://github.com/chatmail/core.git
synced 2026-05-05 22:36:30 +03:00
fix: imex::import_backup: Unpack all blobs before importing a db (#4307)
This way we can't get an account with missing blobs if there's not enough disk space. Also delete already unpacked files if all files weren't unpacked successfully. Still, there are some minor problems remaining: - If a db wasn't imported successfully, unpacked blobs aren't deleted because we don't know at which step the import failed and whether the db will reference the blobs after restart. - If `delete_and_reset_all_device_msgs()` fails, the whole `import_backup()` fails also, but after a restart delete_and_reset_all_device_msgs() isn't retried. Probably errors from it should be ignored at all.
This commit is contained in:
105
src/imex.rs
105
src/imex.rs
@@ -292,62 +292,99 @@ pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
|
|||||||
file_size: u64,
|
file_size: u64,
|
||||||
passphrase: String,
|
passphrase: String,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
import_backup_stream_inner(context, backup_file, file_size, passphrase)
|
||||||
|
.await
|
||||||
|
.0
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
|
||||||
|
context: &Context,
|
||||||
|
backup_file: R,
|
||||||
|
file_size: u64,
|
||||||
|
passphrase: String,
|
||||||
|
) -> (Result<()>,) {
|
||||||
let mut archive = Archive::new(backup_file);
|
let mut archive = Archive::new(backup_file);
|
||||||
|
|
||||||
let mut entries = archive.entries()?;
|
let mut entries = match archive.entries() {
|
||||||
|
Ok(entries) => entries,
|
||||||
|
Err(e) => return (Err(e).context("Failed to get archive entries"),),
|
||||||
|
};
|
||||||
|
let mut blobs = Vec::new();
|
||||||
// We already emitted ImexProgress(10) above
|
// We already emitted ImexProgress(10) above
|
||||||
let mut last_progress = 10;
|
let mut last_progress = 10;
|
||||||
|
const PROGRESS_MIGRATIONS: u64 = 999;
|
||||||
let mut total_size = 0;
|
let mut total_size = 0;
|
||||||
while let Some(mut f) = entries
|
let mut res: Result<()> = loop {
|
||||||
.try_next()
|
let mut f = match entries.try_next().await {
|
||||||
.await
|
Ok(Some(f)) => f,
|
||||||
.context("Failed to get next entry")?
|
Ok(None) => break Ok(()),
|
||||||
{
|
Err(e) => break Err(e).context("Failed to get next entry"),
|
||||||
total_size += f.header().entry_size()?;
|
};
|
||||||
|
total_size += match f.header().entry_size() {
|
||||||
|
Ok(size) => size,
|
||||||
|
Err(e) => break Err(e).context("Failed to get entry size"),
|
||||||
|
};
|
||||||
let progress = std::cmp::min(
|
let progress = std::cmp::min(
|
||||||
1000 * total_size.checked_div(file_size).unwrap_or_default(),
|
1000 * total_size.checked_div(file_size).unwrap_or_default(),
|
||||||
999,
|
PROGRESS_MIGRATIONS - 1,
|
||||||
);
|
);
|
||||||
if progress > last_progress {
|
if progress > last_progress {
|
||||||
context.emit_event(EventType::ImexProgress(progress as usize));
|
context.emit_event(EventType::ImexProgress(progress as usize));
|
||||||
last_progress = progress;
|
last_progress = progress;
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
|
let path = match f.path() {
|
||||||
// async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file.
|
Ok(path) => path.to_path_buf(),
|
||||||
f.unpack_in(context.get_blobdir())
|
Err(e) => break Err(e).context("Failed to get entry path"),
|
||||||
.await
|
};
|
||||||
.context("Failed to unpack database")?;
|
if let Err(e) = f.unpack_in(context.get_blobdir()).await {
|
||||||
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
|
break Err(e).context("Failed to unpack file");
|
||||||
context
|
}
|
||||||
.sql
|
if path.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
|
||||||
.import(&unpacked_database, passphrase.clone())
|
continue;
|
||||||
.await
|
}
|
||||||
.context("cannot import unpacked database")?;
|
// async_tar unpacked to $BLOBDIR/BLOBS_BACKUP_NAME/, so we move the file afterwards.
|
||||||
fs::remove_file(unpacked_database)
|
let from_path = context.get_blobdir().join(&path);
|
||||||
.await
|
|
||||||
.context("cannot remove unpacked database")?;
|
|
||||||
} else {
|
|
||||||
// async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards.
|
|
||||||
f.unpack_in(context.get_blobdir())
|
|
||||||
.await
|
|
||||||
.context("Failed to unpack blob")?;
|
|
||||||
let from_path = context.get_blobdir().join(f.path()?);
|
|
||||||
if from_path.is_file() {
|
if from_path.is_file() {
|
||||||
if let Some(name) = from_path.file_name() {
|
if let Some(name) = from_path.file_name() {
|
||||||
fs::rename(&from_path, context.get_blobdir().join(name)).await?;
|
let to_path = context.get_blobdir().join(name);
|
||||||
|
if let Err(e) = fs::rename(&from_path, &to_path).await {
|
||||||
|
blobs.push(from_path);
|
||||||
|
break Err(e).context("Failed to move file to blobdir");
|
||||||
|
}
|
||||||
|
blobs.push(to_path);
|
||||||
} else {
|
} else {
|
||||||
warn!(context, "No file name");
|
warn!(context, "No file name");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
if res.is_err() {
|
||||||
|
for blob in blobs {
|
||||||
|
fs::remove_file(&blob).await.log_err(context).ok();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
context.sql.run_migrations(context).await?;
|
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
|
||||||
delete_and_reset_all_device_msgs(context).await?;
|
if res.is_ok() {
|
||||||
|
res = context
|
||||||
Ok(())
|
.sql
|
||||||
|
.import(&unpacked_database, passphrase.clone())
|
||||||
|
.await
|
||||||
|
.context("cannot import unpacked database");
|
||||||
|
}
|
||||||
|
fs::remove_file(unpacked_database)
|
||||||
|
.await
|
||||||
|
.context("cannot remove unpacked database")
|
||||||
|
.log_err(context)
|
||||||
|
.ok();
|
||||||
|
if res.is_ok() {
|
||||||
|
context.emit_event(EventType::ImexProgress(PROGRESS_MIGRATIONS as usize));
|
||||||
|
res = context.sql.run_migrations(context).await;
|
||||||
|
}
|
||||||
|
if res.is_ok() {
|
||||||
|
res = delete_and_reset_all_device_msgs(context).await;
|
||||||
|
}
|
||||||
|
(res,)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
|
|||||||
Reference in New Issue
Block a user