mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
feat: new BACKUP2 transfer protocol
New protocol streams .tar into iroh-net stream without traversing all the files first. Reception over old backup protocol is still supported to allow transferring backups from old devices to new ones, but not vice versa.
This commit is contained in:
@@ -2504,6 +2504,7 @@ void dc_stop_ongoing_process (dc_context_t* context);
|
||||
#define DC_QR_FPR_WITHOUT_ADDR 230 // test1=formatted fingerprint
|
||||
#define DC_QR_ACCOUNT 250 // text1=domain
|
||||
#define DC_QR_BACKUP 251
|
||||
#define DC_QR_BACKUP2 252
|
||||
#define DC_QR_WEBRTC_INSTANCE 260 // text1=domain, text2=instance pattern
|
||||
#define DC_QR_ADDR 320 // id=contact
|
||||
#define DC_QR_TEXT 330 // text1=text
|
||||
@@ -2550,6 +2551,7 @@ void dc_stop_ongoing_process (dc_context_t* context);
|
||||
* if so, call dc_set_config_from_qr() and then dc_configure().
|
||||
*
|
||||
* - DC_QR_BACKUP:
|
||||
* - DC_QR_BACKUP2:
|
||||
* ask the user if they want to set up a new device.
|
||||
* If so, pass the qr-code to dc_receive_backup().
|
||||
*
|
||||
|
||||
@@ -4364,7 +4364,7 @@ pub unsafe extern "C" fn dc_backup_provider_wait(provider: *mut dc_backup_provid
|
||||
let ctx = &*ffi_provider.context;
|
||||
let provider = &mut ffi_provider.provider;
|
||||
block_on(provider)
|
||||
.context("Failed to await BackupProvider")
|
||||
.context("Failed to await backup provider")
|
||||
.log_err(ctx)
|
||||
.set_last_error(ctx)
|
||||
.ok();
|
||||
|
||||
@@ -50,6 +50,7 @@ impl Lot {
|
||||
Qr::FprWithoutAddr { fingerprint, .. } => Some(fingerprint),
|
||||
Qr::Account { domain } => Some(domain),
|
||||
Qr::Backup { .. } => None,
|
||||
Qr::Backup2 { .. } => None,
|
||||
Qr::WebrtcInstance { domain, .. } => Some(domain),
|
||||
Qr::Addr { draft, .. } => draft.as_deref(),
|
||||
Qr::Url { url } => Some(url),
|
||||
@@ -102,6 +103,7 @@ impl Lot {
|
||||
Qr::FprWithoutAddr { .. } => LotState::QrFprWithoutAddr,
|
||||
Qr::Account { .. } => LotState::QrAccount,
|
||||
Qr::Backup { .. } => LotState::QrBackup,
|
||||
Qr::Backup2 { .. } => LotState::QrBackup2,
|
||||
Qr::WebrtcInstance { .. } => LotState::QrWebrtcInstance,
|
||||
Qr::Addr { .. } => LotState::QrAddr,
|
||||
Qr::Url { .. } => LotState::QrUrl,
|
||||
@@ -127,6 +129,7 @@ impl Lot {
|
||||
Qr::FprWithoutAddr { .. } => Default::default(),
|
||||
Qr::Account { .. } => Default::default(),
|
||||
Qr::Backup { .. } => Default::default(),
|
||||
Qr::Backup2 { .. } => Default::default(),
|
||||
Qr::WebrtcInstance { .. } => Default::default(),
|
||||
Qr::Addr { contact_id, .. } => contact_id.to_u32(),
|
||||
Qr::Url { .. } => Default::default(),
|
||||
@@ -177,6 +180,8 @@ pub enum LotState {
|
||||
|
||||
QrBackup = 251,
|
||||
|
||||
QrBackup2 = 252,
|
||||
|
||||
/// text1=domain, text2=instance pattern
|
||||
QrWebrtcInstance = 260,
|
||||
|
||||
|
||||
@@ -35,6 +35,11 @@ pub enum QrObject {
|
||||
Backup {
|
||||
ticket: String,
|
||||
},
|
||||
Backup2 {
|
||||
auth_token: String,
|
||||
|
||||
node_addr: String,
|
||||
},
|
||||
WebrtcInstance {
|
||||
domain: String,
|
||||
instance_pattern: String,
|
||||
@@ -132,6 +137,14 @@ impl From<Qr> for QrObject {
|
||||
Qr::Backup { ticket } => QrObject::Backup {
|
||||
ticket: ticket.to_string(),
|
||||
},
|
||||
Qr::Backup2 {
|
||||
ref node_addr,
|
||||
auth_token,
|
||||
} => QrObject::Backup2 {
|
||||
node_addr: serde_json::to_string(node_addr).unwrap_or_default(),
|
||||
|
||||
auth_token,
|
||||
},
|
||||
Qr::WebrtcInstance {
|
||||
domain,
|
||||
instance_pattern,
|
||||
|
||||
@@ -128,6 +128,7 @@ module.exports = {
|
||||
DC_QR_ASK_VERIFYCONTACT: 200,
|
||||
DC_QR_ASK_VERIFYGROUP: 202,
|
||||
DC_QR_BACKUP: 251,
|
||||
DC_QR_BACKUP2: 252,
|
||||
DC_QR_ERROR: 400,
|
||||
DC_QR_FPR_MISMATCH: 220,
|
||||
DC_QR_FPR_OK: 210,
|
||||
|
||||
@@ -128,6 +128,7 @@ export enum C {
|
||||
DC_QR_ASK_VERIFYCONTACT = 200,
|
||||
DC_QR_ASK_VERIFYGROUP = 202,
|
||||
DC_QR_BACKUP = 251,
|
||||
DC_QR_BACKUP2 = 252,
|
||||
DC_QR_ERROR = 400,
|
||||
DC_QR_FPR_MISMATCH = 220,
|
||||
DC_QR_FPR_OK = 210,
|
||||
|
||||
@@ -1562,8 +1562,6 @@ def test_import_export_online_all(acfactory, tmp_path, data, lp):
|
||||
|
||||
# check progress events for import
|
||||
assert imex_tracker.wait_progress(1, progress_upper_limit=249)
|
||||
assert imex_tracker.wait_progress(500, progress_upper_limit=749)
|
||||
assert imex_tracker.wait_progress(750, progress_upper_limit=999)
|
||||
assert imex_tracker.wait_progress(1000)
|
||||
|
||||
assert_account_is_proper(ac1)
|
||||
|
||||
84
src/imex.rs
84
src/imex.rs
@@ -6,7 +6,7 @@ use std::path::{Path, PathBuf};
|
||||
use ::pgp::types::KeyTrait;
|
||||
use anyhow::{bail, ensure, format_err, Context as _, Result};
|
||||
use deltachat_contact_tools::EmailAddress;
|
||||
use futures::StreamExt;
|
||||
use futures::TryStreamExt;
|
||||
use futures_lite::FutureExt;
|
||||
|
||||
use tokio::fs::{self, File};
|
||||
@@ -269,24 +269,56 @@ async fn import_backup(
|
||||
context.get_dbfile().display()
|
||||
);
|
||||
|
||||
import_backup_stream(context, backup_file, file_size, passphrase).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Imports backup by reading a tar file from a stream.
|
||||
///
|
||||
/// `file_size` is used to calculate the progress
|
||||
/// and emit progress events.
|
||||
/// Ideally it is the sum of the entry
|
||||
/// sizes without the header overhead,
|
||||
/// but can be estimated as tar file size
|
||||
/// in which case the progress is underestimated
|
||||
/// and may not reach 99.9% by the end of import.
|
||||
/// Underestimating is better than
|
||||
/// overestimating because the progress
|
||||
/// jumps to 100% instead of getting stuck at 99.9%
|
||||
/// for some time.
|
||||
pub(crate) async fn import_backup_stream<R: tokio::io::AsyncRead + Unpin>(
|
||||
context: &Context,
|
||||
backup_file: R,
|
||||
file_size: u64,
|
||||
passphrase: String,
|
||||
) -> Result<()> {
|
||||
let mut archive = Archive::new(backup_file);
|
||||
|
||||
let mut entries = archive.entries()?;
|
||||
let mut last_progress = 0;
|
||||
while let Some(file) = entries.next().await {
|
||||
let f = &mut file?;
|
||||
|
||||
let current_pos = f.raw_file_position();
|
||||
let progress = 1000 * current_pos / file_size;
|
||||
if progress != last_progress && progress > 10 && progress < 1000 {
|
||||
// We already emitted ImexProgress(10) above
|
||||
// We already emitted ImexProgress(10) above
|
||||
let mut last_progress = 10;
|
||||
let mut total_size = 0;
|
||||
while let Some(mut f) = entries
|
||||
.try_next()
|
||||
.await
|
||||
.context("Failed to get next entry")?
|
||||
{
|
||||
total_size += f.header().entry_size()?;
|
||||
let progress = std::cmp::min(
|
||||
1000 * total_size.checked_div(file_size).unwrap_or_default(),
|
||||
999,
|
||||
);
|
||||
if progress > last_progress {
|
||||
context.emit_event(EventType::ImexProgress(progress as usize));
|
||||
last_progress = progress;
|
||||
}
|
||||
|
||||
if f.path()?.file_name() == Some(OsStr::new(DBFILE_BACKUP_NAME)) {
|
||||
// async_tar can't unpack to a specified file name, so we just unpack to the blobdir and then move the unpacked file.
|
||||
f.unpack_in(context.get_blobdir()).await?;
|
||||
f.unpack_in(context.get_blobdir())
|
||||
.await
|
||||
.context("Failed to unpack database")?;
|
||||
let unpacked_database = context.get_blobdir().join(DBFILE_BACKUP_NAME);
|
||||
context
|
||||
.sql
|
||||
@@ -298,7 +330,9 @@ async fn import_backup(
|
||||
.context("cannot remove unpacked database")?;
|
||||
} else {
|
||||
// async_tar will unpack to blobdir/BLOBS_BACKUP_NAME, so we move the file afterwards.
|
||||
f.unpack_in(context.get_blobdir()).await?;
|
||||
f.unpack_in(context.get_blobdir())
|
||||
.await
|
||||
.context("Failed to unpack blob")?;
|
||||
let from_path = context.get_blobdir().join(f.path()?);
|
||||
if from_path.is_file() {
|
||||
if let Some(name) = from_path.file_name() {
|
||||
@@ -375,34 +409,40 @@ async fn export_backup(context: &Context, dir: &Path, passphrase: String) -> Res
|
||||
dest_path.display(),
|
||||
);
|
||||
|
||||
export_backup_inner(context, &temp_db_path, &temp_path).await?;
|
||||
let file = File::create(&temp_path).await?;
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
export_backup_stream(context, &temp_db_path, blobdir, file)
|
||||
.await
|
||||
.context("Exporting backup to file failed")?;
|
||||
fs::rename(temp_path, &dest_path).await?;
|
||||
context.emit_event(EventType::ImexFileWritten(dest_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn export_backup_inner(
|
||||
context: &Context,
|
||||
/// Exports the database and blobs into a stream.
|
||||
pub(crate) async fn export_backup_stream<'a, W>(
|
||||
context: &'a Context,
|
||||
temp_db_path: &Path,
|
||||
temp_path: &Path,
|
||||
) -> Result<()> {
|
||||
let file = File::create(temp_path).await?;
|
||||
|
||||
let mut builder = tokio_tar::Builder::new(file);
|
||||
blobdir: BlobDirContents<'a>,
|
||||
writer: W,
|
||||
) -> Result<()>
|
||||
where
|
||||
W: tokio::io::AsyncWrite + tokio::io::AsyncWriteExt + Unpin + Send + 'static,
|
||||
{
|
||||
let mut builder = tokio_tar::Builder::new(writer);
|
||||
|
||||
builder
|
||||
.append_path_with_name(temp_db_path, DBFILE_BACKUP_NAME)
|
||||
.await?;
|
||||
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
let mut last_progress = 0;
|
||||
let mut last_progress = 10;
|
||||
|
||||
for (i, blob) in blobdir.iter().enumerate() {
|
||||
let mut file = File::open(blob.to_abs_path()).await?;
|
||||
let path_in_archive = PathBuf::from(BLOBS_BACKUP_NAME).join(blob.as_name());
|
||||
builder.append_file(path_in_archive, &mut file).await?;
|
||||
let progress = 1000 * i / blobdir.len();
|
||||
if progress != last_progress && progress > 10 && progress < 1000 {
|
||||
let progress = std::cmp::min(1000 * i / blobdir.len(), 999);
|
||||
if progress > last_progress {
|
||||
context.emit_event(EventType::ImexProgress(progress));
|
||||
last_progress = progress;
|
||||
}
|
||||
|
||||
@@ -1,17 +1,12 @@
|
||||
//! Transfer a backup to an other device.
|
||||
//!
|
||||
//! This module provides support for using n0's iroh tool to initiate transfer of a backup
|
||||
//! to another device using a QR code.
|
||||
//!
|
||||
//! Using the iroh terminology there are two parties to this:
|
||||
//! This module provides support for using [iroh](https://iroh.computer/)
|
||||
//! to initiate transfer of a backup to another device using a QR code.
|
||||
//!
|
||||
//! There are two parties to this:
|
||||
//! - The *Provider*, which starts a server and listens for connections.
|
||||
//! - The *Getter*, which connects to the server and retrieves the data.
|
||||
//!
|
||||
//! Iroh is designed around the idea of verifying hashes, the downloads are verified as
|
||||
//! they are retrieved. The entire transfer is initiated by requesting the data of a single
|
||||
//! root hash.
|
||||
//!
|
||||
//! Both the provider and the getter are authenticated:
|
||||
//!
|
||||
//! - The provider is known by its *peer ID*.
|
||||
@@ -21,23 +16,30 @@
|
||||
//! Both these are transferred in the QR code offered to the getter. This ensures that the
|
||||
//! getter can not connect to an impersonated provider and the provider does not offer the
|
||||
//! download to an impersonated getter.
|
||||
//!
|
||||
//! Protocol starts by getter opening a bidirectional QUIC stream
|
||||
//! to the provider and sending authentication token.
|
||||
//! Provider verifies received authentication token,
|
||||
//! sends the size of all files in a backup (database and all blobs)
|
||||
//! as an unsigned 64-bit big endian integer and streams the backup in tar format.
|
||||
//! Getter receives the backup and acknowledges successful reception
|
||||
//! by sending a single byte.
|
||||
//! Provider closes the endpoint after receiving an acknowledgment.
|
||||
|
||||
use std::future::Future;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
|
||||
use async_channel::Receiver;
|
||||
use futures_lite::StreamExt;
|
||||
use iroh::blobs::Collection;
|
||||
use iroh::get::DataStream;
|
||||
use iroh::progress::ProgressEmitter;
|
||||
use iroh::protocol::AuthToken;
|
||||
use iroh::provider::{DataSource, Event, Provider, Ticket};
|
||||
use iroh::Hash;
|
||||
use iroh_old as iroh;
|
||||
use iroh_net::relay::RelayMode;
|
||||
use iroh_net::Endpoint;
|
||||
use iroh_old;
|
||||
use iroh_old::blobs::Collection;
|
||||
use iroh_old::get::DataStream;
|
||||
use iroh_old::progress::ProgressEmitter;
|
||||
use iroh_old::provider::Ticket;
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::{self, AsyncWriteExt, BufWriter};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
@@ -46,19 +48,22 @@ use tokio::task::{JoinHandle, JoinSet};
|
||||
use tokio_stream::wrappers::ReadDirStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::blob::BlobDirContents;
|
||||
use crate::chat::{add_device_msg, delete_and_reset_all_device_msgs};
|
||||
use crate::context::Context;
|
||||
use crate::imex::BlobDirContents;
|
||||
use crate::message::{Message, Viewtype};
|
||||
use crate::qr::{self, Qr};
|
||||
use crate::stock_str::backup_transfer_msg_body;
|
||||
use crate::tools::{time, TempPathGuard};
|
||||
use crate::{e2ee, EventType};
|
||||
use crate::tools::{create_id, time, TempPathGuard};
|
||||
use crate::EventType;
|
||||
|
||||
use super::{export_database, DBFILE_BACKUP_NAME};
|
||||
use super::{export_backup_stream, export_database, import_backup_stream, DBFILE_BACKUP_NAME};
|
||||
|
||||
const MAX_CONCURRENT_DIALS: u8 = 16;
|
||||
|
||||
/// ALPN protocol identifier for the backup transfer protocol.
|
||||
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
|
||||
|
||||
/// Provide or send a backup of this device.
|
||||
///
|
||||
/// This creates a backup of the current device and starts a service which offers another
|
||||
@@ -69,15 +74,21 @@ const MAX_CONCURRENT_DIALS: u8 = 16;
|
||||
///
|
||||
/// This starts a task which acquires the global "ongoing" mutex. If you need to stop the
|
||||
/// task use the [`Context::stop_ongoing`] mechanism.
|
||||
///
|
||||
/// The task implements [`Future`] and awaiting it will complete once a transfer has been
|
||||
/// either completed or aborted.
|
||||
#[derive(Debug)]
|
||||
pub struct BackupProvider {
|
||||
/// The supervisor task, run by [`BackupProvider::watch_provider`].
|
||||
/// iroh-net endpoint.
|
||||
_endpoint: Endpoint,
|
||||
|
||||
/// iroh-net address.
|
||||
node_addr: iroh_net::NodeAddr,
|
||||
|
||||
/// Authentication token that should be submitted
|
||||
/// to retrieve the backup.
|
||||
auth_token: String,
|
||||
|
||||
/// Handle for the task accepting backup transfer requests.
|
||||
handle: JoinHandle<Result<()>>,
|
||||
/// The ticket to retrieve the backup collection.
|
||||
ticket: Ticket,
|
||||
|
||||
/// Guard to cancel the provider on drop.
|
||||
_drop_guard: tokio_util::sync::DropGuard,
|
||||
}
|
||||
@@ -94,9 +105,13 @@ impl BackupProvider {
|
||||
///
|
||||
/// [`Accounts::stop_io`]: crate::accounts::Accounts::stop_io
|
||||
pub async fn prepare(context: &Context) -> Result<Self> {
|
||||
e2ee::ensure_secret_key_exists(context)
|
||||
.await
|
||||
.context("Private key not available, aborting backup export")?;
|
||||
let relay_mode = RelayMode::Disabled;
|
||||
let endpoint = Endpoint::builder()
|
||||
.alpns(vec![BACKUP_ALPN.to_vec()])
|
||||
.relay_mode(relay_mode)
|
||||
.bind(0)
|
||||
.await?;
|
||||
let node_addr = endpoint.node_addr().await?;
|
||||
|
||||
// Acquire global "ongoing" mutex.
|
||||
let cancel_token = context.alloc_ongoing().await?;
|
||||
@@ -104,195 +119,153 @@ impl BackupProvider {
|
||||
let context_dir = context
|
||||
.get_blobdir()
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow!("Context dir not found"))?;
|
||||
.context("Context dir not found")?;
|
||||
let dbfile = context_dir.join(DBFILE_BACKUP_NAME);
|
||||
if fs::metadata(&dbfile).await.is_ok() {
|
||||
fs::remove_file(&dbfile).await?;
|
||||
warn!(context, "Previous database export deleted");
|
||||
}
|
||||
let dbfile = TempPathGuard::new(dbfile);
|
||||
let res = tokio::select! {
|
||||
biased;
|
||||
res = Self::prepare_inner(context, &dbfile) => {
|
||||
match res {
|
||||
Ok(slf) => Ok(slf),
|
||||
Err(err) => {
|
||||
error!(context, "Failed to set up second device setup: {:#}", err);
|
||||
Err(err)
|
||||
},
|
||||
}
|
||||
},
|
||||
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
||||
};
|
||||
let (provider, ticket) = match res {
|
||||
Ok((provider, ticket)) => (provider, ticket),
|
||||
Err(err) => {
|
||||
context.free_ongoing().await;
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
// Authentication token that receiver should send us to receive a backup.
|
||||
let auth_token = create_id();
|
||||
|
||||
let passphrase = String::new();
|
||||
|
||||
export_database(context, &dbfile, passphrase, time())
|
||||
.await
|
||||
.context("Database export failed")?;
|
||||
context.emit_event(EventType::ImexProgress(300));
|
||||
|
||||
let drop_token = CancellationToken::new();
|
||||
let handle = {
|
||||
let context = context.clone();
|
||||
let drop_token = drop_token.clone();
|
||||
let endpoint = endpoint.clone();
|
||||
let auth_token = auth_token.clone();
|
||||
tokio::spawn(async move {
|
||||
let res = Self::watch_provider(&context, provider, cancel_token, drop_token).await;
|
||||
Self::accept_loop(
|
||||
context.clone(),
|
||||
endpoint,
|
||||
auth_token,
|
||||
cancel_token,
|
||||
drop_token,
|
||||
dbfile,
|
||||
)
|
||||
.await;
|
||||
info!(context, "Finished accept loop.");
|
||||
|
||||
context.free_ongoing().await;
|
||||
|
||||
// Explicit drop to move the guards into this future
|
||||
drop(paused_guard);
|
||||
drop(dbfile);
|
||||
res
|
||||
Ok(())
|
||||
})
|
||||
};
|
||||
Ok(Self {
|
||||
_endpoint: endpoint,
|
||||
node_addr,
|
||||
auth_token,
|
||||
handle,
|
||||
ticket,
|
||||
_drop_guard: drop_token.drop_guard(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates the provider task.
|
||||
///
|
||||
/// Having this as a function makes it easier to cancel it when needed.
|
||||
async fn prepare_inner(context: &Context, dbfile: &Path) -> Result<(Provider, Ticket)> {
|
||||
// Generate the token up front: we also use it to encrypt the database.
|
||||
let token = AuthToken::generate();
|
||||
context.emit_event(SendProgress::Started.into());
|
||||
export_database(context, dbfile, token.to_string(), time())
|
||||
.await
|
||||
.context("Database export failed")?;
|
||||
context.emit_event(SendProgress::DatabaseExported.into());
|
||||
async fn handle_connection(
|
||||
context: Context,
|
||||
conn: iroh_net::endpoint::Connecting,
|
||||
auth_token: String,
|
||||
dbfile: Arc<TempPathGuard>,
|
||||
) -> Result<()> {
|
||||
let conn = conn.await?;
|
||||
let (mut send_stream, mut recv_stream) = conn.accept_bi().await?;
|
||||
|
||||
// Now we can be sure IO is not running.
|
||||
let mut files = vec![DataSource::with_name(
|
||||
dbfile.to_owned(),
|
||||
format!("db/{DBFILE_BACKUP_NAME}"),
|
||||
)];
|
||||
let blobdir = BlobDirContents::new(context).await?;
|
||||
for blob in blobdir.iter() {
|
||||
let path = blob.to_abs_path();
|
||||
let name = format!("blob/{}", blob.as_file_name());
|
||||
files.push(DataSource::with_name(path, name));
|
||||
// Read authentication token from the stream.
|
||||
let mut received_auth_token = vec![0u8; auth_token.len()];
|
||||
recv_stream.read_exact(&mut received_auth_token).await?;
|
||||
if received_auth_token.as_slice() != auth_token.as_bytes() {
|
||||
warn!(context, "Received wrong backup authentication token.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Start listening.
|
||||
let (db, hash) = iroh::provider::create_collection(files).await?;
|
||||
context.emit_event(SendProgress::CollectionCreated.into());
|
||||
let provider = Provider::builder(db)
|
||||
.bind_addr((Ipv4Addr::UNSPECIFIED, 0).into())
|
||||
.auth_token(token)
|
||||
.spawn()?;
|
||||
context.emit_event(SendProgress::ProviderListening.into());
|
||||
info!(context, "Waiting for remote to connect");
|
||||
let ticket = provider.ticket(hash)?;
|
||||
Ok((provider, ticket))
|
||||
info!(context, "Received valid backup authentication token.");
|
||||
|
||||
let blobdir = BlobDirContents::new(&context).await?;
|
||||
|
||||
let mut file_size = 0;
|
||||
file_size += dbfile.metadata()?.len();
|
||||
for blob in blobdir.iter() {
|
||||
file_size += blob.to_abs_path().metadata()?.len()
|
||||
}
|
||||
|
||||
send_stream.write_all(&file_size.to_be_bytes()).await?;
|
||||
|
||||
export_backup_stream(&context, &dbfile, blobdir, send_stream)
|
||||
.await
|
||||
.context("Failed to write backup into QUIC stream")?;
|
||||
info!(context, "Finished writing backup into QUIC stream.");
|
||||
let mut buf = [0u8; 1];
|
||||
info!(context, "Waiting for acknowledgment.");
|
||||
recv_stream.read_exact(&mut buf).await?;
|
||||
info!(context, "Received backup reception acknowledgement.");
|
||||
context.emit_event(EventType::ImexProgress(1000));
|
||||
|
||||
let mut msg = Message::new(Viewtype::Text);
|
||||
msg.text = backup_transfer_msg_body(&context).await;
|
||||
add_device_msg(&context, None, Some(&mut msg)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Supervises the iroh [`Provider`], terminating it when needed.
|
||||
///
|
||||
/// This will watch the provider and terminate it when:
|
||||
///
|
||||
/// - A transfer is completed, successful or unsuccessful.
|
||||
/// - An event could not be observed to protect against not knowing of a completed event.
|
||||
/// - The ongoing process is cancelled.
|
||||
///
|
||||
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
|
||||
/// we must cancel this operation.
|
||||
async fn watch_provider(
|
||||
context: &Context,
|
||||
mut provider: Provider,
|
||||
cancel_token: Receiver<()>,
|
||||
async fn accept_loop(
|
||||
context: Context,
|
||||
endpoint: Endpoint,
|
||||
auth_token: String,
|
||||
cancel_token: async_channel::Receiver<()>,
|
||||
drop_token: CancellationToken,
|
||||
) -> Result<()> {
|
||||
let mut events = provider.subscribe();
|
||||
let mut total_size = 0;
|
||||
let mut current_size = 0;
|
||||
let res = loop {
|
||||
dbfile: TempPathGuard,
|
||||
) {
|
||||
let dbfile = Arc::new(dbfile);
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
res = &mut provider => {
|
||||
break res.context("BackupProvider failed");
|
||||
},
|
||||
maybe_event = events.recv() => {
|
||||
match maybe_event {
|
||||
Ok(event) => {
|
||||
match event {
|
||||
Event::ClientConnected { ..} => {
|
||||
context.emit_event(SendProgress::ClientConnected.into());
|
||||
}
|
||||
Event::RequestReceived { .. } => {
|
||||
}
|
||||
Event::TransferCollectionStarted { total_blobs_size, .. } => {
|
||||
total_size = total_blobs_size;
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
}.into());
|
||||
}
|
||||
Event::TransferBlobCompleted { size, .. } => {
|
||||
current_size += size;
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
}.into());
|
||||
}
|
||||
Event::TransferCollectionCompleted { .. } => {
|
||||
context.emit_event(SendProgress::TransferInProgress {
|
||||
current_size: total_size,
|
||||
total_size
|
||||
}.into());
|
||||
provider.shutdown();
|
||||
}
|
||||
Event::TransferAborted { .. } => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider transfer aborted"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
// We should never see this, provider.join() should complete
|
||||
// first.
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
// We really shouldn't be lagging, if we did we may have missed
|
||||
// a completion event.
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("Missed events from BackupProvider"));
|
||||
|
||||
conn = endpoint.accept() => {
|
||||
if let Some(conn) = conn {
|
||||
// Got a new in-progress connection.
|
||||
let context = context.clone();
|
||||
let auth_token = auth_token.clone();
|
||||
let dbfile = dbfile.clone();
|
||||
if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).await {
|
||||
warn!(context, "Error while handling backup connection: {err:#}.");
|
||||
} else {
|
||||
info!(context, "Backup transfer finished successfully.");
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ = cancel_token.recv() => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider cancelled"));
|
||||
},
|
||||
context.emit_event(EventType::ImexProgress(0));
|
||||
break;
|
||||
}
|
||||
_ = drop_token.cancelled() => {
|
||||
provider.shutdown();
|
||||
break Err(anyhow!("BackupProvider dropped"));
|
||||
context.emit_event(EventType::ImexProgress(0));
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
match &res {
|
||||
Ok(_) => {
|
||||
context.emit_event(SendProgress::Completed.into());
|
||||
let mut msg = Message::new(Viewtype::Text);
|
||||
msg.text = backup_transfer_msg_body(context).await;
|
||||
add_device_msg(context, None, Some(&mut msg)).await?;
|
||||
}
|
||||
Err(err) => {
|
||||
error!(context, "Backup transfer failure: {err:#}");
|
||||
context.emit_event(SendProgress::Failed.into())
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
/// Returns a QR code that allows fetching this backup.
|
||||
///
|
||||
/// This QR code can be passed to [`get_backup`] on a (different) device.
|
||||
pub fn qr(&self) -> Qr {
|
||||
Qr::Backup {
|
||||
ticket: self.ticket.clone(),
|
||||
Qr::Backup2 {
|
||||
node_addr: self.node_addr.clone(),
|
||||
|
||||
auth_token: self.auth_token.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -300,61 +273,14 @@ impl BackupProvider {
|
||||
impl Future for BackupProvider {
|
||||
type Output = Result<()>;
|
||||
|
||||
/// Waits for the backup transfer to complete.
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut self.handle).poll(cx)?
|
||||
}
|
||||
}
|
||||
|
||||
/// Create [`EventType::ImexProgress`] events using readable names.
|
||||
///
|
||||
/// Plus you get warnings if you don't use all variants.
|
||||
#[derive(Debug)]
|
||||
enum SendProgress {
|
||||
Failed,
|
||||
Started,
|
||||
DatabaseExported,
|
||||
CollectionCreated,
|
||||
ProviderListening,
|
||||
ClientConnected,
|
||||
TransferInProgress { current_size: u64, total_size: u64 },
|
||||
Completed,
|
||||
}
|
||||
|
||||
impl From<SendProgress> for EventType {
|
||||
fn from(source: SendProgress) -> Self {
|
||||
use SendProgress::*;
|
||||
let num: u16 = match source {
|
||||
Failed => 0,
|
||||
Started => 100,
|
||||
DatabaseExported => 300,
|
||||
CollectionCreated => 350,
|
||||
ProviderListening => 400,
|
||||
ClientConnected => 450,
|
||||
TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
} => {
|
||||
// the range is 450..=950
|
||||
450 + ((current_size as f64 / total_size as f64) * 500.).floor() as u16
|
||||
}
|
||||
Completed => 1000,
|
||||
};
|
||||
Self::ImexProgress(num.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Contacts a backup provider and receives the backup from it.
|
||||
///
|
||||
/// This uses a QR code to contact another instance of deltachat which is providing a backup
|
||||
/// using the [`BackupProvider`]. Once connected it will authenticate using the secrets in
|
||||
/// the QR code and retrieve the backup.
|
||||
///
|
||||
/// This is a long running operation which will only when completed.
|
||||
///
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts one specific variant of it. It
|
||||
/// does avoid having [`iroh::provider::Ticket`] in the primary API however, without
|
||||
/// having to revert to untyped bytes.
|
||||
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
/// Retrieves backup from a legacy backup provider using iroh 0.4.
|
||||
pub async fn get_legacy_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
ensure!(
|
||||
matches!(qr, Qr::Backup { .. }),
|
||||
"QR code for backup must be of type DCBACKUP"
|
||||
@@ -380,6 +306,64 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn get_backup2(
|
||||
context: &Context,
|
||||
node_addr: iroh_net::NodeAddr,
|
||||
auth_token: String,
|
||||
) -> Result<()> {
|
||||
let relay_mode = RelayMode::Disabled;
|
||||
|
||||
let endpoint = Endpoint::builder().relay_mode(relay_mode).bind(0).await?;
|
||||
|
||||
let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?;
|
||||
let (mut send_stream, mut recv_stream) = conn.open_bi().await?;
|
||||
info!(context, "Sending backup authentication token.");
|
||||
send_stream.write_all(auth_token.as_bytes()).await?;
|
||||
|
||||
let passphrase = String::new();
|
||||
info!(context, "Starting to read backup from the stream.");
|
||||
|
||||
let mut file_size_buf = [0u8; 8];
|
||||
recv_stream.read_exact(&mut file_size_buf).await?;
|
||||
let file_size = u64::from_be_bytes(file_size_buf);
|
||||
import_backup_stream(context, recv_stream, file_size, passphrase)
|
||||
.await
|
||||
.context("Failed to import backup from QUIC stream")?;
|
||||
info!(context, "Finished importing backup from the stream.");
|
||||
context.emit_event(EventType::ImexProgress(1000));
|
||||
|
||||
// Send an acknowledgement, but ignore the errors.
|
||||
// We have imported backup successfully already.
|
||||
send_stream.write_all(b".").await.ok();
|
||||
send_stream.finish().await.ok();
|
||||
info!(context, "Sent backup reception acknowledgment.");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Contacts a backup provider and receives the backup from it.
|
||||
///
|
||||
/// This uses a QR code to contact another instance of deltachat which is providing a backup
|
||||
/// using the [`BackupProvider`]. Once connected it will authenticate using the secrets in
|
||||
/// the QR code and retrieve the backup.
|
||||
///
|
||||
/// This is a long running operation which will return only when completed.
|
||||
///
|
||||
/// Using [`Qr`] as argument is a bit odd as it only accepts specific variants of it. It
|
||||
/// does avoid having [`iroh_old::provider::Ticket`] in the primary API however, without
|
||||
/// having to revert to untyped bytes.
|
||||
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
||||
match qr {
|
||||
Qr::Backup { .. } => get_legacy_backup(context, qr).await?,
|
||||
Qr::Backup2 {
|
||||
node_addr,
|
||||
auth_token,
|
||||
} => get_backup2(context, node_addr, auth_token).await?,
|
||||
_ => bail!("QR code for backup must be of type DCBACKUP or DCBACKUP2"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_backup_inner(context: &Context, qr: Qr) -> Result<()> {
|
||||
let ticket = match qr {
|
||||
Qr::Backup { ticket } => ticket,
|
||||
@@ -426,7 +410,7 @@ async fn transfer_from_provider(context: &Context, ticket: &Ticket) -> Result<()
|
||||
|
||||
// Perform the transfer.
|
||||
let keylog = false; // Do not enable rustls SSLKEYLOGFILE env var functionality
|
||||
let stats = iroh::get::run_ticket(
|
||||
let stats = iroh_old::get::run_ticket(
|
||||
ticket,
|
||||
keylog,
|
||||
MAX_CONCURRENT_DIALS,
|
||||
@@ -458,7 +442,7 @@ async fn on_blob(
|
||||
progress: &ProgressEmitter,
|
||||
jobs: &Mutex<JoinSet<()>>,
|
||||
ticket: &Ticket,
|
||||
_hash: Hash,
|
||||
_hash: iroh_old::Hash,
|
||||
mut reader: DataStream,
|
||||
name: String,
|
||||
) -> Result<DataStream> {
|
||||
@@ -640,24 +624,6 @@ mod tests {
|
||||
.await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_send_progress() {
|
||||
let cases = [
|
||||
((0, 100), 450),
|
||||
((10, 100), 500),
|
||||
((50, 100), 700),
|
||||
((100, 100), 950),
|
||||
];
|
||||
|
||||
for ((current_size, total_size), progress) in cases {
|
||||
let out = EventType::from(SendProgress::TransferInProgress {
|
||||
current_size,
|
||||
total_size,
|
||||
});
|
||||
assert_eq!(out, EventType::ImexProgress(progress));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_drop_provider() {
|
||||
let mut tcm = TestContextManager::new();
|
||||
|
||||
@@ -221,7 +221,7 @@ impl ChannelState {
|
||||
}
|
||||
|
||||
impl Context {
|
||||
/// Create magic endpoint and gossip.
|
||||
/// Create iroh endpoint and gossip.
|
||||
async fn init_peer_channels(&self) -> Result<Iroh> {
|
||||
let secret_key = SecretKey::generate();
|
||||
let public_key = secret_key.public();
|
||||
|
||||
43
src/qr.rs
43
src/qr.rs
@@ -37,8 +37,13 @@ const VCARD_SCHEME: &str = "BEGIN:VCARD";
|
||||
const SMTP_SCHEME: &str = "SMTP:";
|
||||
const HTTP_SCHEME: &str = "http://";
|
||||
const HTTPS_SCHEME: &str = "https://";
|
||||
|
||||
/// Legacy backup transfer based on iroh 0.4.
|
||||
pub(crate) const DCBACKUP_SCHEME: &str = "DCBACKUP:";
|
||||
|
||||
/// Backup transfer based on iroh-net.
|
||||
pub(crate) const DCBACKUP2_SCHEME: &str = "DCBACKUP2:";
|
||||
|
||||
/// Scanned QR code.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Qr {
|
||||
@@ -106,7 +111,7 @@ pub enum Qr {
|
||||
domain: String,
|
||||
},
|
||||
|
||||
/// Provides a backup that can be retrieve.
|
||||
/// Provides a backup that can be retrieved using legacy iroh 0.4.
|
||||
///
|
||||
/// This contains all the data needed to connect to a device and download a backup from
|
||||
/// it to configure the receiving device with the same account.
|
||||
@@ -120,6 +125,15 @@ pub enum Qr {
|
||||
ticket: iroh::provider::Ticket,
|
||||
},
|
||||
|
||||
/// Provides a backup that can be retrieved using iroh-net based backup transfer protocol.
|
||||
Backup2 {
|
||||
/// Iroh node address.
|
||||
node_addr: iroh_net::NodeAddr,
|
||||
|
||||
/// Authentication token.
|
||||
auth_token: String,
|
||||
},
|
||||
|
||||
/// Ask the user if they want to use the given service for video chats.
|
||||
WebrtcInstance {
|
||||
/// Server domain name.
|
||||
@@ -266,6 +280,8 @@ pub async fn check_qr(context: &Context, qr: &str) -> Result<Qr> {
|
||||
decode_webrtc_instance(context, qr)?
|
||||
} else if starts_with_ignore_case(qr, DCBACKUP_SCHEME) {
|
||||
decode_backup(qr)?
|
||||
} else if starts_with_ignore_case(qr, DCBACKUP2_SCHEME) {
|
||||
decode_backup2(qr)?
|
||||
} else if qr.starts_with(MAILTO_SCHEME) {
|
||||
decode_mailto(context, qr).await?
|
||||
} else if qr.starts_with(SMTP_SCHEME) {
|
||||
@@ -295,6 +311,13 @@ pub async fn check_qr(context: &Context, qr: &str) -> Result<Qr> {
|
||||
pub fn format_backup(qr: &Qr) -> Result<String> {
|
||||
match qr {
|
||||
Qr::Backup { ref ticket } => Ok(format!("{DCBACKUP_SCHEME}{ticket}")),
|
||||
Qr::Backup2 {
|
||||
ref node_addr,
|
||||
ref auth_token,
|
||||
} => {
|
||||
let node_addr = serde_json::to_string(node_addr)?;
|
||||
Ok(format!("{DCBACKUP2_SCHEME}{auth_token}&{node_addr}"))
|
||||
}
|
||||
_ => Err(anyhow!("Not a backup QR code")),
|
||||
}
|
||||
}
|
||||
@@ -529,6 +552,24 @@ fn decode_backup(qr: &str) -> Result<Qr> {
|
||||
Ok(Qr::Backup { ticket })
|
||||
}
|
||||
|
||||
/// Decodes a [`DCBACKUP2_SCHEME`] QR code.
|
||||
fn decode_backup2(qr: &str) -> Result<Qr> {
|
||||
let payload = qr
|
||||
.strip_prefix(DCBACKUP2_SCHEME)
|
||||
.ok_or_else(|| anyhow!("invalid DCBACKUP scheme"))?;
|
||||
let (auth_token, node_addr) = payload
|
||||
.split_once('&')
|
||||
.context("Backup QR code has no separator")?;
|
||||
let auth_token = auth_token.to_string();
|
||||
let node_addr = serde_json::from_str::<iroh_net::NodeAddr>(node_addr)
|
||||
.context("Invalid node addr in backup QR code")?;
|
||||
|
||||
Ok(Qr::Backup2 {
|
||||
node_addr,
|
||||
auth_token,
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CreateAccountSuccessResponse {
|
||||
/// Email address.
|
||||
|
||||
@@ -58,7 +58,7 @@ async fn generate_verification_qr(context: &Context) -> Result<String> {
|
||||
)
|
||||
}
|
||||
|
||||
/// Renders a [`Qr::Backup`] QR code as an SVG image.
|
||||
/// Renders a [`Qr::Backup2`] QR code as an SVG image.
|
||||
pub async fn generate_backup_qr(context: &Context, qr: &Qr) -> Result<String> {
|
||||
let content = qr::format_backup(qr)?;
|
||||
let (avatar, displayname, _addr, color) = self_info(context).await?;
|
||||
|
||||
Reference in New Issue
Block a user