feat(scheduler): Allow overlapping IoPauseGuards (#4246)

This enables using multiple pause guards at the same time.
This commit is contained in:
Floris Bruynooghe
2023-03-30 12:40:13 +02:00
committed by GitHub
parent 82ace72527
commit 91c10b3ac6

View File

@@ -50,7 +50,7 @@ impl SchedulerState {
pub(crate) async fn start(&self, context: Context) { pub(crate) async fn start(&self, context: Context) {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.started = true; inner.started = true;
if inner.scheduler.is_none() && !inner.paused { if inner.scheduler.is_none() && inner.paused == 0 {
Self::do_start(inner, context).await; Self::do_start(inner, context).await;
} }
} }
@@ -99,15 +99,15 @@ impl SchedulerState {
pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard { pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
{ {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.paused = true; inner.paused += 1;
Self::do_stop(inner, &context).await; Self::do_stop(inner, &context).await;
} }
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
tokio::spawn(async move { tokio::spawn(async move {
rx.await.ok(); rx.await.ok();
let mut inner = context.scheduler.inner.write().await; let mut inner = context.scheduler.inner.write().await;
inner.paused = false; inner.paused -= 1;
if inner.started && inner.scheduler.is_none() { if inner.paused == 0 && inner.started && inner.scheduler.is_none() {
SchedulerState::do_start(inner, context.clone()).await; SchedulerState::do_start(inner, context.clone()).await;
} }
}); });
@@ -199,8 +199,10 @@ impl SchedulerState {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct InnerSchedulerState { struct InnerSchedulerState {
scheduler: Option<Scheduler>, scheduler: Option<Scheduler>,
/// Whether IO should be started if there is no [`IoPausedGuard`] active.
started: bool, started: bool,
paused: bool, /// The number of [`IoPausedGuard`]s that are outstanding.
paused: u32,
} }
/// Guard to make sure the IO Scheduler is resumed. /// Guard to make sure the IO Scheduler is resumed.