mirror of
https://github.com/chatmail/core.git
synced 2026-05-02 21:06:31 +03:00
feat: Pause IO for BackupProvider (#4182)
This makes the BackupProvider automatically invoke pause-io while it is needed. It needed to make the guard independent from the Context lifetime to make this work. Which is a bit sad.
This commit is contained in:
committed by
GitHub
parent
e04efdbd94
commit
35f50a8965
@@ -2644,10 +2644,6 @@ void dc_str_unref (char* str);
|
|||||||
/**
|
/**
|
||||||
* Creates an object for sending a backup to another device.
|
* Creates an object for sending a backup to another device.
|
||||||
*
|
*
|
||||||
* Before calling this function IO must be stopped using dc_accounts_stop_io()
|
|
||||||
* or dc_stop_io() so that no changes to the blobs or database are happening.
|
|
||||||
* IO should only be restarted once dc_backup_provider_wait() has returned.
|
|
||||||
*
|
|
||||||
* The backup is sent to through a peer-to-peer channel which is bootstrapped
|
* The backup is sent to through a peer-to-peer channel which is bootstrapped
|
||||||
* by a QR-code. The backup contains the entire state of the account
|
* by a QR-code. The backup contains the entire state of the account
|
||||||
* including credentials. This can be used to setup a new device.
|
* including credentials. This can be used to setup a new device.
|
||||||
@@ -2708,9 +2704,7 @@ char* dc_backup_provider_get_qr_svg (const dc_backup_provider_t* backup_provider
|
|||||||
/**
|
/**
|
||||||
* Waits for the sending to finish.
|
* Waits for the sending to finish.
|
||||||
*
|
*
|
||||||
* This is a blocking call and should only be called once. Once this function
|
* This is a blocking call and should only be called once.
|
||||||
* returns IO can be started again using dc_accounts_start_io() or
|
|
||||||
* dc_start_io().
|
|
||||||
*
|
*
|
||||||
* @memberof dc_backup_provider_t
|
* @memberof dc_backup_provider_t
|
||||||
* @param backup_provider The backup provider object as created by
|
* @param backup_provider The backup provider object as created by
|
||||||
|
|||||||
@@ -91,7 +91,7 @@ pub async fn imex(
|
|||||||
let cancel = context.alloc_ongoing().await?;
|
let cancel = context.alloc_ongoing().await?;
|
||||||
|
|
||||||
let res = {
|
let res = {
|
||||||
let mut guard = context.scheduler.pause(context).await;
|
let mut guard = context.scheduler.pause(context.clone()).await;
|
||||||
let res = imex_inner(context, what, path, passphrase)
|
let res = imex_inner(context, what, path, passphrase)
|
||||||
.race(async {
|
.race(async {
|
||||||
cancel.recv().await.ok();
|
cancel.recv().await.ok();
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ impl BackupProvider {
|
|||||||
|
|
||||||
// Acquire global "ongoing" mutex.
|
// Acquire global "ongoing" mutex.
|
||||||
let cancel_token = context.alloc_ongoing().await?;
|
let cancel_token = context.alloc_ongoing().await?;
|
||||||
|
let mut paused_guard = context.scheduler.pause(context.clone()).await;
|
||||||
let context_dir = context
|
let context_dir = context
|
||||||
.get_blobdir()
|
.get_blobdir()
|
||||||
.parent()
|
.parent()
|
||||||
@@ -118,15 +119,19 @@ impl BackupProvider {
|
|||||||
Ok((provider, ticket)) => (provider, ticket),
|
Ok((provider, ticket)) => (provider, ticket),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
context.free_ongoing().await;
|
context.free_ongoing().await;
|
||||||
|
paused_guard.resume().await;
|
||||||
return Err(err);
|
return Err(err);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let handle = tokio::spawn(Self::watch_provider(
|
let handle = {
|
||||||
context.clone(),
|
let context = context.clone();
|
||||||
provider,
|
tokio::spawn(async move {
|
||||||
cancel_token,
|
let res = Self::watch_provider(&context, provider, cancel_token, dbfile).await;
|
||||||
dbfile,
|
context.free_ongoing().await;
|
||||||
));
|
paused_guard.resume().await;
|
||||||
|
res
|
||||||
|
})
|
||||||
|
};
|
||||||
let slf = Self { handle, ticket };
|
let slf = Self { handle, ticket };
|
||||||
let qr = slf.qr();
|
let qr = slf.qr();
|
||||||
*context.export_provider.lock().expect("poisoned lock") = Some(qr);
|
*context.export_provider.lock().expect("poisoned lock") = Some(qr);
|
||||||
@@ -181,7 +186,7 @@ impl BackupProvider {
|
|||||||
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
|
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
|
||||||
/// we must cancel this operation.
|
/// we must cancel this operation.
|
||||||
async fn watch_provider(
|
async fn watch_provider(
|
||||||
context: Context,
|
context: &Context,
|
||||||
mut provider: Provider,
|
mut provider: Provider,
|
||||||
cancel_token: Receiver<()>,
|
cancel_token: Receiver<()>,
|
||||||
_dbfile: TempPathGuard,
|
_dbfile: TempPathGuard,
|
||||||
@@ -262,7 +267,6 @@ impl BackupProvider {
|
|||||||
context.emit_event(SendProgress::Failed.into())
|
context.emit_event(SendProgress::Failed.into())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
context.free_ongoing().await;
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -373,7 +377,7 @@ pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
|
|||||||
!context.is_configured().await?,
|
!context.is_configured().await?,
|
||||||
"Cannot import backups to accounts in use."
|
"Cannot import backups to accounts in use."
|
||||||
);
|
);
|
||||||
let mut guard = context.scheduler.pause(context).await;
|
let mut guard = context.scheduler.pause(context.clone()).await;
|
||||||
|
|
||||||
// Acquire global "ongoing" mutex.
|
// Acquire global "ongoing" mutex.
|
||||||
let cancel_token = context.alloc_ongoing().await?;
|
let cancel_token = context.alloc_ongoing().await?;
|
||||||
|
|||||||
@@ -95,10 +95,10 @@ impl SchedulerState {
|
|||||||
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
/// If in the meantime [`SchedulerState::start`] or [`SchedulerState::stop`] is called
|
||||||
/// resume will do the right thing and restore the scheduler to the state requested by
|
/// resume will do the right thing and restore the scheduler to the state requested by
|
||||||
/// the last call.
|
/// the last call.
|
||||||
pub(crate) async fn pause<'a>(&'_ self, context: &'a Context) -> IoPausedGuard<'a> {
|
pub(crate) async fn pause<'a>(&'_ self, context: Context) -> IoPausedGuard {
|
||||||
let mut inner = self.inner.write().await;
|
let mut inner = self.inner.write().await;
|
||||||
inner.paused = true;
|
inner.paused = true;
|
||||||
Self::do_stop(inner, context).await;
|
Self::do_stop(inner, &context).await;
|
||||||
IoPausedGuard {
|
IoPausedGuard {
|
||||||
context,
|
context,
|
||||||
done: false,
|
done: false,
|
||||||
@@ -195,12 +195,12 @@ struct InnerSchedulerState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct IoPausedGuard<'a> {
|
pub(crate) struct IoPausedGuard {
|
||||||
context: &'a Context,
|
context: Context,
|
||||||
done: bool,
|
done: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> IoPausedGuard<'a> {
|
impl IoPausedGuard {
|
||||||
pub(crate) async fn resume(&mut self) {
|
pub(crate) async fn resume(&mut self) {
|
||||||
self.done = true;
|
self.done = true;
|
||||||
let mut inner = self.context.scheduler.inner.write().await;
|
let mut inner = self.context.scheduler.inner.write().await;
|
||||||
@@ -211,7 +211,7 @@ impl<'a> IoPausedGuard<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Drop for IoPausedGuard<'a> {
|
impl Drop for IoPausedGuard {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.done {
|
if self.done {
|
||||||
return;
|
return;
|
||||||
|
|||||||
Reference in New Issue
Block a user