Stop after a transfer happened.

This commit is contained in:
Floris Bruynooghe
2023-02-06 14:58:08 +01:00
parent a6c889ed5e
commit 0b075ac762
2 changed files with 74 additions and 116 deletions

View File

@@ -32,11 +32,14 @@ use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver;
use futures_lite::StreamExt;
use sendme::blobs::Collection;
use sendme::get::{AsyncSliceDecoder, Hash, Options, ReceiveStream};
use sendme::get::{DataStream, Options};
use sendme::protocol::AuthToken;
use sendme::provider::{DataSource, Provider, Ticket};
use sendme::provider::{DataSource, Event, Provider, Ticket};
use sendme::Hash;
use tokio::fs::{self, File};
use tokio::io::{self, BufWriter};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReadDirStream;
use super::{export_database, BlobDirContents, DeleteOnDrop, DBFILE_BACKUP_NAME};
@@ -48,15 +51,15 @@ use super::{export_database, BlobDirContents, DeleteOnDrop, DBFILE_BACKUP_NAME};
///
/// This does not make a full backup on disk, only the SQLite database is created on disk,
/// the blobs in the blob directory are not copied.
///
/// This starts a task which acquires the global "ongoing" mutex. If you need to stop the
/// task use the [`Context::stop_ongoing`] mechanism.
#[derive(Debug)]
pub struct BackupProvider {
/// A handle to the running provider.
provider: Provider,
/// The supervisor task, run by [`BackupProvider::watch_provider`].
handle: JoinHandle<Result<()>>,
/// The ticket to retrieve the backup collection.
ticket: Ticket,
/// Token holding the "ongoing" mutex. When this completes the provider should shut
/// down.
cancel_token: Receiver<()>,
}
impl BackupProvider {
@@ -106,13 +109,13 @@ impl BackupProvider {
return Err(err);
}
};
Ok(Self {
provider,
ticket,
cancel_token,
})
let handle = tokio::spawn(Self::watch_provider(provider, cancel_token));
Ok(Self { handle, ticket })
}
/// Creates the provider and supervisor tasks.
///
/// Having this as a function makes it easier to cancel it when needed.
async fn prepare_inner(context: &Context, dir: &Path) -> Result<(Provider, Ticket)> {
// Generate the token up front: we also use it to encrypt the database.
let token = AuthToken::generate();
@@ -135,31 +138,62 @@ impl BackupProvider {
Ok((provider, ticket))
}
/// Supervises the sendme [`Provider`] terminating it when needed.
///
/// This will watch the provider and terminate it when:
/// - A transfer is completed, successful or unsuccessful.
/// - An event could not be observed to protect against not knowing of a completed event.
/// - The ongoing process is cancelled.
///
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
/// we must cancel this operation.
async fn watch_provider(mut provider: Provider, cancel_token: Receiver<()>) -> Result<()> {
let mut events = provider.subscribe();
loop {
tokio::select! {
biased;
res = &mut provider => {
return res.context("BackupSender failed");
},
maybe_event = events.recv() => {
match maybe_event {
Ok(event) => {
match event {
Event::TransferCompleted { .. } => {
provider.abort();
}
Event::TransferAborted { .. } => {
provider.abort();
return Err(anyhow!("BackupSender transfer aborted"));
}
_ => (),
}
}
Err(broadcast::error::RecvError::Closed) => {
// We should never see this, provider.join() should complete
// first.
}
Err(broadcast::error::RecvError::Lagged(_)) => {
// We really shouldn't be lagging, if we did we may have missed
// a completion event.
provider.abort();
return Err(anyhow!("Missed events from BackupSender"));
}
}
},
_ = cancel_token.recv() => {
provider.abort();
return Err(anyhow!("BackupSender cancelled"));
},
}
}
}
pub fn qr(&self) -> Qr {
Qr::Backup {
ticket: self.ticket.clone(),
}
}
/// Wait for the backup sender to complete.
///
/// The sender completes when an authenticated client disconnects, whether the transfer
/// was successful or not. When the ongoing task is cancelled the sender also completes
/// with an error.
///
/// Note that this must be called and awaited for the ongoing cancellation to work.
pub async fn join(self) -> Result<()> {
// TODO: should wait for 1 transfer to complete or abort
tokio::select! {
biased;
res = self.provider.join() => res.context("BackupSender failed"),
_ = self.cancel_token.recv() => Err(anyhow!("BackupSender cancelled")),
}
}
pub fn abort(&self) {
self.provider.abort()
}
}
/// Contacts a backup provider and receives the backup from it.
@@ -233,9 +267,9 @@ async fn on_blob(
context: &Context,
ticket: &Ticket,
_hash: Hash,
mut reader: AsyncSliceDecoder<ReceiveStream>,
mut reader: DataStream,
name: String,
) -> Result<AsyncSliceDecoder<ReceiveStream>> {
) -> Result<DataStream> {
ensure!(!name.is_empty(), "Received a nameless blob");
let path = if name == DBFILE_BACKUP_NAME {
// We can only safely write to the blobdir. But the blobdir could have a file named