diff --git a/Cargo.lock b/Cargo.lock index a81a84270..ec870a82e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-broadcast" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258b52a1aa741b9f09783b2d86cf0aeeb617bbf847f6933340a39644227acbdb" +dependencies = [ + "event-listener 5.2.0", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -1149,6 +1161,7 @@ version = "1.137.3" dependencies = [ "ansi_term", "anyhow", + "async-broadcast", "async-channel 2.2.0", "async-imap", "async-native-tls", diff --git a/Cargo.toml b/Cargo.toml index 58f6cd8bb..3f95fb942 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ format-flowed = { path = "./format-flowed" } ratelimit = { path = "./deltachat-ratelimit" } anyhow = { workspace = true } +async-broadcast = "0.7.0" async-channel = "2.0.0" async-imap = { version = "0.9.7", default-features = false, features = ["runtime-tokio"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } diff --git a/deltachat-ffi/deltachat.h b/deltachat-ffi/deltachat.h index 22f549668..7e7af8dd3 100644 --- a/deltachat-ffi/deltachat.h +++ b/deltachat-ffi/deltachat.h @@ -362,8 +362,12 @@ uint32_t dc_get_id (dc_context_t* context); * Must be freed using dc_event_emitter_unref() after usage. * * Note: Use only one event emitter per context. - * Having more than one event emitter running at the same time on the same context - * will result in events being randomly delivered to one of the emitters. + * The result of having multiple event emitters is unspecified. + * Currently events are broadcasted to all existing event emitters, + * but previous versions delivered events to only one event emitter + * and this behavior may change again in the future. + * Events emitted before creation of event emitter + * may or may not be available to event emitter. */ dc_event_emitter_t* dc_get_event_emitter(dc_context_t* context); diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 72a80ab50..a001090ae 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4922,7 +4922,9 @@ mod jsonrpc { } let account_manager = &*account_manager; - let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone()); + let cmd_api = block_on(deltachat_jsonrpc::api::CommandApi::from_arc( + account_manager.inner.clone(), + )); let (request_handle, receiver) = RpcClient::new(); let handle = RpcSession::new(request_handle, cmd_api); diff --git a/deltachat-jsonrpc/src/api.rs b/deltachat-jsonrpc/src/api.rs index 42627a4b7..1861807d9 100644 --- a/deltachat-jsonrpc/src/api.rs +++ b/deltachat-jsonrpc/src/api.rs @@ -29,6 +29,7 @@ use deltachat::reaction::{get_msg_reactions, send_reaction}; use deltachat::securejoin; use deltachat::stock_str::StockMessage; use deltachat::webxdc::StatusUpdateSerial; +use deltachat::EventEmitter; use sanitize_filename::is_sanitized; use tokio::fs; use tokio::sync::{watch, Mutex, RwLock}; @@ -81,21 +82,30 @@ impl Default for AccountState { pub struct CommandApi { pub(crate) accounts: Arc>, + /// Receiver side of the event channel. + /// + /// Events from it can be received by calling `get_next_event` method. + event_emitter: Arc, + states: Arc>>, } impl CommandApi { pub fn new(accounts: Accounts) -> Self { + let event_emitter = Arc::new(accounts.get_event_emitter()); CommandApi { accounts: Arc::new(RwLock::new(accounts)), + event_emitter, states: Arc::new(Mutex::new(BTreeMap::new())), } } #[allow(dead_code)] - pub fn from_arc(accounts: Arc>) -> Self { + pub async fn from_arc(accounts: Arc>) -> Self { + let event_emitter = Arc::new(accounts.read().await.get_event_emitter()); CommandApi { accounts, + event_emitter, states: Arc::new(Mutex::new(BTreeMap::new())), } } @@ -158,8 +168,7 @@ impl CommandApi { /// Get the next event. async fn get_next_event(&self) -> Result { - let event_emitter = self.accounts.read().await.get_event_emitter(); - event_emitter + self.event_emitter .recv() .await .map(|event| event.into()) diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 4be58760b..0fd981e5b 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -68,7 +68,7 @@ async fn main_impl() -> Result<()> { log::info!("Creating JSON-RPC API."); let accounts = Arc::new(RwLock::new(accounts)); - let state = CommandApi::from_arc(accounts.clone()); + let state = CommandApi::from_arc(accounts.clone()).await; let (client, mut out_receiver) = RpcClient::new(); let session = RpcSession::new(client.clone(), state.clone()); diff --git a/src/contact.rs b/src/contact.rs index 0c12d3b71..548a9071e 100644 --- a/src/contact.rs +++ b/src/contact.rs @@ -2730,7 +2730,7 @@ Hi."#; let sent_msg = alice.send_text(chat.id, "moin").await; let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?; assert!(!contact.was_seen_recently()); - while bob.evtracker.try_recv().is_ok() {} + bob.evtracker.clear_events(); bob.recv_msg(&sent_msg).await; let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?; assert!(contact.was_seen_recently()); @@ -2742,7 +2742,7 @@ Hi."#; .await; // Wait for `was_seen_recently()` to turn off. - while bob.evtracker.try_recv().is_ok() {} + bob.evtracker.clear_events(); SystemTime::shift(Duration::from_secs(SEEN_RECENTLY_SECONDS as u64 * 2)); recently_seen_loop.interrupt(ContactId::UNDEFINED, 0).await; let contact = Contact::get_by_id(&bob, *contacts.first().unwrap()).await?; diff --git a/src/events.rs b/src/events.rs index dcbee99d3..40c24ebab 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,6 +1,7 @@ //! # Events specification. -use async_channel::{self as channel, Receiver, Sender, TrySendError}; +use anyhow::Result; +use tokio::sync::Mutex; pub(crate) mod chatlist_events; mod payload; @@ -10,8 +11,11 @@ pub use self::payload::EventType; /// Event channel. #[derive(Debug, Clone)] pub struct Events { - receiver: Receiver, - sender: Sender, + /// Unused receiver to prevent the channel from closing. + _receiver: async_broadcast::InactiveReceiver, + + /// Sender side of the event channel. + sender: async_broadcast::Sender, } impl Default for Events { @@ -23,33 +27,30 @@ impl Default for Events { impl Events { /// Creates a new event channel. pub fn new() -> Self { - let (sender, receiver) = channel::bounded(1_000); + let (mut sender, _receiver) = async_broadcast::broadcast(1_000); - Self { receiver, sender } + // We only keep this receiver around + // to prevent the channel from closing. + // Deactivating it to prevent it from consuming memory + // holding events that are not going to be received. + let _receiver = _receiver.deactivate(); + + // Remove oldest event on overflow. + sender.set_overflow(true); + + Self { _receiver, sender } } /// Emits an event into event channel. /// /// If the channel is full, deletes the oldest event first. pub fn emit(&self, event: Event) { - match self.sender.try_send(event) { - Ok(()) => {} - Err(TrySendError::Full(event)) => { - // when we are full, we pop remove the oldest event and push on the new one - let _ = self.receiver.try_recv(); - - // try again - self.emit(event); - } - Err(TrySendError::Closed(_)) => { - unreachable!("unable to emit event, channel disconnected"); - } - } + self.sender.try_broadcast(event).ok(); } /// Creates an event emitter. pub fn get_emitter(&self) -> EventEmitter { - EventEmitter(self.receiver.clone()) + EventEmitter(Mutex::new(self.sender.new_receiver())) } } @@ -61,13 +62,32 @@ impl Events { /// /// [`Context`]: crate::context::Context /// [`Context::get_event_emitter`]: crate::context::Context::get_event_emitter -#[derive(Debug, Clone)] -pub struct EventEmitter(Receiver); +#[derive(Debug)] +pub struct EventEmitter(Mutex>); impl EventEmitter { /// Async recv of an event. Return `None` if the `Sender` has been dropped. + /// + /// [`try_recv`]: Self::try_recv pub async fn recv(&self) -> Option { - self.0.recv().await.ok() + let mut lock = self.0.lock().await; + lock.recv().await.ok() + } + + /// Tries to receive an event without blocking. + /// + /// Returns error if no events are available for reception + /// or if receiver mutex is locked by a concurrent call to [`recv`] + /// or `try_recv`. + /// + /// [`recv`]: Self::recv + pub fn try_recv(&self) -> Result { + // Using `try_lock` instead of `lock` + // to avoid blocking + // in case there is a concurrent call to `recv`. + let mut lock = self.0.try_lock()?; + let event = lock.try_recv()?; + Ok(event) } } diff --git a/src/events/chatlist_events.rs b/src/events/chatlist_events.rs index c679e0686..4b612f3ee 100644 --- a/src/events/chatlist_events.rs +++ b/src/events/chatlist_events.rs @@ -181,7 +181,7 @@ mod test_chatlist_events { .await?; set_muted(&bob, bob_chat.id, MuteDuration::Forever).await?; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); let sent_msg = alice.send_text(chat.id, "moin2").await; bob.recv_msg(&sent_msg).await; @@ -216,7 +216,7 @@ mod test_chatlist_events { let sent_msg = alice.send_text(chat.id, "moin2").await; bob.recv_msg(&sent_msg).await; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); chat::marknoticed_chat(&bob, DC_CHAT_ID_ARCHIVED_LINK).await?; wait_for_chatlist_specific_item(&bob, DC_CHAT_ID_ARCHIVED_LINK).await; @@ -233,7 +233,7 @@ mod test_chatlist_events { let sent_msg = alice.send_text(alice_to_bob_chat.id, "hello").await; bob.recv_msg(&sent_msg).await; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); // set alice name then receive messagefrom her with bob alice.set_config(Config::Displayname, Some("Alice")).await?; let sent_msg = alice @@ -245,7 +245,7 @@ mod test_chatlist_events { wait_for_chatlist_all_items(&bob).await; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); // set name let addr = alice_on_bob.get_addr(); Contact::create(&bob, "Alice2", addr).await?; @@ -266,7 +266,7 @@ mod test_chatlist_events { let sent_msg = alice.send_text(alice_to_bob_chat.id, "hello").await; bob.recv_msg(&sent_msg).await; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); // set alice avatar then receive messagefrom her with bob let file = alice.dir.path().join("avatar.png"); let bytes = include_bytes!("../../test-data/image/avatar64x64.png"); @@ -292,7 +292,7 @@ mod test_chatlist_events { let alice = tcm.alice().await; let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); chat.delete(&alice).await?; wait_for_chatlist(&alice).await; Ok(()) @@ -303,7 +303,7 @@ mod test_chatlist_events { async fn test_create_group_chat() -> Result<()> { let mut tcm = TestContextManager::new(); let alice = tcm.alice().await; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?; wait_for_chatlist_and_specific_item(&alice, chat).await; Ok(()) @@ -314,7 +314,7 @@ mod test_chatlist_events { async fn test_create_broadcastlist() -> Result<()> { let mut tcm = TestContextManager::new(); let alice = tcm.alice().await; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); create_broadcast_list(&alice).await?; wait_for_chatlist(&alice).await; Ok(()) @@ -327,11 +327,11 @@ mod test_chatlist_events { let alice = tcm.alice().await; let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); chat::set_muted(&alice, chat, MuteDuration::Forever).await?; wait_for_chatlist_specific_item(&alice, chat).await; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); chat::set_muted(&alice, chat, MuteDuration::NotMuted).await?; wait_for_chatlist_specific_item(&alice, chat).await; @@ -352,7 +352,7 @@ mod test_chatlist_events { .unwrap(), ); chat::set_muted(&alice, chat, mute_duration).await?; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); SystemTime::shift(Duration::from_secs(3)); wait_for_chatlist_specific_item(&alice, chat).await; @@ -366,7 +366,7 @@ mod test_chatlist_events { let alice = tcm.alice().await; let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); chat::set_chat_name(&alice, chat, "New Name").await?; wait_for_chatlist_specific_item(&alice, chat).await; @@ -380,7 +380,7 @@ mod test_chatlist_events { let alice = tcm.alice().await; let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); let file = alice.dir.path().join("avatar.png"); let bytes = include_bytes!("../../test-data/image/avatar64x64.png"); tokio::fs::write(&file, bytes).await?; @@ -405,7 +405,7 @@ mod test_chatlist_events { wait_for_chatlist_specific_item(&bob, chat_id_for_bob).await; chat_id_for_bob.accept(&bob).await?; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); chat::set_chat_name(&alice, chat, "New Name").await?; let sent_msg = alice.send_text(chat, "Hello").await; bob.recv_msg(&sent_msg).await; @@ -426,7 +426,7 @@ mod test_chatlist_events { let sent_msg = alice.send_text(chat, "Hello").await; let chat_id_for_bob = bob.recv_msg(&sent_msg).await.chat_id; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); chat_id_for_bob.accept(&bob).await?; wait_for_chatlist_specific_item(&bob, chat_id_for_bob).await; @@ -445,7 +445,7 @@ mod test_chatlist_events { let sent_msg = alice.send_text(chat, "Hello").await; let chat_id_for_bob = bob.recv_msg(&sent_msg).await.chat_id; - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); chat_id_for_bob.block(&bob).await?; wait_for_chatlist(&bob).await; @@ -460,7 +460,7 @@ mod test_chatlist_events { let chat = create_group_chat(&alice, ProtectionStatus::Protected, "My Group").await?; let message = chat::send_text_msg(&alice, chat, "Hello World".to_owned()).await?; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); message::delete_msgs(&alice, &[message]).await?; wait_for_chatlist_specific_item(&alice, chat).await; @@ -485,7 +485,7 @@ mod test_chatlist_events { let chat_id_for_bob = bob.recv_msg(&sent_msg).await.chat_id; assert!(chat_id_for_bob.get_fresh_msg_cnt(&bob).await? >= 1); - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); chat::marknoticed_chat(&bob, chat_id_for_bob).await?; wait_for_chatlist_specific_item(&bob, chat_id_for_bob).await; @@ -500,11 +500,11 @@ mod test_chatlist_events { let contact_id = Contact::create(&alice, "example", "example@example.com").await?; let _ = ChatId::create_for_contact(&alice, contact_id).await; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); Contact::block(&alice, contact_id).await?; wait_for_chatlist(&alice).await; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); Contact::unblock(&alice, contact_id).await?; wait_for_chatlist(&alice).await; @@ -547,7 +547,7 @@ Content-Type: text/plain; charset=utf-8; format=flowed; delsp=no First thread."#; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); receive_imf(&alice, mime, false).await?; wait_for_chatlist(&alice).await; @@ -568,34 +568,34 @@ First thread."#; let qr = get_securejoin_qr(&alice.ctx, Some(alice_chatid)).await?; // Step 2: Bob scans QR-code, sends vg-request - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); let bob_chatid = join_securejoin(&bob.ctx, &qr).await?; wait_for_chatlist(&bob).await; let sent = bob.pop_sent_msg().await; // Step 3: Alice receives vg-request, sends vg-auth-required - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); alice.recv_msg_trash(&sent).await; let sent = alice.pop_sent_msg().await; // Step 4: Bob receives vg-auth-required, sends vg-request-with-auth - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); bob.recv_msg_trash(&sent).await; wait_for_chatlist_and_specific_item(&bob, bob_chatid).await; let sent = bob.pop_sent_msg().await; // Step 5+6: Alice receives vg-request-with-auth, sends vg-member-added - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); alice.recv_msg_trash(&sent).await; wait_for_chatlist_and_specific_item(&alice, alice_chatid).await; let sent = alice.pop_sent_msg().await; // Step 7: Bob receives vg-member-added - bob.evtracker.clear_events().await; + bob.evtracker.clear_events(); bob.recv_msg(&sent).await; wait_for_chatlist_and_specific_item(&bob, bob_chatid).await; @@ -617,7 +617,7 @@ First thread."#; let message = Message::load_from_db(&alice, msg_id).await?; assert_eq!(message.get_state(), MessageState::OutDelivered); - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); chat::resend_msgs(&alice, &[msg_id]).await?; wait_for_chatlist_specific_item(&alice, chat).await; @@ -633,7 +633,7 @@ First thread."#; let msg_id = chat::send_text_msg(&alice, chat, "Hello".to_owned()).await?; let _ = alice.pop_sent_msg().await; - alice.evtracker.clear_events().await; + alice.evtracker.clear_events(); reaction::send_reaction(&alice, msg_id, "👍").await?; let _ = alice.pop_sent_msg().await; wait_for_chatlist_specific_item(&alice, chat).await; diff --git a/src/sql.rs b/src/sql.rs index 1a43acf8e..f971bbcff 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -1007,8 +1007,6 @@ pub fn repeat_vars(count: usize) -> String { #[cfg(test)] mod tests { - use async_channel as channel; - use super::*; use crate::{test_utils::TestContext, EventType}; @@ -1085,8 +1083,7 @@ mod tests { .await .unwrap(); - let (event_sink, event_source) = channel::unbounded(); - t.add_event_sender(event_sink).await; + let event_source = t.get_event_emitter(); let a = t.get_config(Config::Selfavatar).await.unwrap().unwrap(); assert_eq!(avatar_bytes, &tokio::fs::read(&a).await.unwrap()[..]); diff --git a/src/test_utils.rs b/src/test_utils.rs index fe72cf34d..f1b0bbc60 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -19,7 +19,6 @@ use pretty_assertions::assert_eq; use rand::Rng; use tempfile::{tempdir, TempDir}; use tokio::runtime::Handle; -use tokio::sync::RwLock; use tokio::{fs, task}; use crate::chat::{ @@ -34,7 +33,7 @@ use crate::constants::{Blocked, Chattype}; use crate::contact::{Contact, ContactId, Modifier, Origin}; use crate::context::Context; use crate::e2ee::EncryptHelper; -use crate::events::{Event, EventType, Events}; +use crate::events::{Event, EventEmitter, EventType, Events}; use crate::key::{self, DcKey, KeyPairUse}; use crate::message::{update_msg_state, Message, MessageState, MsgId, Viewtype}; use crate::mimeparser::{MimeMessage, SystemMessage}; @@ -57,20 +56,19 @@ static CONTEXT_NAMES: Lazy>> = /// occurred rather than grouped by context like would happen when you use separate /// [`TestContext`]s without managing your own [`LogSink`]. pub struct TestContextManager { - log_tx: Sender, - _log_sink: LogSink, + log_sink: LogSink, } impl TestContextManager { pub fn new() -> Self { - let (log_tx, _log_sink) = LogSink::create(); - Self { log_tx, _log_sink } + let log_sink = LogSink::new(); + Self { log_sink } } pub async fn alice(&mut self) -> TestContext { TestContext::builder() .configure_alice() - .with_log_sink(self.log_tx.clone()) + .with_log_sink(self.log_sink.clone()) .build() .await } @@ -78,7 +76,7 @@ impl TestContextManager { pub async fn bob(&mut self) -> TestContext { TestContext::builder() .configure_bob() - .with_log_sink(self.log_tx.clone()) + .with_log_sink(self.log_sink.clone()) .build() .await } @@ -86,7 +84,7 @@ impl TestContextManager { pub async fn fiona(&mut self) -> TestContext { TestContext::builder() .configure_fiona() - .with_log_sink(self.log_tx.clone()) + .with_log_sink(self.log_sink.clone()) .build() .await } @@ -94,7 +92,7 @@ impl TestContextManager { /// Creates a new unconfigured test account. pub async fn unconfigured(&mut self) -> TestContext { TestContext::builder() - .with_log_sink(self.log_tx.clone()) + .with_log_sink(self.log_sink.clone()) .build() .await } @@ -103,7 +101,8 @@ impl TestContextManager { /// /// ========== `msg` goes here ========== pub fn section(&self, msg: &str) { - self.log_tx + self.log_sink + .sender .try_send(LogEvent::Section(msg.to_string())) .expect( "The events channel should be unbounded and not closed, so try_send() shouldn't fail", @@ -194,7 +193,7 @@ impl TestContextManager { #[derive(Debug, Clone, Default)] pub struct TestContextBuilder { key_pair: Option, - log_sink: Option>, + log_sink: LogSink, } impl TestContextBuilder { @@ -234,8 +233,8 @@ impl TestContextBuilder { /// using a single [`LogSink`] for both contexts. This shows the log messages in /// sequence as they occurred rather than all messages from each context in a single /// block. - pub fn with_log_sink(mut self, sink: Sender) -> Self { - self.log_sink = Some(sink); + pub fn with_log_sink(mut self, sink: LogSink) -> Self { + self.log_sink = sink; self } @@ -243,7 +242,7 @@ impl TestContextBuilder { pub async fn build(self) -> TestContext { let name = self.key_pair.as_ref().map(|key| key.addr.local.clone()); - let test_context = TestContext::new_internal(name, self.log_sink).await; + let test_context = TestContext::new_internal(name, Some(self.log_sink.clone())).await; if let Some(key_pair) = self.key_pair { test_context @@ -266,18 +265,16 @@ pub struct TestContext { pub dir: TempDir, pub evtracker: EventTracker, - /// Channels which should receive events from this context. - event_senders: Arc>>>, + /// Reference to implicit [`LogSink`] so it is dropped together with the context. /// /// Only used if no explicit `log_sender` is passed into [`TestContext::new_internal`] /// (which is assumed to be the sending end of a [`LogSink`]). /// /// This is a convenience in case only a single [`TestContext`] is used to avoid dealing - /// with [`LogSink`]. Never read, thus "dead code", since the only purpose is to + /// with [`LogSink`]. Never read, since the only purpose is to /// control when Drop is invoked. - #[allow(dead_code)] - log_sink: Option, + _log_sink: Option, } impl TestContext { @@ -337,7 +334,7 @@ impl TestContext { /// `log_sender` is assumed to be the sender for a [`LogSink`]. If not supplied a new /// [`LogSink`] will be created so that events are logged to this test when the /// [`TestContext`] is dropped. - async fn new_internal(name: Option, log_sender: Option>) -> Self { + async fn new_internal(name: Option, log_sink: Option) -> Self { let dir = tempdir().unwrap(); let dbfile = dir.path().join("db.sqlite"); let id = rand::thread_rng().gen(); @@ -345,35 +342,23 @@ impl TestContext { let mut context_names = CONTEXT_NAMES.write().unwrap(); context_names.insert(id, name); } - let ctx = Context::new(&dbfile, id, Events::new(), StockStrings::new()) + let events = Events::new(); + let evtracker_receiver = events.get_emitter(); + let ctx = Context::new(&dbfile, id, events, StockStrings::new()) .await .expect("failed to create context"); - let events = ctx.get_event_emitter(); - - let (log_sender, log_sink) = match log_sender { - Some(sender) => (sender, None), - None => { - let (sender, sink) = LogSink::create(); - (sender, Some(sink)) - } + let _log_sink = if let Some(log_sink) = log_sink { + // Subscribe existing LogSink and don't store reference to it. + log_sink.subscribe(ctx.get_event_emitter()); + None + } else { + // Create new LogSink and store it inside the `TestContext`. + let log_sink = LogSink::new(); + log_sink.subscribe(ctx.get_event_emitter()); + Some(log_sink) }; - let (evtracker_sender, evtracker_receiver) = channel::unbounded(); - let event_senders = Arc::new(RwLock::new(vec![evtracker_sender])); - let senders = Arc::clone(&event_senders); - - task::spawn(async move { - while let Some(event) = events.recv().await { - for sender in senders.read().await.iter() { - // Don't block because someone wanted to use a oneshot receiver, use - // an unbounded channel if you want all events. - sender.try_send(event.clone()).ok(); - } - log_sender.try_send(LogEvent::Event(event.clone())).ok(); - } - }); - ctx.set_config(Config::SkipStartMessages, Some("1")) .await .unwrap(); @@ -383,8 +368,7 @@ impl TestContext { ctx, dir, evtracker: EventTracker(evtracker_receiver), - event_senders, - log_sink, + _log_sink, } } @@ -407,14 +391,6 @@ impl TestContext { context_names.get(id).unwrap_or(&id.to_string()).to_string() } - /// Adds a new [`Event`]s sender. - /// - /// Once added, all events emitted by this context will be sent to this channel. This - /// is useful if you need to wait for events or make assertions on them. - pub async fn add_event_sender(&self, sink: Sender) { - self.event_senders.write().await.push(sink) - } - /// Configure as a given email address. /// /// The context will be configured but the key will not be pre-generated so if a key is @@ -849,22 +825,62 @@ pub enum LogEvent { /// This sink achieves this by printing the events, in the order received, at the time it is /// dropped. Thus to use you must only make sure this sink is dropped in the test itself. /// -/// To use this create an instance using [`LogSink::create`] and then use the -/// [`TestContextBuilder::with_log_sink`]. -#[derive(Debug)] -pub struct LogSink { - events: Receiver, -} +/// To use this create an instance using [`LogSink::new`] and then use the +/// [`TestContextBuilder::with_log_sink`] or use [`TestContextManager`]. +#[derive(Debug, Clone, Default)] +pub struct LogSink(Arc); impl LogSink { /// Creates a new [`LogSink`] and returns the attached event sink. - pub fn create() -> (Sender, Self) { - let (tx, rx) = channel::unbounded(); - (tx, Self { events: rx }) + pub fn new() -> Self { + Default::default() } } -impl Drop for LogSink { +impl Deref for LogSink { + type Target = InnerLogSink; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Debug)] +pub struct InnerLogSink { + events: Receiver, + + /// Sender side of the log receiver. + /// + /// It is cloned when log sink is subscribed + /// to new event emitter + /// and can be used directly from the test to + /// add "sections" to the log. + sender: Sender, +} + +impl Default for InnerLogSink { + fn default() -> Self { + let (tx, rx) = channel::unbounded(); + Self { + events: rx, + sender: tx, + } + } +} + +impl InnerLogSink { + /// Subscribes this log sink to event emitter. + pub fn subscribe(&self, event_emitter: EventEmitter) { + let sender = self.sender.clone(); + task::spawn(async move { + while let Some(event) = event_emitter.recv().await { + sender.try_send(LogEvent::Event(event.clone())).ok(); + } + }); + } +} + +impl Drop for InnerLogSink { fn drop(&mut self) { while let Ok(event) = self.events.try_recv() { print_logevent(&event); @@ -975,10 +991,10 @@ pub fn fiona_keypair() -> KeyPair { /// be attached to a single [`TestContext`] and therefore the context is already known as /// you will be accessing it as [`TestContext::evtracker`]. #[derive(Debug)] -pub struct EventTracker(Receiver); +pub struct EventTracker(EventEmitter); impl Deref for EventTracker { - type Target = Receiver; + type Target = EventEmitter; fn deref(&self) -> &Self::Target { &self.0 @@ -1025,15 +1041,8 @@ impl EventTracker { } /// Clears event queue. - /// - /// This spends 1 second instead of using `try_recv` - /// to avoid accidentally leaving an event that - /// was emitted right before calling `clear_events()`. - /// - /// Avoid using this function if you can - /// by waiting for specific events you expect to receive. - pub async fn clear_events(&self) { - while let Ok(_ev) = tokio::time::timeout(Duration::from_secs(1), self.recv()).await {} + pub fn clear_events(&self) { + while let Ok(_ev) = self.try_recv() {} } }