Split _wait and _unref

This also removes BackupProvider::join in favour of implementing
Future directly.  I wondered about implementing a FusedFutre to make
this a little safer but it would introduce a dependency on the futures
crate in deltachat-ffi which did not exist yet, so I didn't do that.
This commit is contained in:
Floris Bruynooghe
2023-02-20 15:56:05 +01:00
parent 73b90eee3e
commit b920db12c7
5 changed files with 31 additions and 14 deletions

View File

@@ -2706,13 +2706,21 @@ char * dc_backup_provider_qr_svg (dc_context_t* context, const dc_backup_provide
/** /**
* Waits for the sending to finish and frees the backup provider object. * Waits for the sending to finish and frees the backup provider object.
* *
* @memberof dc_backup_sender_t * This should only be called once.
*
* @memberof dc_backup_provider_t
* @param context The context. * @param context The context.
* @param backup_provider The backup provider object as created by * @param backup_provider The backup provider object as created by
* dc_backup_provider_new(). If NULL is given nothing is done. * dc_backup_provider_new(). If NULL is given nothing is done.
*/ */
void dc_backup_provider_wait (dc_context_t* context, dc_backup_provider_t* backup_provider); void dc_backup_provider_wait (dc_context_t* context, dc_backup_provider_t* backup_provider);
/**
* Frees a dc_backup_provider_t object.
*
* @memberof dc_backup_provider_t
*/
void dc_backup_prvider_unref (dc_backup_provider_t* backup_provider);
/** /**
* Gets a backup offered by a dc_backup_provider_t object on another device. * Gets a backup offered by a dc_backup_provider_t object on another device.

View File

@@ -4185,12 +4185,17 @@ pub unsafe extern "C" fn dc_backup_provider_wait(
return; return;
} }
let ctx = &*context; let ctx = &*context;
let provider = Box::from_raw(provider); let provider = &mut *provider;
block_on(provider.join()) block_on(provider)
.log_err(ctx, "Failed to join provider") .log_err(ctx, "Failed to join provider")
.ok(); .ok();
} }
#[no_mangle]
pub unsafe extern "C" fn dc_backup_provider_unref(provider: *mut dc_backup_provider_t) {
drop(Box::from_raw(provider));
}
#[no_mangle] #[no_mangle]
pub unsafe extern "C" fn dc_receive_backup( pub unsafe extern "C" fn dc_receive_backup(
context: *mut dc_context_t, context: *mut dc_context_t,

View File

@@ -1371,7 +1371,7 @@ impl CommandApi {
return Err(err); return Err(err);
} }
}; };
let res = provider.join().await; let res = provider.await;
ctx.start_io().await; ctx.start_io().await;
res res
} }

View File

@@ -492,7 +492,7 @@ pub async fn cmdline(context: Context, line: &str, chat_id: &mut ChatId) -> Resu
let provider = BackupProvider::prepare(&context).await?; let provider = BackupProvider::prepare(&context).await?;
let qr = provider.qr(); let qr = provider.qr();
println!("QR code: {}", format_backup(&qr)?); println!("QR code: {}", format_backup(&qr)?);
provider.join().await?; provider.await?;
} }
"receive-backup" => { "receive-backup" => {
ensure!(!arg1.is_empty(), "Argument <qr> is missing."); ensure!(!arg1.is_empty(), "Argument <qr> is missing.");

View File

@@ -22,7 +22,10 @@
//! getter can not connect to an impersonated provider and the provider does not offer the //! getter can not connect to an impersonated provider and the provider does not offer the
//! download to an impersonated getter. //! download to an impersonated getter.
use std::future::Future;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::Poll;
use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result}; use anyhow::{anyhow, bail, ensure, format_err, Context as _, Result};
use async_channel::Receiver; use async_channel::Receiver;
@@ -57,6 +60,9 @@ use super::{export_database, DeleteOnDrop, DBFILE_BACKUP_NAME};
/// ///
/// This starts a task which acquires the global "ongoing" mutex. If you need to stop the /// This starts a task which acquires the global "ongoing" mutex. If you need to stop the
/// task use the [`Context::stop_ongoing`] mechanism. /// task use the [`Context::stop_ongoing`] mechanism.
///
/// The task implements [`Future`] and awaiting it will complete once a transfer has been
/// either completed or aborted.
#[derive(Debug)] #[derive(Debug)]
pub struct BackupProvider { pub struct BackupProvider {
/// The supervisor task, run by [`BackupProvider::watch_provider`]. /// The supervisor task, run by [`BackupProvider::watch_provider`].
@@ -244,15 +250,13 @@ impl BackupProvider {
ticket: self.ticket.clone(), ticket: self.ticket.clone(),
} }
} }
}
/// Awaits the [`BackupProvider`] until it is finished. impl Future for BackupProvider {
/// type Output = Result<()>;
/// This waits until someone connected to the sender and finished transferring a backup.
/// A failed transfer also counts as a finished transfer. If the [`BackupProvider`] fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
/// task results in an error it will be returned here. Pin::new(&mut self.handle).poll(cx)?
pub async fn join(self) -> Result<()> {
self.handle.await??;
Ok(())
} }
} }
@@ -507,7 +511,7 @@ mod tests {
get_backup(&ctx1, provider.qr()).await.unwrap(); get_backup(&ctx1, provider.qr()).await.unwrap();
// Make sure the provider finishes without an error. // Make sure the provider finishes without an error.
tokio::time::timeout(Duration::from_secs(30), provider.join()) tokio::time::timeout(Duration::from_secs(30), provider)
.await .await
.expect("timed out") .expect("timed out")
.expect("error in provider"); .expect("error in provider");