feat: compress backups with gzip

This commit is contained in:
link2xt
2024-10-24 22:28:14 +00:00
parent fc2b111f5d
commit 189a093e02
6 changed files with 57 additions and 37 deletions

1
Cargo.lock generated
View File

@@ -1298,6 +1298,7 @@ dependencies = [
"anyhow", "anyhow",
"async-broadcast", "async-broadcast",
"async-channel 2.3.1", "async-channel 2.3.1",
"async-compression",
"async-imap", "async-imap",
"async-native-tls", "async-native-tls",
"async-smtp", "async-smtp",

View File

@@ -42,6 +42,7 @@ anyhow = { workspace = true }
async-broadcast = "0.7.1" async-broadcast = "0.7.1"
async-channel = { workspace = true } async-channel = { workspace = true }
async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] } async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] }
async-compression = { version = "0.4.15", default-features = false, features = ["tokio", "gzip"] }
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] } async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] } async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] }

View File

@@ -61,7 +61,7 @@ def test_qr_securejoin(acfactory, protect, tmp_path):
# Setup second device for Alice # Setup second device for Alice
# to test observing securejoin protocol. # to test observing securejoin protocol.
alice.export_backup(tmp_path) alice.export_backup(tmp_path)
files = list(tmp_path.glob("*.tar")) files = list(tmp_path.glob("*.tar.gz"))
alice2 = acfactory.get_unconfigured_account() alice2 = acfactory.get_unconfigured_account()
alice2.import_backup(files[0]) alice2.import_backup(files[0])

View File

@@ -379,7 +379,7 @@ def test_import_export_backup(acfactory, tmp_path) -> None:
alice = acfactory.new_configured_account() alice = acfactory.new_configured_account()
alice.export_backup(tmp_path) alice.export_backup(tmp_path)
files = list(tmp_path.glob("*.tar")) files = list(tmp_path.glob("*.tar.gz"))
alice2 = acfactory.get_unconfigured_account() alice2 = acfactory.get_unconfigured_account()
alice2.import_backup(files[0]) alice2.import_backup(files[0])
@@ -630,7 +630,7 @@ def test_markseen_contact_request(acfactory, tmp_path):
# Bob sets up a second device. # Bob sets up a second device.
bob.export_backup(tmp_path) bob.export_backup(tmp_path)
files = list(tmp_path.glob("*.tar")) files = list(tmp_path.glob("*.tar.gz"))
bob2 = acfactory.get_unconfigured_account() bob2 = acfactory.get_unconfigured_account()
bob2.import_backup(files[0]) bob2.import_backup(files[0])
bob2.start_io() bob2.start_io()

View File

