Compare commits

...

19 Commits

Author SHA1 Message Date
dignifiedquire
438cf0d953 happy clippy 2023-07-25 21:33:04 +02:00
dignifiedquire
b8e2989819 fix deny.toml 2023-07-25 19:39:19 +02:00
dignifiedquire
b9e1e50826 remove arc 2023-07-25 16:22:25 +02:00
dignifiedquire
e3f75f9f70 fix spawn scope 2023-07-25 14:57:57 +02:00
dignifiedquire
81ada2c696 try to fix lifetime error 2023-07-25 14:57:57 +02:00
dignifiedquire
5401dd911d update to release pgp 2023-07-25 14:57:57 +02:00
dignifiedquire
a84a055fa0 reduce duplicates 2023-07-25 14:57:57 +02:00
dignifiedquire
fc3ca7fbc2 update pgp ref 2023-07-25 14:57:57 +02:00
dignifiedquire
cff7f50571 fix doc comments 2023-07-25 14:57:57 +02:00
dignifiedquire
67036adada update deny.toml 2023-07-25 14:57:57 +02:00
dignifiedquire
cbc4e043d1 happy clippy 2023-07-25 14:57:57 +02:00
dignifiedquire
61ca1b3d16 disable derp usage 2023-07-25 14:57:57 +02:00
dignifiedquire
1a8415410e update to released iroh 2023-07-25 14:57:57 +02:00
Ruediger Klaehn
7245be1b2b fix: configure iroh collection parser
we need to let the provider know that we want to support iroh collections
2023-07-25 14:57:57 +02:00
Ruediger Klaehn
8fff9ebe10 add the token to the request 2023-07-25 14:57:57 +02:00
dignifiedquire
e24af97f3e fix runtime 2023-07-25 14:57:57 +02:00
dignifiedquire
db8c3352ee get compile 2023-07-25 14:57:57 +02:00
dignifiedquire
6f35e52fd9 use git dep for iroh 2023-07-25 14:57:57 +02:00
dignifiedquire
f325961505 feat: migrate to iroh 0.5 2023-07-25 14:57:57 +02:00
5 changed files with 1443 additions and 749 deletions

