Remove EventSink from TestContext to simplify it

This replaces the EventSink callbacks with simple channel senders.
This simplifies the TestContext a lot as that is much simpler to
handle.  It then also removes the special-casing of the LogSink since
it now is another even sender, only injected at the very start.
This commit is contained in:
Floris Bruynooghe
2021-12-12 21:31:25 +01:00
parent f0ca50ba27
commit 147f5c1e0d
4 changed files with 117 additions and 142 deletions

View File

@@ -417,7 +417,10 @@ mod tests {
#[async_std::test] #[async_std::test]
async fn test_prexisting() { async fn test_prexisting() {
let t = TestContext::new_alice().await; let t = TestContext::new_alice().await;
assert_eq!(ensure_secret_key_exists(&t).await.unwrap(), "alice@example.com"); assert_eq!(
ensure_secret_key_exists(&t).await.unwrap(),
"alice@example.org"
);
} }
#[async_std::test] #[async_std::test]

View File

@@ -942,7 +942,6 @@ mod tests {
use crate::chat::ProtectionStatus; use crate::chat::ProtectionStatus;
use crate::chatlist::Chatlist; use crate::chatlist::Chatlist;
use crate::constants::Chattype; use crate::constants::Chattype;
use crate::events::Event;
use crate::peerstate::Peerstate; use crate::peerstate::Peerstate;
use crate::test_utils::TestContext; use crate::test_utils::TestContext;
use std::time::Duration; use std::time::Duration;
@@ -955,16 +954,8 @@ mod tests {
assert_eq!(Chatlist::try_load(&bob, 0, None, None).await?.len(), 0); assert_eq!(Chatlist::try_load(&bob, 0, None, None).await?.len(), 0);
// Setup JoinerProgress sinks. // Setup JoinerProgress sinks.
let (joiner_progress_tx, joiner_progress_rx) = async_std::channel::bounded(100); let (joiner_progress_tx, joiner_progress_rx) = async_std::channel::unbounded();
bob.add_event_sink(move |event: Event| { bob.add_event_sender(joiner_progress_tx).await;
let joiner_progress_tx = joiner_progress_tx.clone();
async move {
if let EventType::SecurejoinJoinerProgress { .. } = event.typ {
joiner_progress_tx.try_send(event).unwrap();
}
}
})
.await;
// Step 1: Generate QR-code, ChatId(0) indicates setup-contact // Step 1: Generate QR-code, ChatId(0) indicates setup-contact
let qr = dc_get_securejoin_qr(&alice.ctx, None).await?; let qr = dc_get_securejoin_qr(&alice.ctx, None).await?;
@@ -997,29 +988,33 @@ mod tests {
bob.recv_msg(&sent).await; bob.recv_msg(&sent).await;
// Check Bob emitted the JoinerProgress event. // Check Bob emitted the JoinerProgress event.
{ async {
let evt = joiner_progress_rx loop {
.recv() let event = joiner_progress_rx.recv().await.unwrap();
.timeout(Duration::from_secs(10)) match event.typ {
.await
.expect("timeout waiting for JoinerProgress event")
.expect("missing JoinerProgress event");
match evt.typ {
EventType::SecurejoinJoinerProgress { EventType::SecurejoinJoinerProgress {
contact_id, contact_id,
progress, progress,
} => { } => {
let alice_contact_id = let alice_contact_id = Contact::lookup_id_by_addr(
Contact::lookup_id_by_addr(&bob.ctx, "alice@example.org", Origin::Unknown) &bob.ctx,
"alice@example.org",
Origin::Unknown,
)
.await .await
.expect("Error looking up contact") .expect("Error looking up contact")
.expect("Contact not found"); .expect("Contact not found");
assert_eq!(contact_id, alice_contact_id); assert_eq!(contact_id, alice_contact_id);
assert_eq!(progress, 400); assert_eq!(progress, 400);
break;
} }
_ => panic!("Wrong event type"), _ => {}
} }
} }
}
.timeout(Duration::from_secs(10))
.await
.expect("timeout waiting for JoinerProgress event");
// Check Bob sent the right message. // Check Bob sent the right message.
let sent = bob.pop_sent_msg().await; let sent = bob.pop_sent_msg().await;
@@ -1157,16 +1152,8 @@ mod tests {
let bob = TestContext::new_bob().await; let bob = TestContext::new_bob().await;
// Setup JoinerProgress sinks. // Setup JoinerProgress sinks.
let (joiner_progress_tx, joiner_progress_rx) = async_std::channel::bounded(100); let (joiner_progress_tx, joiner_progress_rx) = async_std::channel::unbounded();
bob.add_event_sink(move |event: Event| { bob.add_event_sender(joiner_progress_tx).await;
let joiner_progress_tx = joiner_progress_tx.clone();
async move {
if let EventType::SecurejoinJoinerProgress { .. } = event.typ {
joiner_progress_tx.try_send(event).unwrap();
}
}
})
.await;
// Ensure Bob knows Alice_FP // Ensure Bob knows Alice_FP
let alice_pubkey = SignedPublicKey::load_self(&alice.ctx).await?; let alice_pubkey = SignedPublicKey::load_self(&alice.ctx).await?;
@@ -1194,29 +1181,33 @@ mod tests {
dc_join_securejoin(&bob.ctx, &qr).await.unwrap(); dc_join_securejoin(&bob.ctx, &qr).await.unwrap();
// Check Bob emitted the JoinerProgress event. // Check Bob emitted the JoinerProgress event.
{ async {
let evt = joiner_progress_rx loop {
.recv() let event = joiner_progress_rx.recv().await.unwrap();
.timeout(Duration::from_secs(10)) match event.typ {
.await
.expect("timeout waiting for JoinerProgress event")
.expect("missing JoinerProgress event");
match evt.typ {
EventType::SecurejoinJoinerProgress { EventType::SecurejoinJoinerProgress {
contact_id, contact_id,
progress, progress,
} => { } => {
let alice_contact_id = let alice_contact_id = Contact::lookup_id_by_addr(
Contact::lookup_id_by_addr(&bob.ctx, "alice@example.org", Origin::Unknown) &bob.ctx,
"alice@example.org",
Origin::Unknown,
)
.await .await
.expect("Error looking up contact") .expect("Error looking up contact")
.expect("Contact not found"); .expect("Contact not found");
assert_eq!(contact_id, alice_contact_id); assert_eq!(contact_id, alice_contact_id);
assert_eq!(progress, 400); assert_eq!(progress, 400);
break;
} }
_ => panic!("Wrong event type"), _ => {}
} }
} }
}
.timeout(Duration::from_secs(10))
.await
.expect("timeout waiting for JoinerProgress event");
assert!(!bob.ctx.has_ongoing().await); assert!(!bob.ctx.has_ongoing().await);
// Check Bob sent the right handshake message. // Check Bob sent the right handshake message.
@@ -1330,16 +1321,8 @@ mod tests {
assert_eq!(Chatlist::try_load(&bob, 0, None, None).await?.len(), 0); assert_eq!(Chatlist::try_load(&bob, 0, None, None).await?.len(), 0);
// Setup JoinerProgress sinks. // Setup JoinerProgress sinks.
let (joiner_progress_tx, joiner_progress_rx) = async_std::channel::bounded(100); let (joiner_progress_tx, joiner_progress_rx) = async_std::channel::unbounded();
bob.add_event_sink(move |event: Event| { bob.add_event_sender(joiner_progress_tx).await;
let joiner_progress_tx = joiner_progress_tx.clone();
async move {
if let EventType::SecurejoinJoinerProgress { .. } = event.typ {
joiner_progress_tx.try_send(event).unwrap();
}
}
})
.await;
let chatid = let chatid =
chat::create_group_chat(&alice.ctx, ProtectionStatus::Protected, "the chat").await?; chat::create_group_chat(&alice.ctx, ProtectionStatus::Protected, "the chat").await?;
@@ -1376,29 +1359,33 @@ mod tests {
let sent = bob.pop_sent_msg().await; let sent = bob.pop_sent_msg().await;
// Check Bob emitted the JoinerProgress event. // Check Bob emitted the JoinerProgress event.
{ async {
let evt = joiner_progress_rx loop {
.recv() let event = joiner_progress_rx.recv().await.unwrap();
.timeout(Duration::from_secs(10)) match event.typ {
.await
.expect("timeout waiting for JoinerProgress event")
.expect("missing JoinerProgress event");
match evt.typ {
EventType::SecurejoinJoinerProgress { EventType::SecurejoinJoinerProgress {
contact_id, contact_id,
progress, progress,
} => { } => {
let alice_contact_id = let alice_contact_id = Contact::lookup_id_by_addr(
Contact::lookup_id_by_addr(&bob.ctx, "alice@example.org", Origin::Unknown) &bob.ctx,
"alice@example.org",
Origin::Unknown,
)
.await .await
.expect("Error looking up contact") .expect("Error looking up contact")
.expect("Contact not found"); .expect("Contact not found");
assert_eq!(contact_id, alice_contact_id); assert_eq!(contact_id, alice_contact_id);
assert_eq!(progress, 400); assert_eq!(progress, 400);
break;
} }
_ => panic!("Wrong event type"), _ => {}
} }
} }
}
.timeout(Duration::from_secs(10))
.await
.expect("timeout waiting for JoinerProgress event");
// Check Bob sent the right handshake message. // Check Bob sent the right handshake message.
let msg = alice.parse_msg(&sent).await; let msg = alice.parse_msg(&sent).await;

View File

@@ -678,11 +678,12 @@ async fn prune_tombstones(sql: &Sql) -> Result<()> {
} }
#[cfg(test)] #[cfg(test)]
mod test { mod tests {
use async_std::channel;
use async_std::fs::File; use async_std::fs::File;
use crate::config::Config; use crate::config::Config;
use crate::{test_utils::TestContext, Event, EventType}; use crate::{test_utils::TestContext, EventType};
use super::*; use super::*;
@@ -743,18 +744,8 @@ mod test {
.await .await
.unwrap(); .unwrap();
t.add_event_sink(move |event: Event| async move { let (event_sink, event_source) = channel::unbounded();
match event.typ { t.add_event_sender(event_sink).await;
EventType::Info(s) => assert!(
!s.contains("Keeping new unreferenced file"),
"File {} was almost deleted, only reason it was kept is that it was created recently (as the tests don't run for a long time)",
s
),
EventType::Error(s) => panic!("{}", s),
_ => {}
}
})
.await;
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap(); let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]); assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]);
@@ -765,6 +756,18 @@ mod test {
let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap(); let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap();
assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]); assert_eq!(avatar_bytes, &async_std::fs::read(&a).await.unwrap()[..]);
while let Ok(event) = event_source.try_recv() {
match event.typ {
EventType::Info(s) => assert!(
!s.contains("Keeping new unreferenced file"),
"File {} was almost deleted, only reason it was kept is that it was created recently (as the tests don't run for a long time)",
s
),
EventType::Error(s) => panic!("{}", s),
_ => {}
}
}
} }
/// Regression test. /// Regression test.

View File

@@ -12,9 +12,7 @@ use std::time::{Duration, Instant};
use ansi_term::Color; use ansi_term::Color;
use async_std::channel::{self, Receiver, Sender}; use async_std::channel::{self, Receiver, Sender};
use async_std::future::Future;
use async_std::path::PathBuf; use async_std::path::PathBuf;
use async_std::pin::Pin;
use async_std::sync::{Arc, RwLock}; use async_std::sync::{Arc, RwLock};
use async_std::task; use async_std::task;
use chat::ChatItem; use chat::ChatItem;
@@ -41,9 +39,6 @@ use crate::param::{Param, Params};
#[allow(non_upper_case_globals)] #[allow(non_upper_case_globals)]
pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png"); pub const AVATAR_900x900_BYTES: &[u8] = include_bytes!("../test-data/image/avatar900x900.png");
type EventSink =
dyn Fn(Event) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + Sync + 'static;
/// Map of [`Context::id`] to names for [`TestContext`]s. /// Map of [`Context::id`] to names for [`TestContext`]s.
static CONTEXT_NAMES: Lazy<std::sync::RwLock<BTreeMap<u32, String>>> = static CONTEXT_NAMES: Lazy<std::sync::RwLock<BTreeMap<u32, String>>> =
Lazy::new(|| std::sync::RwLock::new(BTreeMap::new())); Lazy::new(|| std::sync::RwLock::new(BTreeMap::new()));
@@ -55,7 +50,7 @@ pub struct TestContextBuilder {
} }
impl TestContextBuilder { impl TestContextBuilder {
/// Configures as alice@example.com with fixed secret key. /// Configures as alice@example.org with fixed secret key.
/// ///
/// This is a shortcut for `.with_key_pair(alice_keypair()). /// This is a shortcut for `.with_key_pair(alice_keypair()).
pub fn configure_alice(self) -> Self { pub fn configure_alice(self) -> Self {
@@ -115,8 +110,8 @@ pub struct TestContext {
pub ctx: Context, pub ctx: Context,
pub dir: TempDir, pub dir: TempDir,
pub evtracker: EvTracker, pub evtracker: EvTracker,
/// Functions to call for events received. /// Channels which should receive events from this context.
event_sinks: Arc<RwLock<Vec<Box<EventSink>>>>, event_senders: Arc<RwLock<Vec<Sender<Event>>>>,
/// Receives panics from sinks ("sink" means "event handler" here) /// Receives panics from sinks ("sink" means "event handler" here)
poison_receiver: Receiver<String>, poison_receiver: Receiver<String>,
/// Reference to implicit [`LogSink`] so it is dropped together with the context. /// Reference to implicit [`LogSink`] so it is dropped together with the context.
@@ -196,16 +191,15 @@ impl TestContext {
let events = ctx.get_event_emitter(); let events = ctx.get_event_emitter();
let (log_sender, log_sink) = match log_sender { let (log_sender, log_sink) = match log_sender {
Some(sender) => (Arc::new(RwLock::new(sender)), None), Some(sender) => (sender, None),
None => { None => {
let (sender, sink) = LogSink::create(); let (sender, sink) = LogSink::create();
(Arc::new(RwLock::new(sender)), Some(sink)) (sender, Some(sink))
} }
}; };
let log_sender_clone = Arc::clone(&log_sender);
let event_sinks: Arc<RwLock<Vec<Box<EventSink>>>> = Arc::new(RwLock::new(Vec::new())); let event_senders = Arc::new(RwLock::new(vec![log_sender]));
let sinks = Arc::clone(&event_sinks); let senders = Arc::clone(&event_senders);
let (poison_sender, poison_receiver) = channel::bounded(1); let (poison_sender, poison_receiver) = channel::bounded(1);
let (evtracker_sender, evtracker_receiver) = channel::unbounded(); let (evtracker_sender, evtracker_receiver) = channel::unbounded();
@@ -225,17 +219,13 @@ impl TestContext {
while let Some(event) = events.recv().await { while let Some(event) = events.recv().await {
{ {
let sinks = sinks.read().await; let sinks = senders.read().await;
for sink in sinks.iter() { for sender in sinks.iter() {
sink(event.clone()).await; // Best effort, don't block because someone wanted to use a oneshot
// receiver.
sender.try_send(event.clone()).ok();
} }
} }
log_sender_clone
.read()
.await
.send(event.clone())
.await
.expect("log sender can not block");
evtracker_sender.send(event.typ).await.ok(); evtracker_sender.send(event.typ).await.ok();
} }
}); });
@@ -244,7 +234,7 @@ impl TestContext {
ctx, ctx,
dir, dir,
evtracker: EvTracker(evtracker_receiver), evtracker: EvTracker(evtracker_receiver),
event_sinks, event_senders,
poison_receiver, poison_receiver,
log_sink, log_sink,
} }
@@ -260,22 +250,14 @@ impl TestContext {
.or_insert_with(|| name.into()); .or_insert_with(|| name.into());
} }
/// Add a new callback which will receive events. /// Adds a new [`Event`]s sender.
/// ///
/// The test context runs an async task receiving all events from the [`Context`], which /// Once added, all events emitted by this context will be sent to this channel. This
/// are logged to stdout. This allows you to register additional callbacks which will /// is useful if you need to wait for events or make assertions on them.
/// receive all events in case your tests need to watch for a specific event. pub async fn add_event_sender(&self, sink: Sender<Event>) {
pub async fn add_event_sink<F, R>(&self, sink: F) self.event_senders.write().await.push(sink)
where
// Aka `F: EventSink` but type aliases are not allowed.
F: Fn(Event) -> R + Send + Sync + 'static,
R: Future<Output = ()> + Send + 'static,
{
let mut sinks = self.event_sinks.write().await;
sinks.push(Box::new(move |evt| Box::pin(sink(evt))));
} }
/// Configure as a given email address. /// Configure as a given email address.
/// ///
/// The context will be configured but the key will not be pre-generated so if a key is /// The context will be configured but the key will not be pre-generated so if a key is