@@ -11,7 +11,7 @@ use futures_lite::FutureExt;
use pin_project::pin_project; use pin_project::pin_project;
use tokio::fs::{self, File}; use tokio::fs::{self, File};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_tar::Archive; use tokio_tar::Archive;
use crate::blob::BlobDirContents; use crate::blob::BlobDirContents;
@@ -123,7 +123,7 @@ pub async fn has_backup(_context: &Context, dir_name: &Path) -> Result<String> {
let name = dirent.file_name(); let name = dirent.file_name();
let name: String = name.to_string_lossy().into(); let name: String = name.to_string_lossy().into();
if name.starts_with("delta-chat") if name.starts_with("delta-chat")
&& name.ends_with(".tar") && (name.ends_with(".tar") || name.ends_with(".tar.gz"))
&& (newest_backup_name.is_empty() || name > newest_backup_name) && (newest_backup_name.is_empty() || name > newest_backup_name)
{ {
// We just use string comparison to determine which backup is newer. // We just use string comparison to determine which backup is newer.
@@ -269,30 +269,24 @@ async fn import_backup(
context.get_dbfile().display() context.get_dbfile().display()
); );
import_backup_stream(context, backup_file, file_size, passphrase).await?; let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
if backup_to_import.extension() == Some(OsStr::new("gz")) {
let backup_file = tokio::io::BufReader::new(backup_file);
let backup_file = async_compression::tokio::bufread::GzipDecoder::new(backup_file);
import_backup_stream(context, backup_file, passphrase).await?;
} else {
import_backup_stream(context, backup_file, passphrase).await?;
}
Ok(()) Ok(())
} }
/// Imports backup by reading a tar file from a stream. /// Imports backup by reading a tar file from a stream.
///
/// `file_size` is used to calculate the progress
/// and emit progress events.
/// Ideally it is the sum of the entry
/// sizes without the header overhead,
/// but can be estimated as tar file size
/// in which case the progress is underestimated
/// and may not reach 99.9% by the end of import.
/// Underestimating is better than
/// overestimating because the progress
/// jumps to 100% instead of getting stuck at 99.9%
/// for some time.
pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>( pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
context: &Context, context: &Context,
backup_file: R, backup_file: R,
file_size: u64,
passphrase: String, passphrase: String,
) -> Result<()> { ) -> Result<()> {
import_backup_stream_inner(context, backup_file, file_size, passphrase) import_backup_stream_inner(context, backup_file, passphrase)
.await .await
.0 .0
} }
@@ -319,6 +313,19 @@ struct ProgressReader<R> {
} }
impl<R> ProgressReader<R> { impl<R> ProgressReader<R> {
/// Creates a new `ProgressReader`.
///
/// `file_size` is used to calculate the progress
/// and emit progress events.
/// Ideally it is the sum of the entry
/// sizes without the header overhead,
/// but can be estimated as tar file size
/// in which case the progress is underestimated
/// and may not reach 99.9% by the end of import.
/// Underestimating is better than
/// overestimating because the progress
/// jumps to 100% instead of getting stuck at 99.9%
/// for some time.
fn new(r: R, context: Context, file_size: u64) -> Self { fn new(r: R, context: Context, file_size: u64) -> Self {
Self { Self {
inner: r, inner: r,
@@ -358,10 +365,8 @@ where
async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>( async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
context: &Context, context: &Context,
backup_file: R, backup_file: R,
file_size: u64,
passphrase: String, passphrase: String,
) -> (Result<()>,) { ) -> (Result<()>,) {
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
let mut archive = Archive::new(backup_file); let mut archive = Archive::new(backup_file);
let mut entries = match archive.entries() { let mut entries = match archive.entries() {
@@ -461,10 +466,10 @@ fn get_next_backup_path(
tempdbfile.push(format!("{stem}-{i:02}-{addr}.db")); tempdbfile.push(format!("{stem}-{i:02}-{addr}.db"));
let mut tempfile = folder.clone(); let mut tempfile = folder.clone();
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.part")); tempfile.push(format!("{stem}-{i:02}-{addr}.tar.gz.part"));
let mut destfile = folder.clone(); let mut destfile = folder.clone();
destfile.push(format!("{stem}-{i:02}-{addr}.tar")); destfile.push(format!("{stem}-{i:02}-{addr}.tar.gz"));
if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() { if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() {
return Ok((tempdbfile, tempfile, destfile)); return Ok((tempdbfile, tempfile, destfile));
@@ -504,9 +509,13 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
file_size += blob.to_abs_path().metadata()?.len() file_size += blob.to_abs_path().metadata()?.len()
} }
export_backup_stream(context, &temp_db_path, blobdir, file, file_size) let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file);
.await let mut gzip_encoder =
.context("Exporting backup to file failed")?; export_backup_stream(context, &temp_db_path, blobdir, gzip_encoder, file_size)
.await
.context("Exporting backup to file failed")?;
gzip_encoder.shutdown().await?;
fs::rename(temp_path, &dest_path).await?; fs::rename(temp_path, &dest_path).await?;
context.emit_event(EventType::ImexFileWritten(dest_path)); context.emit_event(EventType::ImexFileWritten(dest_path));
Ok(()) Ok(())
@@ -543,6 +552,10 @@ impl<W> ProgressWriter<W> {
context, context,
} }
} }
fn into_inner(self) -> W {
self.inner
}
} }
impl<W> AsyncWrite for ProgressWriter<W> impl<W> AsyncWrite for ProgressWriter<W>
@@ -590,12 +603,12 @@ pub(crate) async fn export_backup_stream<'a, W>(
blobdir: BlobDirContents<'a>, blobdir: BlobDirContents<'a>,
writer: W, writer: W,
file_size: u64, file_size: u64,
) -> Result<()> ) -> Result<W>
where where
W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static, W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static,
{ {
let writer = ProgressWriter::new(writer, context.clone(), file_size); let progress_writer = ProgressWriter::new(writer, context.clone(), file_size);
let mut builder = tokio_tar::Builder::new(writer); let mut builder = tokio_tar::Builder::new(progress_writer);
builder builder
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME) .append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
@@ -607,8 +620,9 @@ where
builder.append_file(path_in_archive, &mut file).await?; builder.append_file(path_in_archive, &mut file).await?;
} }
builder.finish().await?; // Convert tar builder back into the underlying stream.
Ok(()) let progress_writer = builder.into_inner().await?;
Ok(progress_writer.into_inner())
} }
/// Imports secret key from a file. /// Imports secret key from a file.

View File

@@ -36,12 +36,13 @@ use futures_lite::FutureExt;
use iroh_net::relay::RelayMode; use iroh_net::relay::RelayMode;
use iroh_net::Endpoint; use iroh_net::Endpoint;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::chat::add_device_msg; use crate::chat::add_device_msg;
use crate::context::Context; use crate::context::Context;
use crate::imex::BlobDirContents; use crate::imex::{BlobDirContents, ProgressReader};
use crate::message::{Message, Viewtype}; use crate::message::{Message, Viewtype};
use crate::qr::Qr; use crate::qr::Qr;
use crate::stock_str::backup_transfer_msg_body; use crate::stock_str::backup_transfer_msg_body;
@@ -190,9 +191,11 @@ impl BackupProvider {
send_stream.write_all(&file_size.to_be_bytes()).await?; send_stream.write_all(&file_size.to_be_bytes()).await?;
export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size) let mut send_stream =
.await export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
.context("Failed to write backup into QUIC stream")?; .await
.context("Failed to write backup into QUIC stream")?;
send_stream.shutdown().await?;
info!(context, "Finished writing backup into QUIC stream."); info!(context, "Finished writing backup into QUIC stream.");
let mut buf = [0u8; 1]; let mut buf = [0u8; 1];
info!(context, "Waiting for acknowledgment."); info!(context, "Waiting for acknowledgment.");
@@ -310,7 +313,8 @@ pub async fn get_backup2(
let mut file_size_buf = [0u8; 8]; let mut file_size_buf = [0u8; 8];
recv_stream.read_exact(&mut file_size_buf).await?; recv_stream.read_exact(&mut file_size_buf).await?;
let file_size = u64::from_be_bytes(file_size_buf); let file_size = u64::from_be_bytes(file_size_buf);
import_backup_stream(context, recv_stream, file_size, passphrase) let recv_stream = ProgressReader::new(recv_stream, context.clone(), file_size);
import_backup_stream(context, recv_stream, passphrase)
.await .await
.context("Failed to import backup from QUIC stream")?; .context("Failed to import backup from QUIC stream")?;
info!(context, "Finished importing backup from the stream."); info!(context, "Finished importing backup from the stream.");