Make stuff work. With test!

This commit is contained in:
Floris Bruynooghe
2023-02-07 17:18:34 +01:00
parent 0b075ac762
commit 187861c3b2
3 changed files with 893 additions and 335 deletions

1123
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -101,6 +101,7 @@ log = "0.4"
pretty_env_logger = "0.4"
proptest = { version = "1", default-features = false, features = ["std"] }
tempfile = "3"
testdir = "0.7.1"
tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "macros"] }
[workspace]

View File

@@ -72,7 +72,7 @@ impl BackupProvider {
/// This will acquire the global "ongoing process" mutex. You must call
/// [`BackupSender::join`] after creating this struct, otherwise this will not respect
/// the possible cancellation of the "ongoing process".
pub async fn perpare(context: &Context, dir: &Path) -> Result<Self> {
pub async fn prepare(context: &Context, dir: &Path) -> Result<Self> {
ensure!(
// TODO: Should we worry about path normalisation?
dir != context.get_blobdir(),
@@ -109,7 +109,11 @@ impl BackupProvider {
return Err(err);
}
};
let handle = tokio::spawn(Self::watch_provider(provider, cancel_token));
let handle = tokio::spawn(Self::watch_provider(
context.clone(),
provider,
cancel_token,
));
Ok(Self { handle, ticket })
}
@@ -147,24 +151,28 @@ impl BackupProvider {
///
/// The *cancel_token* is the handle for the ongoing process mutex, when this completes
/// we must cancel this operation.
async fn watch_provider(mut provider: Provider, cancel_token: Receiver<()>) -> Result<()> {
async fn watch_provider(
context: Context,
mut provider: Provider,
cancel_token: Receiver<()>,
) -> Result<()> {
let mut events = provider.subscribe();
loop {
let res = loop {
tokio::select! {
biased;
res = &mut provider => {
return res.context("BackupSender failed");
break res.context("BackupSender failed");
},
maybe_event = events.recv() => {
match maybe_event {
Ok(event) => {
match event {
Event::TransferCompleted { .. } => {
provider.abort();
provider.shutdown();
}
Event::TransferAborted { .. } => {
provider.abort();
return Err(anyhow!("BackupSender transfer aborted"));
provider.shutdown();
break Err(anyhow!("BackupSender transfer aborted"));
}
_ => (),
}
@@ -176,17 +184,19 @@ impl BackupProvider {
Err(broadcast::error::RecvError::Lagged(_)) => {
// We really shouldn't be lagging, if we did we may have missed
// a completion event.
provider.abort();
return Err(anyhow!("Missed events from BackupSender"));
provider.shutdown();
break Err(anyhow!("Missed events from BackupSender"));
}
}
},
_ = cancel_token.recv() => {
provider.abort();
return Err(anyhow!("BackupSender cancelled"));
provider.shutdown();
break Err(anyhow!("BackupSender cancelled"));
},
}
}
};
context.free_ongoing().await;
res
}
pub fn qr(&self) -> Qr {
@@ -194,6 +204,15 @@ impl BackupProvider {
ticket: self.ticket.clone(),
}
}
/// Awaits the [`BackupSender`] until it is finished.
///
/// This waits until someone connected to the sender and transferred a backup. If the
/// [`BackupSender`] task results in an error it will be returned here.
pub async fn join(self) -> Result<()> {
self.handle.await??;
Ok(())
}
}
/// Contacts a backup provider and receives the backup from it.
@@ -299,3 +318,62 @@ async fn on_blob(
}
Ok(reader)
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use testdir::testdir;
use crate::chat::{get_chat_msgs, send_msg, ChatItem};
use crate::message::{Message, Viewtype};
use crate::test_utils::TestContextManager;
use super::*;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_send_receive() {
let dir = testdir!();
let mut tcm = TestContextManager::new();
// Create first device.
let ctx0 = tcm.alice().await;
// Write a message in the self chat
let self_chat = ctx0.get_self_chat().await;
let mut msg = Message::new(Viewtype::Text);
msg.set_text(Some("hi there".to_string()));
send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();
// Prepare to transfer backup.
ctx0.stop_io().await;
let provider = BackupProvider::prepare(&ctx0, &dir).await.unwrap();
// Set up second device.
let ctx1 = tcm.bob().await;
ctx1.stop_io().await;
ctx1.sql
.set_raw_config_bool("configured", false)
.await
.unwrap();
get_backup(&ctx1, provider.qr()).await.unwrap();
// Make sure the provider finishes without an error.
tokio::time::timeout(Duration::from_secs(30), provider.join())
.await
.expect("timed out")
.expect("error in provider");
// Check that we have the self message.
let self_chat = ctx1.get_self_chat().await;
let msgs = get_chat_msgs(&ctx1, self_chat.id, 0).await.unwrap();
assert_eq!(msgs.len(), 1);
let msgid = match msgs.get(0).unwrap() {
ChatItem::Message { msg_id } => msg_id,
_ => panic!("wrong chat item"),
};
let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
let text = msg.get_text().unwrap();
assert_eq!(text, "hi there");
}
}