From 0b075ac762c527d210fb7fff65ef51913f2bbc59 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Mon, 6 Feb 2023 14:58:08 +0100 Subject: [PATCH] Stop after a transfer happened. --- Cargo.lock | 88 +++---------------------------------- src/imex/transfer.rs | 102 ++++++++++++++++++++++++++++--------------- 2 files changed, 74 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f3fec720..bcf3bdbd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -705,7 +705,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" dependencies = [ "heck", - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn", @@ -1237,7 +1237,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ef71ddb5b3a1f53dee24817c8f70dfa1cb29e804c18d88c228d4bc9c86ee3b9" dependencies = [ - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn", @@ -1846,37 +1846,6 @@ dependencies = [ "slab", ] -[[package]] -name = "genawaiter" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c86bd0361bcbde39b13475e6e36cb24c329964aa2611be285289d1e4b751c1a0" -dependencies = [ - "futures-core", - "genawaiter-macro", - "genawaiter-proc-macro", - "proc-macro-hack", -] - -[[package]] -name = "genawaiter-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b32dfe1fdfc0bbde1f22a5da25355514b5e450c33a6af6770884c8750aedfbc" - -[[package]] -name = "genawaiter-proc-macro" -version = "0.99.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784f84eebc366e15251c4a8c3acee82a6a6f427949776ecb88377362a9621738" -dependencies = [ - "proc-macro-error 0.4.12", - "proc-macro-hack", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "generic-array" version = "0.14.6" @@ -2888,7 +2857,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "734aa7a4a6390b162112523cac2923a18e4f23b917880a68c826bf6e8bf48f06" dependencies = [ "Inflector", - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn", @@ -3159,45 +3128,19 @@ dependencies = [ "log", ] -[[package]] -name = "proc-macro-error" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18f33027081eba0a6d8aba6d1b1c3a3be58cbb12106341c2d5759fcd9b5277e7" -dependencies = [ - "proc-macro-error-attr 0.4.12", - "proc-macro2", - "quote", - "syn", - "version_check 0.9.4", -] - [[package]] name = "proc-macro-error" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" dependencies = [ - "proc-macro-error-attr 1.0.4", + "proc-macro-error-attr", "proc-macro2", "quote", "syn", "version_check 0.9.4", ] -[[package]] -name = "proc-macro-error-attr" -version = "0.4.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a5b4b77fdb63c1eca72173d68d24501c54ab1269409f6b672c85deb18af69de" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "syn-mid", - "version_check 0.9.4", -] - [[package]] name = "proc-macro-error-attr" version = "1.0.4" @@ -3209,12 +3152,6 @@ dependencies = [ "version_check 0.9.4", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.20+deprecated" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" - [[package]] name = "proc-macro2" version = "1.0.46" @@ -3934,6 +3871,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bao", + "base64 0.21.0", "blake3", "bytes", "clap 4.1.4", @@ -3941,10 +3879,7 @@ dependencies = [ "der", "ed25519-dalek", "futures", - "genawaiter", - "hex", "indicatif", - "is-terminal", "postcard", "rand 0.7.3", "rcgen", @@ -4266,17 +4201,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "syn-mid" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baa8e7560a164edb1621a55d18a0c59abf49d360f47aa7b821061dd7eea7fac9" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "sync_wrapper" version = "0.1.1" @@ -4817,7 +4741,7 @@ checksum = "e5c1bfe689e4067733530495b04959b00f05cd95f038bed59af4fc70b3e26240" dependencies = [ "darling 0.13.4", "ident_case", - "proc-macro-error 1.0.4", + "proc-macro-error", "proc-macro2", "quote", "syn", diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index f97122d84..aaeb952d2 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -32,11 +32,14 @@ use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; use async_channel::Receiver; use futures_lite::StreamExt; use sendme::blobs::Collection; -use sendme::get::{AsyncSliceDecoder, Hash, Options, ReceiveStream}; +use sendme::get::{DataStream, Options}; use sendme::protocol::AuthToken; -use sendme::provider::{DataSource, Provider, Ticket}; +use sendme::provider::{DataSource, Event, Provider, Ticket}; +use sendme::Hash; use tokio::fs::{self, File}; use tokio::io::{self, BufWriter}; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; use tokio_stream::wrappers::ReadDirStream; use super::{export_database, BlobDirContents, DeleteOnDrop, DBFILE_BACKUP_NAME}; @@ -48,15 +51,15 @@ use super::{export_database, BlobDirContents, DeleteOnDrop, DBFILE_BACKUP_NAME}; /// /// This does not make a full backup on disk, only the SQLite database is created on disk, /// the blobs in the blob directory are not copied. +/// +/// This starts a task which acquires the global "ongoing" mutex. If you need to stop the +/// task use the [`Context::stop_ongoing`] mechanism. #[derive(Debug)] pub struct BackupProvider { - /// A handle to the running provider. - provider: Provider, + /// The supervisor task, run by [`BackupProvider::watch_provider`]. + handle: JoinHandle>, /// The ticket to retrieve the backup collection. ticket: Ticket, - /// Token holding the "ongoing" mutex. When this completes the provider should shut - /// down. - cancel_token: Receiver<()>, } impl BackupProvider { @@ -106,13 +109,13 @@ impl BackupProvider { return Err(err); } }; - Ok(Self { - provider, - ticket, - cancel_token, - }) + let handle = tokio::spawn(Self::watch_provider(provider, cancel_token)); + Ok(Self { handle, ticket }) } + /// Creates the provider and supervisor tasks. + /// + /// Having this as a function makes it easier to cancel it when needed. async fn prepare_inner(context: &Context, dir: &Path) -> Result<(Provider, Ticket)> { // Generate the token up front: we also use it to encrypt the database. let token = AuthToken::generate(); @@ -135,31 +138,62 @@ impl BackupProvider { Ok((provider, ticket)) } + /// Supervises the sendme [`Provider`] terminating it when needed. + /// + /// This will watch the provider and terminate it when: + /// - A transfer is completed, successful or unsuccessful. + /// - An event could not be observed to protect against not knowing of a completed event. + /// - The ongoing process is cancelled. + /// + /// The *cancel_token* is the handle for the ongoing process mutex, when this completes + /// we must cancel this operation. + async fn watch_provider(mut provider: Provider, cancel_token: Receiver<()>) -> Result<()> { + let mut events = provider.subscribe(); + loop { + tokio::select! { + biased; + res = &mut provider => { + return res.context("BackupSender failed"); + }, + maybe_event = events.recv() => { + match maybe_event { + Ok(event) => { + match event { + Event::TransferCompleted { .. } => { + provider.abort(); + } + Event::TransferAborted { .. } => { + provider.abort(); + return Err(anyhow!("BackupSender transfer aborted")); + } + _ => (), + } + } + Err(broadcast::error::RecvError::Closed) => { + // We should never see this, provider.join() should complete + // first. + } + Err(broadcast::error::RecvError::Lagged(_)) => { + // We really shouldn't be lagging, if we did we may have missed + // a completion event. + provider.abort(); + return Err(anyhow!("Missed events from BackupSender")); + } + } + }, + _ = cancel_token.recv() => { + provider.abort(); + return Err(anyhow!("BackupSender cancelled")); + }, + } + } + } + pub fn qr(&self) -> Qr { Qr::Backup { ticket: self.ticket.clone(), } } - - /// Wait for the backup sender to complete. - /// - /// The sender completes when an authenticated client disconnects, whether the transfer - /// was successful or not. When the ongoing task is cancelled the sender also completes - /// with an error. - /// - /// Note that this must be called and awaited for the ongoing cancellation to work. - pub async fn join(self) -> Result<()> { - // TODO: should wait for 1 transfer to complete or abort - tokio::select! { - biased; - res = self.provider.join() => res.context("BackupSender failed"), - _ = self.cancel_token.recv() => Err(anyhow!("BackupSender cancelled")), - } - } - - pub fn abort(&self) { - self.provider.abort() - } } /// Contacts a backup provider and receives the backup from it. @@ -233,9 +267,9 @@ async fn on_blob( context: &Context, ticket: &Ticket, _hash: Hash, - mut reader: AsyncSliceDecoder, + mut reader: DataStream, name: String, -) -> Result> { +) -> Result { ensure!(!name.is_empty(), "Received a nameless blob"); let path = if name == DBFILE_BACKUP_NAME { // We can only safely write to the blobdir. But the blobdir could have a file named