mirror of
https://github.com/chatmail/core.git
synced 2026-04-26 01:46:34 +03:00
implement receive-backup
This commit is contained in:
96
src/imex.rs
96
src/imex.rs
@@ -10,6 +10,7 @@ use futures::StreamExt;
|
||||
use futures_lite::FutureExt;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_tar::Archive;
|
||||
|
||||
use crate::blob::BlobObject;
|
||||
@@ -106,6 +107,101 @@ pub async fn imex(
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn receive_backup(
|
||||
context: &Context,
|
||||
ticket_bytes: Vec<u8>,
|
||||
passphrase: Option<String>,
|
||||
) -> Result<()> {
|
||||
let cancel = context.alloc_ongoing().await?;
|
||||
|
||||
let res = receive_backup_inner(context, ticket_bytes, passphrase.unwrap_or_default())
|
||||
.race(async {
|
||||
cancel.recv().await.ok();
|
||||
Err(format_err!("canceled"))
|
||||
})
|
||||
.await;
|
||||
|
||||
context.free_ongoing().await;
|
||||
|
||||
if let Err(err) = res.as_ref() {
|
||||
// We are using Anyhow's .context() and to show the inner error, too, we need the {:#}:
|
||||
error!(context, "IMEX failed to complete: {:#}", err);
|
||||
context.emit_event(EventType::ImexProgress(0));
|
||||
} else {
|
||||
info!(context, "IMEX successfully completed");
|
||||
context.emit_event(EventType::ImexProgress(1000));
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn receive_backup_inner(
|
||||
context: &Context,
|
||||
ticket_bytes: Vec<u8>,
|
||||
passphrase: String,
|
||||
) -> Result<()> {
|
||||
use iroh_share::{Receiver, Ticket};
|
||||
ensure!(
|
||||
!context.is_configured().await?,
|
||||
"Cannot import backups to accounts in use."
|
||||
);
|
||||
ensure!(
|
||||
context.scheduler.read().await.is_none(),
|
||||
"cannot import backup, IO is running"
|
||||
);
|
||||
let ticket = Ticket::from_bytes(&ticket_bytes)?;
|
||||
|
||||
let sender_dir = tempfile::tempdir().unwrap();
|
||||
let sender_db = sender_dir.path().join("db");
|
||||
|
||||
let port = 9991;
|
||||
let rpc_p2p_port = 5551;
|
||||
let rpc_store_port = 5561;
|
||||
let receiver = Receiver::new(port, rpc_p2p_port, rpc_store_port, &sender_db)
|
||||
.await
|
||||
.context("failed to create sender")?;
|
||||
let receiver_transfer = receiver
|
||||
.transfer_from_ticket(&ticket)
|
||||
.await
|
||||
.context("failed to read transfer")?;
|
||||
let data = receiver_transfer.recv().await?;
|
||||
|
||||
let out = context.get_blobdir();
|
||||
|
||||
for link in data.read_dir().unwrap() {
|
||||
let link = link?;
|
||||
let file_content = data.read_file(&link).await?;
|
||||
let name = link.name.unwrap_or_default();
|
||||
let path = out.join(&name);
|
||||
println!("Writing {}", path.display());
|
||||
let mut file = tokio::fs::File::create(&path)
|
||||
.await
|
||||
.with_context(|| format!("create file: {}", path.display()))?;
|
||||
let mut content = file_content.pretty();
|
||||
tokio::io::copy(&mut content, &mut file)
|
||||
.await
|
||||
.context("copy")?;
|
||||
file.flush().await?;
|
||||
|
||||
if name == DBFILE_BACKUP_NAME {
|
||||
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
|
||||
context
|
||||
.sql
|
||||
.import(&unpacked_database, passphrase.clone())
|
||||
.await
|
||||
.context("cannot import unpacked database")?;
|
||||
fs::remove_file(unpacked_database)
|
||||
.await
|
||||
.context("cannot remove unpacked database")?;
|
||||
} else {
|
||||
// nothing to do, unpacked directly into the blobs dir
|
||||
}
|
||||
}
|
||||
|
||||
println!("Received all data, written to: {}", out.display());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_backup(
|
||||
context: &Context,
|
||||
path: &Path,
|
||||
|
||||
Reference in New Issue
Block a user