mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
19 Commits
v1.137.3
...
feat-iroh-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
438cf0d953 | ||
|
|
b8e2989819 | ||
|
|
b9e1e50826 | ||
|
|
e3f75f9f70 | ||
|
|
81ada2c696 | ||
|
|
5401dd911d | ||
|
|
a84a055fa0 | ||
|
|
fc3ca7fbc2 | ||
|
|
cff7f50571 | ||
|
|
67036adada | ||
|
|
cbc4e043d1 | ||
|
|
61ca1b3d16 | ||
|
|
1a8415410e | ||
|
|
7245be1b2b | ||
|
|
8fff9ebe10 | ||
|
|
e24af97f3e | ||
|
|
db8c3352ee | ||
|
|
6f35e52fd9 | ||
|
|
f325961505 |
1841
Cargo.lock
generated
1841
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
10
Cargo.toml
10
Cargo.toml
@@ -25,10 +25,6 @@ panic = 'abort'
|
||||
opt-level = "z"
|
||||
codegen-units = 1
|
||||
|
||||
[patch.crates-io]
|
||||
quinn-udp = { git = "https://github.com/quinn-rs/quinn", branch="main" }
|
||||
quinn-proto = { git = "https://github.com/quinn-rs/quinn", branch="main" }
|
||||
|
||||
[dependencies]
|
||||
deltachat_derive = { path = "./deltachat_derive" }
|
||||
format-flowed = { path = "./format-flowed" }
|
||||
@@ -43,6 +39,7 @@ async_zip = { version = "0.0.12", default-features = false, features = ["deflate
|
||||
backtrace = "0.3"
|
||||
base64 = "0.21"
|
||||
brotli = { version = "3.3", default-features=false, features = ["std"] }
|
||||
bytes = "1"
|
||||
chrono = { version = "0.4", default-features=false, features = ["clock", "std"] }
|
||||
email = { git = "https://github.com/deltachat/rust-email", branch = "master" }
|
||||
encoded-words = { git = "https://github.com/async-email/encoded-words", branch = "master" }
|
||||
@@ -53,7 +50,8 @@ futures-lite = "1.13.0"
|
||||
hex = "0.4.0"
|
||||
humansize = "2"
|
||||
image = { version = "0.24.6", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
|
||||
iroh = { version = "0.4.1", default-features = false }
|
||||
iroh = { version = "0.5.1", default-features = false, features = ["iroh-collection", "flat-db"] }
|
||||
iroh-io = "0.2.1"
|
||||
kamadak-exif = "0.5"
|
||||
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
|
||||
libc = "0.2"
|
||||
@@ -65,7 +63,7 @@ num-traits = "0.2"
|
||||
once_cell = "1.18.0"
|
||||
percent-encoding = "2.3"
|
||||
parking_lot = "0.12"
|
||||
pgp = { version = "0.10", default-features = false }
|
||||
pgp = { version = "0.10.2", default-features = false }
|
||||
pretty_env_logger = { version = "0.5", optional = true }
|
||||
qrcodegen = "1.7.0"
|
||||
quick-xml = "0.29"
|
||||
|
||||
37
deny.toml
37
deny.toml
@@ -11,51 +11,31 @@ ignore = [
|
||||
# Please keep this list alphabetically sorted.
|
||||
skip = [
|
||||
{ name = "ahash", version = "0.7.6" },
|
||||
{ name = "base16ct", version = "0.1.1" },
|
||||
{ name = "base64", version = "<0.21" },
|
||||
{ name = "bitflags", version = "1.3.2" },
|
||||
{ name = "block-buffer", version = "<0.10" },
|
||||
{ name = "convert_case", version = "0.4.0" },
|
||||
{ name = "curve25519-dalek", version = "3.2.0" },
|
||||
{ name = "darling_core", version = "<0.14" },
|
||||
{ name = "darling_macro", version = "<0.14" },
|
||||
{ name = "darling", version = "<0.14" },
|
||||
{ name = "der", version = "0.6.1" },
|
||||
{ name = "digest", version = "<0.10" },
|
||||
{ name = "ed25519-dalek", version = "1.0.1" },
|
||||
{ name = "ed25519", version = "1.5.3" },
|
||||
{ name = "fastrand", version = "1.9.0" },
|
||||
{ name = "darling", version = "<0.14.4" },
|
||||
{ name = "darling_core", version = "<0.14.4" },
|
||||
{ name = "darling_macro", version = "<0.14.4" },
|
||||
{ name = "fastrand", version = "<2.0.0" },
|
||||
{ name = "getrandom", version = "<0.2" },
|
||||
{ name = "hashbrown", version = "<0.14.0" },
|
||||
{ name = "idna", version = "<0.3" },
|
||||
{ name = "indexmap", version = "<2.0.0" },
|
||||
{ name = "pem-rfc7468", version = "0.6.0" },
|
||||
{ name = "pkcs8", version = "0.9.0" },
|
||||
{ name = "quick-error", version = "<2.0" },
|
||||
{ name = "rand_chacha", version = "<0.3" },
|
||||
{ name = "rand_core", version = "<0.6" },
|
||||
{ name = "rand", version = "<0.8" },
|
||||
{ name = "redox_syscall", version = "0.2.16" },
|
||||
{ name = "regex-automata", version = "<0.3.3" },
|
||||
{ name = "regex-syntax", version = "0.6.29" },
|
||||
{ name = "sec1", version = "0.3.0" },
|
||||
{ name = "sha2", version = "<0.10" },
|
||||
{ name = "signature", version = "1.6.4" },
|
||||
{ name = "rustls-webpki", version = "<0.101.1" },
|
||||
{ name = "socket2", version = "0.4.9" },
|
||||
{ name = "sha1", version = "<0.10.5" },
|
||||
{ name = "spin", version = "<0.9.6" },
|
||||
{ name = "spki", version = "0.6.0" },
|
||||
{ name = "syn", version = "1.0.109" },
|
||||
{ name = "time", version = "<0.3" },
|
||||
{ name = "wasi", version = "<0.11" },
|
||||
{ name = "windows_aarch64_gnullvm", version = "<0.48" },
|
||||
{ name = "windows_aarch64_msvc", version = "<0.48" },
|
||||
{ name = "windows_i686_gnu", version = "<0.48" },
|
||||
{ name = "windows_i686_msvc", version = "<0.48" },
|
||||
{ name = "windows-sys", version = "<0.48" },
|
||||
{ name = "windows-targets", version = "<0.48" },
|
||||
{ name = "windows", version = "0.32.0" },
|
||||
{ name = "windows_x86_64_gnullvm", version = "<0.48" },
|
||||
{ name = "windows_x86_64_gnu", version = "<0.48" },
|
||||
{ name = "windows_x86_64_msvc", version = "<0.48" },
|
||||
{ name = "webpki-roots", version = "<0.23.1" },
|
||||
{ name = "winreg", version = "0.10.1" },
|
||||
]
|
||||
|
||||
@@ -88,5 +68,4 @@ license-files = [
|
||||
github = [
|
||||
"async-email",
|
||||
"deltachat",
|
||||
"quinn-rs",
|
||||
]
|
||||
|
||||
@@ -27,21 +27,31 @@ use std::net::Ipv4Addr;
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
use futures_lite::StreamExt;
|
||||
use iroh::blobs::Collection;
|
||||
use iroh::get::DataStream;
|
||||
use iroh::progress::ProgressEmitter;
|
||||
use iroh::protocol::AuthToken;
|
||||
use iroh::provider::{DataSource, Event, Provider, Ticket};
|
||||
use iroh::Hash;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use iroh::bytes::get::{fsm, Stats};
|
||||
use iroh::bytes::protocol::{AnyGetRequest, GetRequest, RequestToken};
|
||||
use iroh::bytes::provider::Event as ProviderEvent;
|
||||
use iroh::bytes::util::runtime;
|
||||
use iroh::collection::{Collection, IrohCollectionParser};
|
||||
use iroh::database::flat::DataSource;
|
||||
use iroh::dial::Ticket;
|
||||
use iroh::net::tls::Keypair;
|
||||
use iroh::node::{Event, Node as IrohNode, StaticTokenAuthHandler};
|
||||
use iroh::util::progress::ProgressEmitter;
|
||||
use iroh_io::AsyncSliceWriter;
|
||||
use tokio::fs;
|
||||
use tokio::sync::{
|
||||
broadcast::{self, error::RecvError},
|
||||
Mutex,
|
||||
};
|
||||
use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -56,7 +66,7 @@ use crate::{e2ee, EventType};
|
||||
|
||||
use super::{export_database, DBFILE_BACKUP_NAME};
|
||||
|
||||
const MAX_CONCURRENT_DIALS: u8 = 16;
|
||||
type Node = IrohNode<iroh::database::flat::Database>;
|
||||
|
||||
/// Provide or send a backup of this device.
|
||||
///
|
||||
@@ -154,9 +164,9 @@ impl BackupProvider {
|
||||
/// Creates the provider task.
|
||||
///
|
||||
/// Having this as a function makes it easier to cancel it when needed.
|
||||
async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Provider, Ticket)> {
|
||||
async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Node, Ticket)> {
|
||||
// Generate the token up front: we also use it to encrypt the database.
|
||||
let token = AuthToken::generate();
|
||||
let token = RequestToken::generate();
|
||||
context.emit_event(SendProgress::Started.into());
|
||||
export_database(context, dbfile, token.to_string())
|
||||
.await
|
||||
@@ -176,19 +186,24 @@ impl BackupProvider {
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
let (db, hash) = iroh::provider::create_collection(files).await?;
|
||||
let (db, hash) = iroh::database::flat::create_collection(files).await?;
|
||||
context.emit_event(SendProgress::CollectionCreated.into());
|
||||
let provider = Provider::builder(db)
|
||||
let auth_token_handler = StaticTokenAuthHandler::new(Some(token.clone()));
|
||||
let rt = runtime::Handle::from_currrent(1)?;
|
||||
let provider = Node::builder(db)
|
||||
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
|
||||
.auth_token(token)
|
||||
.spawn()?;
|
||||
.custom_auth_handler(Arc::new(auth_token_handler))
|
||||
.collection_parser(IrohCollectionParser)
|
||||
.runtime(&rt)
|
||||
.spawn()
|
||||
.await?;
|
||||
context.emit_event(SendProgress::ProviderListening.into());
|
||||
info!(context, "Waiting for remote to connect");
|
||||
let ticket = provider.ticket(hash)?;
|
||||
let ticket = provider.ticket(hash).await?.with_token(Some(token));
|
||||
Ok((provider, ticket))
|
||||
}
|
||||
|
||||
/// Supervises the iroh [`Provider`], terminating it when needed.
|
||||
/// Supervises the iroh [`Node`], terminating it when needed.
|
||||
///
|
||||
/// This will watch the provider and terminate it when:
|
||||
///
|
||||
@@ -200,67 +215,80 @@ impl BackupProvider {
|
||||
/// we must cancel this operation.
|
||||
async fn watch_provider(
|
||||
context: &Context,
|
||||
mut provider: Provider,
|
||||
mut provider: Node,
|
||||
cancel_token: Receiver<()>,
|
||||
drop_token: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let mut events = provider.subscribe();
|
||||
let mut total_size = 0;
|
||||
let mut current_size = 0;
|
||||
let total_size = Arc::new(AtomicU64::new(0));
|
||||
let current_size = Arc::new(AtomicU64::new(0));
|
||||
let (transfer_done, mut transfer_done_r) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
let ctx = context.clone();
|
||||
provider
|
||||
.subscribe(move |event| {
|
||||
let total_size = total_size.clone();
|
||||
let current_size = current_size.clone();
|
||||
let transfer_done = transfer_done.clone();
|
||||
let context = ctx.clone();
|
||||
async move {
|
||||
match event {
|
||||
Event::ByteProvide(event) => match event {
|
||||
ProviderEvent::ClientConnected { .. } => {
|
||||
context.emit_event(SendProgress::ClientConnected.into());
|
||||
}
|
||||
ProviderEvent::GetRequestReceived { .. } => {}
|
||||
ProviderEvent::TransferCollectionStarted {
|
||||
total_blobs_size, ..
|
||||
} => {
|
||||
total_size
|
||||
.store(total_blobs_size.unwrap_or_default(), Ordering::Relaxed);
|
||||
context.emit_event(
|
||||
SendProgress::TransferInProgress {
|
||||
current_size: current_size.load(Ordering::Relaxed),
|
||||
total_size: total_size.load(Ordering::Relaxed),
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
ProviderEvent::TransferBlobCompleted { size, .. } => {
|
||||
current_size.fetch_add(size, Ordering::Relaxed);
|
||||
context.emit_event(
|
||||
SendProgress::TransferInProgress {
|
||||
current_size: current_size.load(Ordering::Relaxed),
|
||||
total_size: total_size.load(Ordering::Relaxed),
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
ProviderEvent::TransferCollectionCompleted { .. } => {
|
||||
let total_size = total_size.load(Ordering::Relaxed);
|
||||
context.emit_event(
|
||||
SendProgress::TransferInProgress {
|
||||
current_size: total_size,
|
||||
total_size,
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
transfer_done.send(()).await.ok();
|
||||
}
|
||||
ProviderEvent::TransferAborted { .. } => {
|
||||
transfer_done.send(()).await.ok();
|
||||
}
|
||||
ProviderEvent::CollectionAdded { .. } => {}
|
||||
ProviderEvent::CustomGetRequestReceived { .. } => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let res = loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
res = &mut provider => {
|
||||
break res.context("BackupProvider failed");
|
||||
},
|
||||
maybe_event = events.recv() => {
|
||||
match maybe_event {
|
||||
Ok(event) => {
|
||||
match event {
|
||||
Event::ClientConnected { ..} => {
|
||||
context.emit_event(SendProgress::ClientConnected.into());
|
||||
}
|
||||
Event::RequestReceived { .. } => {
|
||||
}
|
||||
Event::TransferCollectionStarted { total_blobs_size, .. } => {
|
||||
total_size = total_blobs_size;
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
}.into());
|
||||
}
|
||||
Event::TransferBlobCompleted { size, .. } => {
|
||||
current_size += size;
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
}.into());
|
||||
}
|
||||
Event::TransferCollectionCompleted { .. } => {
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size: total_size,
|
||||
total_size
|
||||
}.into());
|
||||
provider.shutdown();
|
||||
}
|
||||
Event::TransferAborted { .. } => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider transfer aborted"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
// We should never see this, provider.join() should complete
|
||||
// first.
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
// We really shouldn't be lagging, if we did we may have missed
|
||||
// a completion event.
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("Missed events from BackupProvider"));
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
_ = cancel_token.recv() => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider cancelled"));
|
||||
@@ -269,6 +297,10 @@ impl BackupProvider {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider dropped"));
|
||||
}
|
||||
_ = transfer_done_r.recv() => {
|
||||
provider.shutdown();
|
||||
break Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
match &res {
|
||||
@@ -382,7 +414,7 @@ impl From<SendProgress> for EventType {
|
||||
/// This is a long running operation which will only when completed.
|
||||
///
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It
|
||||
/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without
|
||||
/// does avoid having [`iroh::dial::Ticket`] in the primary API however, without
|
||||
/// having to revert to untyped bytes.
|
||||
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
ensure!(
|
||||
@@ -441,30 +473,11 @@ async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
|
||||
async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()> {
|
||||
let progress = ProgressEmitter::new(0, ReceiveProgress::max_blob_progress());
|
||||
spawn_progress_proxy(context.clone(), progress.subscribe());
|
||||
let on_connected = || {
|
||||
context.emit_event(ReceiveProgress::Connected.into());
|
||||
async { Ok(()) }
|
||||
};
|
||||
let on_collection = |collection: &Collection| {
|
||||
context.emit_event(ReceiveProgress::CollectionReceived.into());
|
||||
progress.set_total(collection.total_blobs_size());
|
||||
async { Ok(()) }
|
||||
};
|
||||
|
||||
let jobs = Mutex::new(JoinSet::default());
|
||||
let on_blob =
|
||||
|hash, reader, name| on_blob(context, &progress, &jobs, ticket, hash, reader, name);
|
||||
|
||||
// Perform the transfer.
|
||||
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
|
||||
let stats = iroh::get::run_ticket(
|
||||
ticket,
|
||||
keylog,
|
||||
MAX_CONCURRENT_DIALS,
|
||||
on_connected,
|
||||
on_collection,
|
||||
on_blob,
|
||||
)
|
||||
.await?;
|
||||
let stats = run_get_request(context, &progress, &jobs, ticket.clone()).await?;
|
||||
|
||||
let mut jobs = jobs.lock().await;
|
||||
while let Some(job) = jobs.join_next().await {
|
||||
@@ -479,19 +492,78 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Run the get request
|
||||
async fn run_get_request(
|
||||
context: &Context,
|
||||
progress: &ProgressEmitter,
|
||||
jobs: &Mutex<JoinSet<()>>,
|
||||
ticket: Ticket,
|
||||
) -> anyhow::Result<Stats> {
|
||||
// DERP usage for NAT traversal and relay are currently disabled.
|
||||
let derp_map = None;
|
||||
|
||||
let opts = ticket.as_get_options(Keypair::generate(), derp_map);
|
||||
let request =
|
||||
AnyGetRequest::Get(GetRequest::all(ticket.hash())).with_token(ticket.token().cloned());
|
||||
let connection = iroh::dial::dial(opts).await?;
|
||||
let initial = fsm::start(connection, request);
|
||||
|
||||
let connected = initial.next().await?;
|
||||
context.emit_event(ReceiveProgress::Connected.into());
|
||||
|
||||
let rt = runtime::Handle::from_currrent(1)?;
|
||||
|
||||
// we assume that the request includes the entire collection
|
||||
let (mut next, _root, collection) = {
|
||||
let fsm::ConnectedNext::StartRoot(sc) = connected.next().await? else {
|
||||
bail!("request did not include collection");
|
||||
};
|
||||
|
||||
let (done, data) = sc.next().concatenate_into_vec().await?;
|
||||
let data = Bytes::from(data);
|
||||
let collection = Collection::from_bytes(&data)?;
|
||||
|
||||
context.emit_event(ReceiveProgress::CollectionReceived.into());
|
||||
progress.set_total(collection.total_blobs_size());
|
||||
|
||||
(done.next(), data, collection)
|
||||
};
|
||||
|
||||
// download all the children
|
||||
let mut blobs = collection.blobs().iter();
|
||||
let finishing = loop {
|
||||
let start = match next {
|
||||
fsm::EndBlobNext::MoreChildren(start) => start,
|
||||
fsm::EndBlobNext::Closing(finishing) => break finishing,
|
||||
};
|
||||
|
||||
// get the hash of the next blob, or finish if there are no more
|
||||
let Some(blob) = blobs.next() else {
|
||||
break start.finish();
|
||||
};
|
||||
|
||||
let start = start.next(blob.hash);
|
||||
let done = on_blob(context, &rt, jobs, &ticket, start, &blob.name).await?;
|
||||
|
||||
next = done.next();
|
||||
};
|
||||
let stats = finishing.next().await?;
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
/// Get callback when a blob is received from the provider.
|
||||
///
|
||||
/// This writes the blobs to the blobdir. If the blob is the database it will import it to
|
||||
/// the database of the current [`Context`].
|
||||
async fn on_blob(
|
||||
context: &Context,
|
||||
progress: &ProgressEmitter,
|
||||
rt: &runtime::Handle,
|
||||
jobs: &Mutex<JoinSet<()>>,
|
||||
ticket: &Ticket,
|
||||
_hash: Hash,
|
||||
mut reader: DataStream,
|
||||
name: String,
|
||||
) -> Result<DataStream> {
|
||||
state: fsm::AtBlobHeader,
|
||||
name: &str,
|
||||
) -> Result<fsm::AtEndBlob> {
|
||||
ensure!(!name.is_empty(), "Received a nameless blob");
|
||||
let path = if name.starts_with("db/") {
|
||||
let context_dir = context
|
||||
@@ -510,15 +582,27 @@ async fn on_blob(
|
||||
context.get_blobdir().join(blobname)
|
||||
};
|
||||
|
||||
let mut wrapped_reader = progress.wrap_async_read(&mut reader);
|
||||
let file = File::create(&path).await?;
|
||||
let mut file = BufWriter::with_capacity(128 * 1024, file);
|
||||
io::copy(&mut wrapped_reader, &mut file).await?;
|
||||
file.flush().await?;
|
||||
// `iroh_io` io needs to be done on a local spawn
|
||||
let file_path = path.clone();
|
||||
let done = rt
|
||||
.local_pool()
|
||||
.spawn_pinned(move || {
|
||||
let file_path = file_path.clone();
|
||||
Box::pin(async move {
|
||||
let mut file =
|
||||
iroh_io::File::create(move || std::fs::File::create(&file_path)).await?;
|
||||
// TODO: ProgressEmitter doesn't support writers :(
|
||||
// let mut wrapped_file = progress.wrap_async_write(&mut file);
|
||||
let done = state.write_all(&mut file).await?;
|
||||
file.sync().await?;
|
||||
anyhow::Ok(done)
|
||||
})
|
||||
})
|
||||
.await??;
|
||||
|
||||
if name.starts_with("db/") {
|
||||
let context = context.clone();
|
||||
let token = ticket.token().to_string();
|
||||
let token = ticket.token().map(|t| t.to_string()).unwrap_or_default();
|
||||
jobs.lock().await.spawn(async move {
|
||||
if let Err(err) = context.sql.import(&path, token).await {
|
||||
error!(context, "cannot import database: {:#?}", err);
|
||||
@@ -533,7 +617,8 @@ async fn on_blob(
|
||||
}
|
||||
});
|
||||
}
|
||||
Ok(reader)
|
||||
|
||||
Ok(done)
|
||||
}
|
||||
|
||||
/// Spawns a task proxying progress events.
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::collections::BTreeMap;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context as _, Result};
|
||||
pub use dclogin_scheme::LoginOptions;
|
||||
use iroh::dial::Ticket;
|
||||
use once_cell::sync::Lazy;
|
||||
use percent_encoding::percent_decode_str;
|
||||
use serde::Deserialize;
|
||||
@@ -113,7 +114,7 @@ pub enum Qr {
|
||||
/// information to connect to and authenticate a backup provider.
|
||||
///
|
||||
/// The format is somewhat opaque, but `sendme` can deserialise this.
|
||||
ticket: iroh::provider::Ticket,
|
||||
ticket: Ticket,
|
||||
},
|
||||
|
||||
/// Ask the user if they want to use the given service for video chats.
|
||||
@@ -496,12 +497,12 @@ fn decode_webrtc_instance(_context: &Context, qr: &str) -> Result<Qr> {
|
||||
/// Decodes a [`DCBACKUP_SCHEME`] QR code.
|
||||
///
|
||||
/// The format of this scheme is `DCBACKUP:<encoded ticket>`. The encoding is the
|
||||
/// [`iroh::provider::Ticket`]'s `Display` impl.
|
||||
/// [`iroh::dial::Ticket`]'s `Display` impl.
|
||||
fn decode_backup(qr: &str) -> Result<Qr> {
|
||||
let payload = qr
|
||||
.strip_prefix(DCBACKUP_SCHEME)
|
||||
.ok_or_else(|| anyhow!("invalid DCBACKUP scheme"))?;
|
||||
let ticket: iroh::provider::Ticket = payload.parse().context("invalid DCBACKUP payload")?;
|
||||
let ticket: Ticket = payload.parse().context("invalid DCBACKUP payload")?;
|
||||
Ok(Qr::Backup { ticket })
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user