mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
1 Commits
v2.23.0
...
link2xt/ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
189a093e02 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1298,6 +1298,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-broadcast",
|
||||
"async-channel 2.3.1",
|
||||
"async-compression",
|
||||
"async-imap",
|
||||
"async-native-tls",
|
||||
"async-smtp",
|
||||
|
||||
@@ -42,6 +42,7 @@ anyhow = { workspace = true }
|
||||
async-broadcast = "0.7.1"
|
||||
async-channel = { workspace = true }
|
||||
async-imap = { version = "0.10.2", default-features = false, features = ["runtime-tokio", "compress"] }
|
||||
async-compression = { version = "0.4.15", default-features = false, features = ["tokio", "gzip"] }
|
||||
async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] }
|
||||
async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] }
|
||||
async_zip = { version = "0.0.17", default-features = false, features = ["deflate", "tokio-fs"] }
|
||||
|
||||
@@ -61,7 +61,7 @@ def test_qr_securejoin(acfactory, protect, tmp_path):
|
||||
# Setup second device for Alice
|
||||
# to test observing securejoin protocol.
|
||||
alice.export_backup(tmp_path)
|
||||
files = list(tmp_path.glob("*.tar"))
|
||||
files = list(tmp_path.glob("*.tar.gz"))
|
||||
alice2 = acfactory.get_unconfigured_account()
|
||||
alice2.import_backup(files[0])
|
||||
|
||||
|
||||
@@ -379,7 +379,7 @@ def test_import_export_backup(acfactory, tmp_path) -> None:
|
||||
alice = acfactory.new_configured_account()
|
||||
alice.export_backup(tmp_path)
|
||||
|
||||
files = list(tmp_path.glob("*.tar"))
|
||||
files = list(tmp_path.glob("*.tar.gz"))
|
||||
alice2 = acfactory.get_unconfigured_account()
|
||||
alice2.import_backup(files[0])
|
||||
|
||||
@@ -630,7 +630,7 @@ def test_markseen_contact_request(acfactory, tmp_path):
|
||||
|
||||
# Bob sets up a second device.
|
||||
bob.export_backup(tmp_path)
|
||||
files = list(tmp_path.glob("*.tar"))
|
||||
files = list(tmp_path.glob("*.tar.gz"))
|
||||
bob2 = acfactory.get_unconfigured_account()
|
||||
bob2.import_backup(files[0])
|
||||
bob2.start_io()
|
||||
|
||||
72
src/imex.rs
72
src/imex.rs
@@ -11,7 +11,7 @@ use futures_lite::FutureExt;
|
||||
use pin_project::pin_project;
|
||||
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
|
||||
use tokio_tar::Archive;
|
||||
|
||||
use crate::blob::BlobDirContents;
|
||||
@@ -123,7 +123,7 @@ pub async fn has_backup(_context: &Context, dir_name: &Path) -> Result<String> {
|
||||
let name = dirent.file_name();
|
||||
let name: String = name.to_string_lossy().into();
|
||||
if name.starts_with("delta-chat")
|
||||
&& name.ends_with(".tar")
|
||||
&& (name.ends_with(".tar") || name.ends_with(".tar.gz"))
|
||||
&& (newest_backup_name.is_empty() || name > newest_backup_name)
|
||||
{
|
||||
// We just use string comparison to determine which backup is newer.
|
||||
@@ -269,30 +269,24 @@ async fn import_backup(
|
||||
context.get_dbfile().display()
|
||||
);
|
||||
|
||||
import_backup_stream(context, backup_file, file_size, passphrase).await?;
|
||||
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
|
||||
if backup_to_import.extension() == Some(OsStr::new("gz")) {
|
||||
let backup_file = tokio::io::BufReader::new(backup_file);
|
||||
let backup_file = async_compression::tokio::bufread::GzipDecoder::new(backup_file);
|
||||
import_backup_stream(context, backup_file, passphrase).await?;
|
||||
} else {
|
||||
import_backup_stream(context, backup_file, passphrase).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Imports backup by reading a tar file from a stream.
|
||||
///
|
||||
/// `file_size` is used to calculate the progress
|
||||
/// and emit progress events.
|
||||
/// Ideally it is the sum of the entry
|
||||
/// sizes without the header overhead,
|
||||
/// but can be estimated as tar file size
|
||||
/// in which case the progress is underestimated
|
||||
/// and may not reach 99.9% by the end of import.
|
||||
/// Underestimating is better than
|
||||
/// overestimating because the progress
|
||||
/// jumps to 100% instead of getting stuck at 99.9%
|
||||
/// for some time.
|
||||
pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
|
||||
context: &Context,
|
||||
backup_file: R,
|
||||
file_size: u64,
|
||||
passphrase: String,
|
||||
) -> Result<()> {
|
||||
import_backup_stream_inner(context, backup_file, file_size, passphrase)
|
||||
import_backup_stream_inner(context, backup_file, passphrase)
|
||||
.await
|
||||
.0
|
||||
}
|
||||
@@ -319,6 +313,19 @@ struct ProgressReader<R> {
|
||||
}
|
||||
|
||||
impl<R> ProgressReader<R> {
|
||||
/// Creates a new `ProgressReader`.
|
||||
///
|
||||
/// `file_size` is used to calculate the progress
|
||||
/// and emit progress events.
|
||||
/// Ideally it is the sum of the entry
|
||||
/// sizes without the header overhead,
|
||||
/// but can be estimated as tar file size
|
||||
/// in which case the progress is underestimated
|
||||
/// and may not reach 99.9% by the end of import.
|
||||
/// Underestimating is better than
|
||||
/// overestimating because the progress
|
||||
/// jumps to 100% instead of getting stuck at 99.9%
|
||||
/// for some time.
|
||||
fn new(r: R, context: Context, file_size: u64) -> Self {
|
||||
Self {
|
||||
inner: r,
|
||||
@@ -358,10 +365,8 @@ where
|
||||
async fn import_backup_stream_inner<R: tokio::io::AsyncRead + Unpin>(
|
||||
context: &Context,
|
||||
backup_file: R,
|
||||
file_size: u64,
|
||||
passphrase: String,
|
||||
) -> (Result<()>,) {
|
||||
let backup_file = ProgressReader::new(backup_file, context.clone(), file_size);
|
||||
let mut archive = Archive::new(backup_file);
|
||||
|
||||
let mut entries = match archive.entries() {
|
||||
@@ -461,10 +466,10 @@ fn get_next_backup_path(
|
||||
tempdbfile.push(format!("{stem}-{i:02}-{addr}.db"));
|
||||
|
||||
let mut tempfile = folder.clone();
|
||||
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.part"));
|
||||
tempfile.push(format!("{stem}-{i:02}-{addr}.tar.gz.part"));
|
||||
|
||||
let mut destfile = folder.clone();
|
||||
destfile.push(format!("{stem}-{i:02}-{addr}.tar"));
|
||||
destfile.push(format!("{stem}-{i:02}-{addr}.tar.gz"));
|
||||
|
||||
if !tempdbfile.exists() && !tempfile.exists() && !destfile.exists() {
|
||||
return Ok((tempdbfile, tempfile, destfile));
|
||||
@@ -504,9 +509,13 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
|
||||
file_size += blob.to_abs_path().metadata()?.len()
|
||||
}
|
||||
|
||||
export_backup_stream(context, &temp_db_path, blobdir, file, file_size)
|
||||
.await
|
||||
.context("Exporting backup to file failed")?;
|
||||
let gzip_encoder = async_compression::tokio::write::GzipEncoder::new(file);
|
||||
let mut gzip_encoder =
|
||||
export_backup_stream(context, &temp_db_path, blobdir, gzip_encoder, file_size)
|
||||
.await
|
||||
.context("Exporting backup to file failed")?;
|
||||
gzip_encoder.shutdown().await?;
|
||||
|
||||
fs::rename(temp_path, &dest_path).await?;
|
||||
context.emit_event(EventType::ImexFileWritten(dest_path));
|
||||
Ok(())
|
||||
@@ -543,6 +552,10 @@ impl<W> ProgressWriter<W> {
|
||||
context,
|
||||
}
|
||||
}
|
||||
|
||||
fn into_inner(self) -> W {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<W> AsyncWrite for ProgressWriter<W>
|
||||
@@ -590,12 +603,12 @@ pub(crate) async fn export_backup_stream<'a, W>(
|
||||
blobdir: BlobDirContents<'a>,
|
||||
writer: W,
|
||||
file_size: u64,
|
||||
) -> Result<()>
|
||||
) -> Result<W>
|
||||
where
|
||||
W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static,
|
||||
{
|
||||
let writer = ProgressWriter::new(writer, context.clone(), file_size);
|
||||
let mut builder = tokio_tar::Builder::new(writer);
|
||||
let progress_writer = ProgressWriter::new(writer, context.clone(), file_size);
|
||||
let mut builder = tokio_tar::Builder::new(progress_writer);
|
||||
|
||||
builder
|
||||
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
|
||||
@@ -607,8 +620,9 @@ where
|
||||
builder.append_file(path_in_archive, &mut file).await?;
|
||||
}
|
||||
|
||||
builder.finish().await?;
|
||||
Ok(())
|
||||
// Convert tar builder back into the underlying stream.
|
||||
let progress_writer = builder.into_inner().await?;
|
||||
Ok(progress_writer.into_inner())
|
||||
}
|
||||
|
||||
/// Imports secret key from a file.
|
||||
|
||||
@@ -36,12 +36,13 @@ use futures_lite::FutureExt;
|
||||
use iroh_net::relay::RelayMode;
|
||||
use iroh_net::Endpoint;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::chat::add_device_msg;
|
||||
use crate::context::Context;
|
||||
use crate::imex::BlobDirContents;
|
||||
use crate::imex::{BlobDirContents, ProgressReader};
|
||||
use crate::message::{Message, Viewtype};
|
||||
use crate::qr::Qr;
|
||||
use crate::stock_str::backup_transfer_msg_body;
|
||||
@@ -190,9 +191,11 @@ impl BackupProvider {
|
||||
|
||||
send_stream.write_all(&file_size.to_be_bytes()).await?;
|
||||
|
||||
export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
|
||||
.await
|
||||
.context("Failed to write backup into QUIC stream")?;
|
||||
let mut send_stream =
|
||||
export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size)
|
||||
.await
|
||||
.context("Failed to write backup into QUIC stream")?;
|
||||
send_stream.shutdown().await?;
|
||||
info!(context, "Finished writing backup into QUIC stream.");
|
||||
let mut buf = [0u8; 1];
|
||||
info!(context, "Waiting for acknowledgment.");
|
||||
@@ -310,7 +313,8 @@ pub async fn get_backup2(
|
||||
let mut file_size_buf = [0u8; 8];
|
||||
recv_stream.read_exact(&mut file_size_buf).await?;
|
||||
let file_size = u64::from_be_bytes(file_size_buf);
|
||||
import_backup_stream(context, recv_stream, file_size, passphrase)
|
||||
let recv_stream = ProgressReader::new(recv_stream, context.clone(), file_size);
|
||||
import_backup_stream(context, recv_stream, passphrase)
|
||||
.await
|
||||
.context("Failed to import backup from QUIC stream")?;
|
||||
info!(context, "Finished importing backup from the stream.");
|
||||
|
||||
Reference in New Issue
Block a user