From db8c3352ee5022e51ab12da97df8ab357a6d81cf Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 14 Jul 2023 16:29:31 +0200 Subject: [PATCH] get compile --- Cargo.lock | 1 + Cargo.toml | 1 + src/imex/transfer.rs | 33 ++++++++++++++------------------- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3e4f06e8..d8c1b4cfe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1242,6 +1242,7 @@ dependencies = [ "humansize", "image", "iroh", + "iroh-io", "kamadak-exif", "lettre_email", "libc", diff --git a/Cargo.toml b/Cargo.toml index 925819336..d3ce22db2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ hex = "0.4.0" humansize = "2" image = { version = "0.24.6", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] } iroh = { version = "0.4.1", default-features = false, git = "https://github.com/n0-computer/iroh", branch = "main", features = ["iroh-collection", "flat-db"] } +iroh-io = "0.2.1" kamadak-exif = "0.5" lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" } libc = "0.2" diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 3068cbc90..fb550da79 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -46,10 +46,12 @@ use iroh::net::defaults::default_derp_map; use iroh::net::tls::Keypair; use iroh::node::{Event, Node as IrohNode, StaticTokenAuthHandler}; use iroh::util::progress::ProgressEmitter; -use tokio::fs::{self, File}; -use tokio::io::AsyncWriteExt; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::{broadcast, Mutex}; +use iroh_io::AsyncSliceWriter; +use tokio::fs; +use tokio::sync::{ + broadcast::{self, error::RecvError}, + Mutex, +}; use tokio::task::{JoinHandle, JoinSet}; use tokio_stream::wrappers::ReadDirStream; use tokio_util::sync::CancellationToken; @@ -64,8 +66,6 @@ use crate::{e2ee, EventType}; use super::{export_database, DBFILE_BACKUP_NAME}; -const MAX_CONCURRENT_DIALS: u8 = 16; - type Node = IrohNode; /// Provide or send a backup of this device. @@ -506,7 +506,7 @@ async fn run_get_request( context.emit_event(ReceiveProgress::Connected.into()); // we assume that the request includes the entire collection - let (mut next, root, collection) = { + let (mut next, _root, collection) = { let ConnectedNext::StartRoot(sc) = connected.next().await? else { bail!("request did not include collection"); }; @@ -522,24 +522,21 @@ async fn run_get_request( }; // download all the children - let mut current_blob = 0; + let mut blobs = collection.blobs().iter(); let finishing = loop { let start = match next { EndBlobNext::MoreChildren(start) => start, EndBlobNext::Closing(finishing) => break finishing, }; - let child_offset = start.child_offset(); - let offset = child_offset + 1; // get the hash of the next blob, or finish if there are no more - let Some(blob) = collection.blobs().get(current_blob) else { + let Some(blob) = blobs.next() else { break start.finish(); }; let start = start.next(blob.hash); - let done = on_blob(context, progress, jobs, &ticket, start, &blob.name).await?; + let done = on_blob(context, jobs, &ticket, start, &blob.name).await?; - current_blob += 1; next = done.next(); }; let stats = finishing.next().await?; @@ -553,10 +550,9 @@ async fn run_get_request( /// the database of the current [`Context`]. async fn on_blob( context: &Context, - progress: &ProgressEmitter, jobs: &Mutex>, ticket: &Ticket, - mut state: fsm::AtBlobHeader, + state: fsm::AtBlobHeader, name: &str, ) -> Result { ensure!(!name.is_empty(), "Received a nameless blob"); @@ -577,13 +573,12 @@ async fn on_blob( context.get_blobdir().join(blobname) }; - let file = File::create(&path).await?; - // TODO: BufWriter doesn't implement AsyncSliceWriter :( - // let mut file = BufWriter::with_capacity(128 * 1024, file); + let file_path = path.clone(); + let mut file = iroh_io::File::create(move || std::fs::File::create(&file_path)).await?; // TODO: ProgressEmitter doesn't support writers :( // let mut wrapped_file = progress.wrap_async_write(&mut file); let done = state.write_all(&mut file).await?; - file.flush().await?; + file.sync().await?; if name.starts_with("db/") { let context = context.clone();