1841
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -25,10 +25,6 @@ panic = 'abort'
opt-level = "z"
codegen-units = 1
[patch.crates-io]
quinn-udp = { git = "https://github.com/quinn-rs/quinn", branch="main" }
quinn-proto = { git = "https://github.com/quinn-rs/quinn", branch="main" }
[dependencies]
deltachat_derive = { path = "./deltachat_derive" }
format-flowed = { path = "./format-flowed" }
@@ -43,6 +39,7 @@ async_zip = { version = "0.0.12", default-features = false, features = ["deflate
backtrace = "0.3"
base64 = "0.21"
brotli = { version = "3.3", default-features=false, features = ["std"] }
bytes = "1"
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
@@ -53,7 +50,8 @@ futures-lite = "1.13.0"
hex = "0.4.0"
humansize = "2"
image = { version = "0.24.6", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh = { version = "0.4.1", default-features = false }
iroh = { version = "0.5.1", default-features = false, features = ["iroh-collection", "flat-db"] }
iroh-io = "0.2.1"
kamadak-exif = "0.5"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
libc = "0.2"
@@ -65,7 +63,7 @@ num-traits = "0.2"
once_cell = "1.18.0"
percent-encoding = "2.3"
parking_lot = "0.12"
pgp = { version = "0.10", default-features = false }
pgp = { version = "0.10.2", default-features = false }
pretty_env_logger = { version = "0.5", optional = true }
qrcodegen = "1.7.0"
quick-xml = "0.29"

View File

@@ -11,51 +11,31 @@ ignore = [
# Please keep this list alphabetically sorted.
skip = [
{ name = "ahash", version = "0.7.6" },
{ name = "base16ct", version = "0.1.1" },
{ name = "base64", version = "<0.21" },
{ name = "bitflags", version = "1.3.2" },
{ name = "block-buffer", version = "<0.10" },
{ name = "convert_case", version = "0.4.0" },
{ name = "curve25519-dalek", version = "3.2.0" },
{ name = "darling_core", version = "<0.14" },
{ name = "darling_macro", version = "<0.14" },
{ name = "darling", version = "<0.14" },
{ name = "der", version = "0.6.1" },
{ name = "digest", version = "<0.10" },
{ name = "ed25519-dalek", version = "1.0.1" },
{ name = "ed25519", version = "1.5.3" },
{ name = "fastrand", version = "1.9.0" },
{ name = "darling", version = "<0.14.4" },
{ name = "darling_core", version = "<0.14.4" },
{ name = "darling_macro", version = "<0.14.4" },
{ name = "fastrand", version = "<2.0.0" },
{ name = "getrandom", version = "<0.2" },
{ name = "hashbrown", version = "<0.14.0" },
{ name = "idna", version = "<0.3" },
{ name = "indexmap", version = "<2.0.0" },
{ name = "pem-rfc7468", version = "0.6.0" },
{ name = "pkcs8", version = "0.9.0" },
{ name = "quick-error", version = "<2.0" },
{ name = "rand_chacha", version = "<0.3" },
{ name = "rand_core", version = "<0.6" },
{ name = "rand", version = "<0.8" },
{ name = "redox_syscall", version = "0.2.16" },
{ name = "regex-automata", version = "<0.3.3" },
{ name = "regex-syntax", version = "0.6.29" },
{ name = "sec1", version = "0.3.0" },
{ name = "sha2", version = "<0.10" },
{ name = "signature", version = "1.6.4" },
{ name = "rustls-webpki", version = "<0.101.1" },
{ name = "socket2", version = "0.4.9" },
{ name = "sha1", version = "<0.10.5" },
{ name = "spin", version = "<0.9.6" },
{ name = "spki", version = "0.6.0" },
{ name = "syn", version = "1.0.109" },
{ name = "time", version = "<0.3" },
{ name = "wasi", version = "<0.11" },
{ name = "windows_aarch64_gnullvm", version = "<0.48" },
{ name = "windows_aarch64_msvc", version = "<0.48" },
{ name = "windows_i686_gnu", version = "<0.48" },
{ name = "windows_i686_msvc", version = "<0.48" },
{ name = "windows-sys", version = "<0.48" },
{ name = "windows-targets", version = "<0.48" },
{ name = "windows", version = "0.32.0" },
{ name = "windows_x86_64_gnullvm", version = "<0.48" },
{ name = "windows_x86_64_gnu", version = "<0.48" },
{ name = "windows_x86_64_msvc", version = "<0.48" },
{ name = "webpki-roots", version = "<0.23.1" },
{ name = "winreg", version = "0.10.1" },
]
@@ -88,5 +68,4 @@ license-files = [
github = [
"async-email",
"deltachat",
"quinn-rs",
]

View File

@@ -27,21 +27,31 @@ use std::net::Ipv4Addr;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::Poll;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver;
use bytes::Bytes;
use futures::FutureExt;
use futures_lite::StreamExt;
use iroh::blobs::Collection;
use iroh::get::DataStream;
use iroh::progress::ProgressEmitter;
use iroh::protocol::AuthToken;
use iroh::provider::{DataSource, Event, Provider, Ticket};
use iroh::Hash;
use tokio::fs::{self, File};
use tokio::io::{self, AsyncWriteExt, BufWriter};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, Mutex};
use iroh::bytes::get::{fsm, Stats};
use iroh::bytes::protocol::{AnyGetRequest, GetRequest, RequestToken};
use iroh::bytes::provider::Event as ProviderEvent;
use iroh::bytes::util::runtime;
use iroh::collection::{Collection, IrohCollectionParser};
use iroh::database::flat::DataSource;
use iroh::dial::Ticket;
use iroh::net::tls::Keypair;
use iroh::node::{Event, Node as IrohNode, StaticTokenAuthHandler};
use iroh::util::progress::ProgressEmitter;
use iroh_io::AsyncSliceWriter;
use tokio::fs;
use tokio::sync::{
broadcast::{self, error::RecvError},
Mutex,
};
use tokio::task::{JoinHandle, JoinSet};
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::sync::CancellationToken;
@@ -56,7 +66,7 @@ use crate::{e2ee, EventType};
use super::{export_database, DBFILE_BACKUP_NAME};
const MAX_CONCURRENT_DIALS: u8 = 16;
type Node = IrohNode<iroh::database::flat::Database>;
/// Provide or send a backup of this device.
///
@@ -154,9 +164,9 @@ impl BackupProvider {
/// Creates the provider task.
///
/// Having this as a function makes it easier to cancel it when needed.
async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Provider, Ticket)> {
async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Node, Ticket)> {
// Generate the token up front: we also use it to encrypt the database.
let token = AuthToken::generate();
let token = RequestToken::generate();
context.emit_event(SendProgress::Started.into());
export_database(context, dbfile, token.to_string())
.await
@@ -176,19 +186,24 @@ impl BackupProvider {
}
// Start listening.
let (db, hash) = iroh::provider::create_collection(files).await?;
let (db, hash) = iroh::database::flat::create_collection(files).await?;
context.emit_event(SendProgress::CollectionCreated.into());
let provider = Provider::builder(db)
let auth_token_handler = StaticTokenAuthHandler::new(Some(token.clone()));
let rt = runtime::Handle::from_currrent(1)?;
let provider = Node::builder(db)
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
.auth_token(token)
.spawn()?;
.custom_auth_handler(Arc::new(auth_token_handler))
.collection_parser(IrohCollectionParser)
.runtime(&rt)
.spawn()
.await?;
context.emit_event(SendProgress::ProviderListening.into());
info!(context, "Waiting for remote to connect");
let ticket = provider.ticket(hash)?;
let ticket = provider.ticket(hash).await?.with_token(Some(token));
Ok((provider, ticket))
}
/// Supervises the iroh [`Provider`], terminating it when needed.
/// Supervises the iroh [`Node`], terminating it when needed.
///
/// This will watch the provider and terminate it when:
///
@@ -200,67 +215,80 @@ impl BackupProvider {
/// we must cancel this operation.
async fn watch_provider(
context: &Context,
mut provider: Provider,
mut provider: Node,
cancel_token: Receiver<()>,
drop_token: CancellationToken,
) -> Result<()> {
let mut events = provider.subscribe();
let mut total_size = 0;
let mut current_size = 0;
let total_size = Arc::new(AtomicU64::new(0));
let current_size = Arc::new(AtomicU64::new(0));
let (transfer_done, mut transfer_done_r) = tokio::sync::mpsc::channel(1);
let ctx = context.clone();
provider
.subscribe(move |event| {
let total_size = total_size.clone();
let current_size = current_size.clone();
let transfer_done = transfer_done.clone();
let context = ctx.clone();
async move {
match event {
Event::ByteProvide(event) => match event {
ProviderEvent::ClientConnected { .. } => {
context.emit_event(SendProgress::ClientConnected.into());
}
ProviderEvent::GetRequestReceived { .. } => {}
ProviderEvent::TransferCollectionStarted {
total_blobs_size, ..
} => {
total_size
.store(total_blobs_size.unwrap_or_default(), Ordering::Relaxed);
context.emit_event(
SendProgress::TransferInProgress {
current_size: current_size.load(Ordering::Relaxed),
total_size: total_size.load(Ordering::Relaxed),
}
.into(),
);
}
ProviderEvent::TransferBlobCompleted { size, .. } => {
current_size.fetch_add(size, Ordering::Relaxed);
context.emit_event(
SendProgress::TransferInProgress {
current_size: current_size.load(Ordering::Relaxed),
total_size: total_size.load(Ordering::Relaxed),
}
.into(),
);
}
ProviderEvent::TransferCollectionCompleted { .. } => {
let total_size = total_size.load(Ordering::Relaxed);
context.emit_event(
SendProgress::TransferInProgress {
current_size: total_size,
total_size,
}
.into(),
);
transfer_done.send(()).await.ok();
}
ProviderEvent::TransferAborted { .. } => {
transfer_done.send(()).await.ok();
}
ProviderEvent::CollectionAdded { .. } => {}
ProviderEvent::CustomGetRequestReceived { .. } => {}
},
}
}
.boxed()
})
.await?;
let res = loop {
tokio::select! {
biased;
res = &mut provider => {
break res.context("BackupProvider failed");
},
maybe_event = events.recv() => {
match maybe_event {
Ok(event) => {
match event {
Event::ClientConnected { ..} => {
context.emit_event(SendProgress::ClientConnected.into());
}
Event::RequestReceived { .. } => {
}
Event::TransferCollectionStarted { total_blobs_size, .. } => {
total_size = total_blobs_size;
context.emit_event(SendProgress::TransferInProgress {
current_size,
total_size,
}.into());
}
Event::TransferBlobCompleted { size, .. } => {
current_size += size;
context.emit_event(SendProgress::TransferInProgress {
current_size,
total_size,
}.into());
}
Event::TransferCollectionCompleted { .. } => {
context.emit_event(SendProgress::TransferInProgress {
current_size: total_size,
total_size
}.into());
provider.shutdown();
}
Event::TransferAborted { .. } => {
provider.shutdown();
break Err(anyhow!("BackupProvider transfer aborted"));
}
}
}
Err(broadcast::error::RecvError::Closed) => {
// We should never see this, provider.join() should complete
// first.
}
Err(broadcast::error::RecvError::Lagged(_)) => {
// We really shouldn't be lagging, if we did we may have missed
// a completion event.
provider.shutdown();
break Err(anyhow!("Missed events from BackupProvider"));
}
}
},
}
_ = cancel_token.recv() => {
provider.shutdown();
break Err(anyhow!("BackupProvider cancelled"));
@@ -269,6 +297,10 @@ impl BackupProvider {
provider.shutdown();
break Err(anyhow!("BackupProvider dropped"));
}
_ = transfer_done_r.recv() => {
provider.shutdown();
break Ok(());
}
}
};
match &res {
@@ -382,7 +414,7 @@ impl From<SendProgress> for EventType {
/// This is a long running operation which will only return when completed.
///
/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It
/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without
/// does avoid having [`iroh::dial::Ticket`] in the primary API however, without
/// having to revert to untyped bytes.
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
ensure!(
@@ -441,30 +473,11 @@ async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> {
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
spawn_progress_proxy(context.clone(), progress.subscribe());
let on_connected = || {
context.emit_event(ReceiveProgress::Connected.into());
async { Ok(()) }
};
let on_collection = |collection: &Collection| {
context.emit_event(ReceiveProgress::CollectionReceived.into());
progress.set_total(collection.total_blobs_size());
async { Ok(()) }
};
let jobs = Mutex::new(JoinSet::default());
let on_blob =
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
// Perform the transfer.
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
let stats = iroh::get::run_ticket(
ticket,
keylog,
MAX_CONCURRENT_DIALS,
on_connected,
on_collection,
on_blob,
)
.await?;
let stats = run_get_request(context, &progress, &jobs, ticket.clone()).await?;
let mut jobs = jobs.lock().await;
while let Some(job) = jobs.join_next().await {
@@ -479,19 +492,78 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
Ok(())
}
/// Run the get request
async fn run_get_request(
context: &Context,
progress: &ProgressEmitter,
jobs: &Mutex<JoinSet<()>>,
ticket: Ticket,
) -> anyhow::Result<Stats> {
// DERP usage for NAT traversal and relay are currently disabled.
let derp_map = None;
let opts = ticket.as_get_options(Keypair::generate(), derp_map);
let request =
AnyGetRequest::Get(GetRequest::all(ticket.hash())).with_token(ticket.token().cloned());
let connection = iroh::dial::dial(opts).await?;
let initial = fsm::start(connection, request);
let connected = initial.next().await?;
context.emit_event(ReceiveProgress::Connected.into());
let rt = runtime::Handle::from_currrent(1)?;
// we assume that the request includes the entire collection
let (mut next, _root, collection) = {
let fsm::ConnectedNext::StartRoot(sc) = connected.next().await? else {
bail!("request did not include collection");
};
let (done, data) = sc.next().concatenate_into_vec().await?;
let data = Bytes::from(data);
let collection = Collection::from_bytes(&data)?;
context.emit_event(ReceiveProgress::CollectionReceived.into());
progress.set_total(collection.total_blobs_size());
(done.next(), data, collection)
};
// download all the children
let mut blobs = collection.blobs().iter();
let finishing = loop {
let start = match next {
fsm::EndBlobNext::MoreChildren(start) => start,
fsm::EndBlobNext::Closing(finishing) => break finishing,
};
// get the hash of the next blob, or finish if there are no more
let Some(blob) = blobs.next() else {
break start.finish();
};
let start = start.next(blob.hash);
let done = on_blob(context, &rt, jobs, &ticket, start, &blob.name).await?;
next = done.next();
};
let stats = finishing.next().await?;
Ok(stats)
}
/// Get callback when a blob is received from the provider.
///
/// This writes the blobs to the blobdir. If the blob is the database it will import it to
/// the database of the current [`Context`].
async fn on_blob(
context: &Context,
progress: &ProgressEmitter,
rt: &runtime::Handle,
jobs: &Mutex<JoinSet<()>>,
ticket: &Ticket,
_hash: Hash,
mut reader: DataStream,
name: String,
) -> Result<DataStream> {
state: fsm::AtBlobHeader,
name: &str,
) -> Result<fsm::AtEndBlob> {
ensure!(!name.is_empty(), "Received a nameless blob");
let path = if name.starts_with("db/") {
let context_dir = context
@@ -510,15 +582,27 @@ async fn on_blob(
context.get_blobdir().join(blobname)
};
let mut wrapped_reader = progress.wrap_async_read(&mut reader);
let file = File::create(&path).await?;
let mut file = BufWriter::with_capacity(128 * 1024, file);
io::copy(&mut wrapped_reader, &mut file).await?;
file.flush().await?;
// `iroh_io` io needs to be done on a local spawn
let file_path = path.clone();
let done = rt
.local_pool()
.spawn_pinned(move || {
let file_path = file_path.clone();
Box::pin(async move {
let mut file =
iroh_io::File::create(move || std::fs::File::create(&file_path)).await?;
// TODO: ProgressEmitter doesn't support writers :(
// let mut wrapped_file = progress.wrap_async_write(&mut file);
let done = state.write_all(&mut file).await?;
file.sync().await?;
anyhow::Ok(done)
})
})
.await??;
if name.starts_with("db/") {
let context = context.clone();
let token = ticket.token().to_string();
let token = ticket.token().map(|t| t.to_string()).unwrap_or_default();
jobs.lock().await.spawn(async move {
if let Err(err) = context.sql.import(&path, token).await {
error!(context, "cannot import database: {:#?}", err);
@@ -533,7 +617,8 @@ async fn on_blob(
}
});
}
Ok(reader)
Ok(done)
}
/// Spawns a task proxying progress events.

View File

@@ -5,6 +5,7 @@ use std::collections::BTreeMap;
use anyhow::{anyhow, bail, ensure, Context as _, Result};
pub use dclogin_scheme::LoginOptions;
use iroh::dial::Ticket;
use once_cell::sync::Lazy;
use percent_encoding::percent_decode_str;
use serde::Deserialize;
@@ -113,7 +114,7 @@ pub enum Qr {
/// information to connect to and authenticate a backup provider.
///
/// The format is somewhat opaque, but `sendme` can deserialise this.
ticket: iroh::provider::Ticket,
ticket: Ticket,
},
/// Ask the user if they want to use the given service for video chats.
@@ -496,12 +497,12 @@ fn decode_webrtc_instance(_context: &Context, qr: &str) -> Result<Qr> {
/// Decodes a [`DCBACKUP_SCHEME`] QR code.
///
/// The format of this scheme is `DCBACKUP:<encoded ticket>`. The encoding is the
/// [`iroh::provider::Ticket`]'s `Display` impl.
/// [`iroh::dial::Ticket`]'s `Display` impl.
fn decode_backup(qr: &str) -> Result<Qr> {
let payload = qr
.strip_prefix(DCBACKUP_SCHEME)
.ok_or_else(|| anyhow!("invalid DCBACKUP scheme"))?;
let ticket: iroh::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
let ticket: Ticket = payload.parse().context("invalid DCBACKUP payload")?;
Ok(Qr::Backup { ticket })
}