mirror of
https://github.com/chatmail/core.git
synced 2026-04-19 14:36:29 +03:00
Remove old iroh 0.4
This commit is contained in:
@@ -31,36 +31,24 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use futures_lite::StreamExt;
|
||||
use anyhow::{bail, Context as _, Result};
|
||||
use iroh_net::relay::RelayMode;
|
||||
use iroh_net::Endpoint;
|
||||
use iroh_old;
|
||||
use iroh_old::blobs::Collection;
|
||||
use iroh_old::get::DataStream;
|
||||
use iroh_old::progress::ProgressEmitter;
|
||||
use iroh_old::provider::Ticket;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio::fs;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::chat::{add_device_msg, delete_and_reset_all_device_msgs};
|
||||
use crate::chat::add_device_msg;
|
||||
use crate::context::Context;
|
||||
use crate::imex::BlobDirContents;
|
||||
use crate::message::{Message, Viewtype};
|
||||
use crate::qr::{self, Qr};
|
||||
use crate::qr::Qr;
|
||||
use crate::stock_str::backup_transfer_msg_body;
|
||||
use crate::tools::{create_id, time, TempPathGuard};
|
||||
use crate::EventType;
|
||||
|
||||
use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
|
||||
|
||||
const MAX_CONCURRENT_DIALS: u8 = 16;
|
||||
|
||||
/// ALPN protocol identifier for the backup transfer protocol.
|
||||
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
|
||||
|
||||
@@ -279,33 +267,6 @@ impl Future for BackupProvider {
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieves backup from a legacy backup provider using iroh 0.4.
|
||||
pub async fn get_legacy_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
ensure!(
|
||||
matches!(qr, Qr::Backup { .. }),
|
||||
"QR code for backup must be of type DCBACKUP"
|
||||
);
|
||||
ensure!(
|
||||
!context.is_configured().await?,
|
||||
"Cannot import backups to accounts in use."
|
||||
);
|
||||
// Acquire global "ongoing" mutex.
|
||||
let cancel_token = context.alloc_ongoing().await?;
|
||||
let _guard = context.scheduler.pause(context.clone()).await;
|
||||
info!(
|
||||
context,
|
||||
"Running get_backup for {}",
|
||||
qr::format_backup(&qr)?
|
||||
);
|
||||
let res = tokio::select! {
|
||||
biased;
|
||||
res = get_backup_inner(context, qr) => res,
|
||||
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
||||
};
|
||||
context.free_ongoing().await;
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn get_backup2(
|
||||
context: &Context,
|
||||
node_addr: iroh_net::NodeAddr,
|
||||
@@ -349,202 +310,20 @@ pub async fn get_backup2(
|
||||
///
|
||||
/// This is a long running operation which will return only when completed.
|
||||
///
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts specific variants of it. It
|
||||
/// does avoid having [`iroh_old::provider::Ticket`] in the primary API however, without
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it. It
|
||||
/// does avoid having [`iroh_net::NodeAddr`] in the primary API however, without
|
||||
/// having to revert to untyped bytes.
|
||||
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
match qr {
|
||||
Qr::Backup { .. } => get_legacy_backup(context, qr).await?,
|
||||
Qr::Backup2 {
|
||||
node_addr,
|
||||
auth_token,
|
||||
} => get_backup2(context, node_addr, auth_token).await?,
|
||||
_ => bail!("QR code for backup must be of type DCBACKUP or DCBACKUP2"),
|
||||
_ => bail!("QR code for backup must be of type DCBACKUP2"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
|
||||
let ticket = match qr {
|
||||
Qr::Backup { ticket } => ticket,
|
||||
_ => bail!("QR code for backup must be of type DCBACKUP"),
|
||||
};
|
||||
|
||||
match transfer_from_provider(context, &ticket).await {
|
||||
Ok(()) => {
|
||||
context.sql.run_migrations(context).await?;
|
||||
delete_and_reset_all_device_msgs(context).await?;
|
||||
context.emit_event(ReceiveProgress::Completed.into());
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
// Clean up any blobs we already wrote.
|
||||
let readdir = fs::read_dir(context.get_blobdir()).await?;
|
||||
let mut readdir = ReadDirStream::new(readdir);
|
||||
while let Some(dirent) = readdir.next().await {
|
||||
if let Ok(dirent) = dirent {
|
||||
fs::remove_file(dirent.path()).await.ok();
|
||||
}
|
||||
}
|
||||
context.emit_event(ReceiveProgress::Failed.into());
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> {
|
||||
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
|
||||
spawn_progress_proxy(context.clone(), progress.subscribe());
|
||||
let on_connected = || {
|
||||
context.emit_event(ReceiveProgress::Connected.into());
|
||||
async { Ok(()) }
|
||||
};
|
||||
let on_collection = |collection: &Collection| {
|
||||
context.emit_event(ReceiveProgress::CollectionReceived.into());
|
||||
progress.set_total(collection.total_blobs_size());
|
||||
async { Ok(()) }
|
||||
};
|
||||
let jobs = Mutex::new(JoinSet::default());
|
||||
let on_blob =
|
||||
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
|
||||
|
||||
// Perform the transfer.
|
||||
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
|
||||
let stats = iroh_old::get::run_ticket(
|
||||
ticket,
|
||||
keylog,
|
||||
MAX_CONCURRENT_DIALS,
|
||||
on_connected,
|
||||
on_collection,
|
||||
on_blob,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut jobs = jobs.lock().await;
|
||||
while let Some(job) = jobs.join_next().await {
|
||||
job.context("job failed")?;
|
||||
}
|
||||
drop(progress);
|
||||
info!(
|
||||
context,
|
||||
"Backup transfer finished, transfer rate was {} Mbps.",
|
||||
stats.mbits()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get callback when a blob is received from the provider.
|
||||
///
|
||||
/// This writes the blobs to the blobdir. If the blob is the database it will import it to
|
||||
/// the database of the current [`Context`].
|
||||
async fn on_blob(
|
||||
context: &Context,
|
||||
progress: &ProgressEmitter,
|
||||
jobs: &Mutex<JoinSet<()>>,
|
||||
ticket: &Ticket,
|
||||
_hash: iroh_old::Hash,
|
||||
mut reader: DataStream,
|
||||
name: String,
|
||||
) -> Result<DataStream> {
|
||||
ensure!(!name.is_empty(), "Received a nameless blob");
|
||||
let path = if name.starts_with("db/") {
|
||||
let context_dir = context
|
||||
.get_blobdir()
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow!("Context dir not found"))?;
|
||||
let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
|
||||
if fs::metadata(&dbfile).await.is_ok() {
|
||||
fs::remove_file(&dbfile).await?;
|
||||
warn!(context, "Previous database export deleted");
|
||||
}
|
||||
dbfile
|
||||
} else {
|
||||
ensure!(name.starts_with("blob/"), "malformatted blob name");
|
||||
let blobname = name.rsplit('/').next().context("malformatted blob name")?;
|
||||
context.get_blobdir().join(blobname)
|
||||
};
|
||||
|
||||
let mut wrapped_reader = progress.wrap_async_read(&mut reader);
|
||||
let file = File::create(&path).await?;
|
||||
let mut file = BufWriter::with_capacity(128 * 1024, file);
|
||||
io::copy(&mut wrapped_reader, &mut file).await?;
|
||||
file.flush().await?;
|
||||
|
||||
if name.starts_with("db/") {
|
||||
let context = context.clone();
|
||||
let token = ticket.token().to_string();
|
||||
jobs.lock().await.spawn(async move {
|
||||
if let Err(err) = context.sql.import(&path, token).await {
|
||||
error!(context, "cannot import database: {:#?}", err);
|
||||
}
|
||||
if let Err(err) = fs::remove_file(&path).await {
|
||||
error!(
|
||||
context,
|
||||
"failed to delete database import file '{}': {:#?}",
|
||||
path.display(),
|
||||
err,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
/// Spawns a task proxying progress events.
|
||||
///
|
||||
/// This spawns a tokio task which receives events from the [`ProgressEmitter`] and sends
|
||||
/// them to the context. The task finishes when the emitter is dropped.
|
||||
///
|
||||
/// This could be done directly in the emitter by making it less generic.
|
||||
fn spawn_progress_proxy(context: Context, mut rx: broadcast::Receiver<u16>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(step) => context.emit_event(ReceiveProgress::BlobProgress(step).into()),
|
||||
Err(RecvError::Closed) => break,
|
||||
Err(RecvError::Lagged(_)) => continue,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Create [`EventType::ImexProgress`] events using readable names.
|
||||
///
|
||||
/// Plus you get warnings if you don't use all variants.
|
||||
#[derive(Debug)]
|
||||
enum ReceiveProgress {
|
||||
Connected,
|
||||
CollectionReceived,
|
||||
/// A value between 0 and 85 interpreted as a percentage.
|
||||
///
|
||||
/// Other values are already used by the other variants of this enum.
|
||||
BlobProgress(u16),
|
||||
Completed,
|
||||
Failed,
|
||||
}
|
||||
|
||||
impl ReceiveProgress {
|
||||
/// The maximum value for [`ReceiveProgress::BlobProgress`].
|
||||
///
|
||||
/// This only exists to keep this magic value local in this type.
|
||||
fn max_blob_progress() -> u16 {
|
||||
85
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ReceiveProgress> for EventType {
|
||||
fn from(source: ReceiveProgress) -> Self {
|
||||
let val = match source {
|
||||
ReceiveProgress::Connected => 50,
|
||||
ReceiveProgress::CollectionReceived => 100,
|
||||
ReceiveProgress::BlobProgress(val) => 100 + 10 * val,
|
||||
ReceiveProgress::Completed => 1000,
|
||||
ReceiveProgress::Failed => 0,
|
||||
};
|
||||
EventType::ImexProgress(val.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
Reference in New Issue
Block a user