mirror of
https://github.com/chatmail/core.git
synced 2026-05-14 04:16:30 +03:00
feat: Make the IoPausedGuard a simple sender (#4184)
This replaces the mechanism by which the IoPauseGuard makes sure the IO scheduler is resumed: it really is a drop guard now by sending a single message on drop. This makes it not have to hold on to anything like the context so makes it a lot easier to use. The trade-off is that a long-running task is spawned when the guard is created, this task needs to receive the message from the drop guard in order for the scheduler to resume.
This commit is contained in:
committed by
GitHub
parent
89b32e02c5
commit
616eabc613
@@ -91,15 +91,13 @@ pub async fn imex(
|
|||||||
let cancel = context.alloc_ongoing().await?;
|
let cancel = context.alloc_ongoing().await?;
|
||||||
|
|
||||||
let res = {
|
let res = {
|
||||||
let mut guard = context.scheduler.pause(context.clone()).await;
|
let _guard = context.scheduler.pause(context.clone()).await;
|
||||||
let res = imex_inner(context, what, path, passphrase)
|
imex_inner(context, what, path, passphrase)
|
||||||
.race(async {
|
.race(async {
|
||||||
cancel.recv().await.ok();
|
cancel.recv().await.ok();
|
||||||
Err(format_err!("canceled"))
|
Err(format_err!("canceled"))
|
||||||
})
|
})
|
||||||
.await;
|
.await
|
||||||
guard.resume().await;
|
|
||||||
res
|
|
||||||
};
|
};
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
|
|
||||||
|
|||||||
@@ -91,7 +91,7 @@ impl BackupProvider {
|
|||||||
|
|
||||||
// Acquire global "ongoing" mutex.
|
// Acquire global "ongoing" mutex.
|
||||||
let cancel_token = context.alloc_ongoing().await?;
|
let cancel_token = context.alloc_ongoing().await?;
|
||||||
let mut paused_guard = context.scheduler.pause(context.clone()).await;
|
let paused_guard = context.scheduler.pause(context.clone()).await;
|
||||||
let context_dir = context
|
let context_dir = context
|
||||||
.get_blobdir()
|
.get_blobdir()
|
||||||
.parent()
|
.parent()
|
||||||
@@ -119,7 +119,6 @@ impl BackupProvider {
|
|||||||
Ok((provider, ticket)) => (provider, ticket),
|
Ok((provider, ticket)) => (provider, ticket),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
paused_guard.resume().await;
|
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -128,7 +127,9 @@ impl BackupProvider {
|
|||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let res = Self::watch_provider(&context, provider, cancel_token).await;
|
let res = Self::watch_provider(&context, provider, cancel_token).await;
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
paused_guard.resume().await;
|
|
||||||
|
// Explicit drop to move the guards into this future
|
||||||
|
drop(paused_guard);
|
||||||
drop(dbfile);
|
drop(dbfile);
|
||||||
res
|
res
|
||||||
})
|
})
|
||||||
@@ -369,7 +370,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
|||||||
!context.is_configured().await?,
|
!context.is_configured().await?,
|
||||||
"Cannot import backups to accounts in use."
|
"Cannot import backups to accounts in use."
|
||||||
);
|
);
|
||||||
let mut guard = context.scheduler.pause(context.clone()).await;
|
let _guard = context.scheduler.pause(context.clone()).await;
|
||||||
|
|
||||||
// Acquire global "ongoing" mutex.
|
// Acquire global "ongoing" mutex.
|
||||||
let cancel_token = context.alloc_ongoing().await?;
|
let cancel_token = context.alloc_ongoing().await?;
|
||||||
@@ -381,7 +382,6 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
|||||||
}
|
}
|
||||||
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
_ = cancel_token.recv() => Err(format_err!("cancelled")),
|
||||||
};
|
};
|
||||||
guard.resume().await;
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use anyhow::{bail, Context as _, Result};
|
|||||||
use async_channel::{self as channel, Receiver, Sender};
|
use async_channel::{self as channel, Receiver, Sender};
|
||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
use futures_lite::FutureExt;
|
use futures_lite::FutureExt;
|
||||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
use tokio::sync::{oneshot, RwLock, RwLockWriteGuard};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
use self::connectivity::ConnectivityStore;
|
use self::connectivity::ConnectivityStore;
|
||||||
@@ -89,20 +89,28 @@ impl SchedulerState {
|
|||||||
|
|
||||||
/// Pauses the IO scheduler.
|
/// Pauses the IO scheduler.
|
||||||
///
|
///
|
||||||
/// If it is currently running the scheduler will be stopped. When
|
/// If it is currently running the scheduler will be stopped. When the
|
||||||
/// [`IoPausedGuard::resume`] is called the scheduler is started again.
|
/// [`IoPausedGuard`] is dropped the scheduler is started again.
|
||||||
///
|
///
|
||||||
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
||||||
/// resume will do the right thing and restore the scheduler to the state requested by
|
/// resume will do the right thing and restore the scheduler to the state requested by
|
||||||
/// the last call.
|
/// the last call.
|
||||||
pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
|
pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
|
||||||
let mut inner = self.inner.write().await;
|
{
|
||||||
inner.paused = true;
|
let mut inner = self.inner.write().await;
|
||||||
Self::do_stop(inner, &context).await;
|
inner.paused = true;
|
||||||
IoPausedGuard {
|
Self::do_stop(inner, &context).await;
|
||||||
context,
|
|
||||||
done: false,
|
|
||||||
}
|
}
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
rx.await.ok();
|
||||||
|
let mut inner = context.scheduler.inner.write().await;
|
||||||
|
inner.paused = false;
|
||||||
|
if inner.started && inner.scheduler.is_none() {
|
||||||
|
SchedulerState::do_start(inner, context.clone()).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
IoPausedGuard { sender: Some(tx) }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Restarts the scheduler, only if it is running.
|
/// Restarts the scheduler, only if it is running.
|
||||||
@@ -194,31 +202,21 @@ struct InnerSchedulerState {
|
|||||||
paused: bool,
|
paused: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Guard to make sure the IO Scheduler is resumed.
|
||||||
|
///
|
||||||
|
/// Returned by [`SchedulerState::pause`]. To resume the IO scheduler simply drop this
|
||||||
|
/// guard.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct IoPausedGuard {
|
pub(crate) struct IoPausedGuard {
|
||||||
context: Context,
|
sender: Option<oneshot::Sender<()>>,
|
||||||
done: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IoPausedGuard {
|
|
||||||
pub(crate) async fn resume(&mut self) {
|
|
||||||
self.done = true;
|
|
||||||
let mut inner = self.context.scheduler.inner.write().await;
|
|
||||||
inner.paused = false;
|
|
||||||
if inner.started && inner.scheduler.is_none() {
|
|
||||||
SchedulerState::do_start(inner, self.context.clone()).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for IoPausedGuard {
|
impl Drop for IoPausedGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.done {
|
if let Some(sender) = self.sender.take() {
|
||||||
return;
|
// Can only fail if receiver is dropped, but then we're already resumed.
|
||||||
|
sender.send(()).ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Async .resume() should be called manually due to lack of async drop.
|
|
||||||
error!(self.context, "Pause guard dropped without resuming.");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user