From 616eabc613d25d778de246543fc728d78804e6d4 Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Wed, 22 Mar 2023 17:42:21 +0100 Subject: [PATCH] feat: Make the IoPausedGuard a simple sender (#4184) This replaces the mechanism by which the IoPauseGuard makes sure the IO scheduler is resumed: it really is a drop guard now by sending a single message on drop. This makes it not have to hold on to anything like the context so makes it a lot easier to use. The trade-off is that a long-running task is spawned when the guard is created, this task needs to receive the message from the drop guard in order for the scheduler to resume. --- src/imex.rs | 8 +++---- src/imex/transfer.rs | 10 ++++----- src/scheduler.rs | 52 +++++++++++++++++++++----------------------- 3 files changed, 33 insertions(+), 37 deletions(-) diff --git a/src/imex.rs b/src/imex.rs index 8c6b81657..885187cae 100644 --- a/src/imex.rs +++ b/src/imex.rs @@ -91,15 +91,13 @@ pub async fn imex( let cancel = context.alloc_ongoing().await?; let res = { - let mut guard = context.scheduler.pause(context.clone()).await; - let res = imex_inner(context, what, path, passphrase) + let _guard = context.scheduler.pause(context.clone()).await; + imex_inner(context, what, path, passphrase) .race(async { cancel.recv().await.ok(); Err(format_err!("canceled")) }) - .await; - guard.resume().await; - res + .await }; context.free_ongoing().await; diff --git a/src/imex/transfer.rs b/src/imex/transfer.rs index 6d5564c94..bf3bfb0ff 100644 --- a/src/imex/transfer.rs +++ b/src/imex/transfer.rs @@ -91,7 +91,7 @@ impl BackupProvider { // Acquire global "ongoing" mutex. let cancel_token = context.alloc_ongoing().await?; - let mut paused_guard = context.scheduler.pause(context.clone()).await; + let paused_guard = context.scheduler.pause(context.clone()).await; let context_dir = context .get_blobdir() .parent() @@ -119,7 +119,6 @@ impl BackupProvider { Ok((provider, ticket)) => (provider, ticket), Err(err) => { context.free_ongoing().await; - paused_guard.resume().await; return Err(err); } }; @@ -128,7 +127,9 @@ impl BackupProvider { tokio::spawn(async move { let res = Self::watch_provider(&context, provider, cancel_token).await; context.free_ongoing().await; - paused_guard.resume().await; + + // Explicit drop to move the guards into this future + drop(paused_guard); drop(dbfile); res }) @@ -369,7 +370,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { !context.is_configured().await?, "Cannot import backups to accounts in use." ); - let mut guard = context.scheduler.pause(context.clone()).await; + let _guard = context.scheduler.pause(context.clone()).await; // Acquire global "ongoing" mutex. let cancel_token = context.alloc_ongoing().await?; @@ -381,7 +382,6 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { } _ = cancel_token.recv() => Err(format_err!("cancelled")), }; - guard.resume().await; res } diff --git a/src/scheduler.rs b/src/scheduler.rs index c37cb3f86..dada92f9c 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,7 +5,7 @@ use anyhow::{bail, Context as _, Result}; use async_channel::{self as channel, Receiver, Sender}; use futures::future::try_join_all; use futures_lite::FutureExt; -use tokio::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::{oneshot, RwLock, RwLockWriteGuard}; use tokio::task; use self::connectivity::ConnectivityStore; @@ -89,20 +89,28 @@ impl SchedulerState { /// Pauses the IO scheduler. /// - /// If it is currently running the scheduler will be stopped. When - /// [`IoPausedGuard::resume`] is called the scheduler is started again. + /// If it is currently running the scheduler will be stopped. When the + /// [`IoPausedGuard`] is dropped the scheduler is started again. /// /// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called /// resume will do the right thing and restore the scheduler to the state requested by /// the last call. pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard { - let mut inner = self.inner.write().await; - inner.paused = true; - Self::do_stop(inner, &context).await; - IoPausedGuard { - context, - done: false, + { + let mut inner = self.inner.write().await; + inner.paused = true; + Self::do_stop(inner, &context).await; } + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + rx.await.ok(); + let mut inner = context.scheduler.inner.write().await; + inner.paused = false; + if inner.started && inner.scheduler.is_none() { + SchedulerState::do_start(inner, context.clone()).await; + } + }); + IoPausedGuard { sender: Some(tx) } } /// Restarts the scheduler, only if it is running. @@ -194,31 +202,21 @@ struct InnerSchedulerState { paused: bool, } +/// Guard to make sure the IO Scheduler is resumed. +/// +/// Returned by [`SchedulerState::pause`]. To resume the IO scheduler simply drop this +/// guard. #[derive(Debug)] pub(crate) struct IoPausedGuard { - context: Context, - done: bool, -} - -impl IoPausedGuard { - pub(crate) async fn resume(&mut self) { - self.done = true; - let mut inner = self.context.scheduler.inner.write().await; - inner.paused = false; - if inner.started && inner.scheduler.is_none() { - SchedulerState::do_start(inner, self.context.clone()).await; - } - } + sender: Option>, } impl Drop for IoPausedGuard { fn drop(&mut self) { - if self.done { - return; + if let Some(sender) = self.sender.take() { + // Can only fail if receiver is dropped, but then we're already resumed. + sender.send(()).ok(); } - - // Async .resume() should be called manually due to lack of async drop. - error!(self.context, "Pause guard dropped without resuming."); } }