diff --git a/src/accounts.rs b/src/accounts.rs index efaeb6c0b..6268c7f12 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; -use async_std::channel::{Receiver, Sender}; +use async_std::channel::{self, Receiver, Sender}; use async_std::fs; use async_std::path::PathBuf; use async_std::prelude::*; @@ -22,6 +22,13 @@ pub struct Accounts { config: Config, accounts: Arc>>, emitter: EventEmitter, + + /// Sender side of the fake event channel. + /// + /// We never send any events over this channel, but hold it during the account manager lifetime + /// to prevent `EventEmitter` from returning `None` as long as account manager is alive, even if + /// it holds no accounts which could emit events. + fake_sender: Sender, } impl Accounts { @@ -57,6 +64,11 @@ impl Accounts { let accounts = config.load_accounts().await?; let emitter = EventEmitter::new(); + + // Fake event stream to prevent event emitter from closing. + let (fake_sender, fake_receiver) = channel::bounded(1); + emitter.sender.send(fake_receiver).await?; + for account in accounts.values() { emitter.add_account(account).await?; } @@ -66,6 +78,7 @@ impl Accounts { config, accounts: Arc::new(RwLock::new(accounts)), emitter, + fake_sender, }) } @@ -263,18 +276,18 @@ impl Accounts { #[derive(Debug, Clone)] pub struct EventEmitter { /// Aggregate stream of events from all accounts. - stream: Arc>>, + stream: Arc>>>, /// Sender for the channel where new account emitters will be pushed. - sender: Sender, + sender: Sender>, /// Receiver for the channel where new account emitters will be pushed. - receiver: Receiver, + receiver: Receiver>, } impl EventEmitter { pub fn new() -> Self { - let (sender, receiver) = async_std::channel::unbounded(); + let (sender, receiver) = channel::unbounded(); Self { stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())), sender, @@ -302,7 +315,9 @@ impl EventEmitter { /// Add event emitter of a new account to the aggregate event emitter. pub async fn add_account(&self, context: &Context) -> Result<()> { - self.sender.send(context.get_event_emitter()).await?; + self.sender + .send(context.get_event_emitter().into_inner()) + .await?; Ok(()) } } @@ -702,4 +717,30 @@ mod tests { Ok(()) } + + #[async_std::test] + async fn test_no_accounts_event_emitter() -> Result<()> { + let dir = tempfile::tempdir().unwrap(); + let p: PathBuf = dir.path().join("accounts").into(); + + let accounts = Accounts::new("my_os".into(), p.clone()).await?; + + // Make sure there are no accounts. + assert_eq!(accounts.accounts.read().await.len(), 0); + + // Create event emitter. + let mut event_emitter = accounts.get_event_emitter().await; + + // Test that event emitter does not return `None` immediately. + let duration = std::time::Duration::from_millis(1); + assert!(async_std::future::timeout(duration, event_emitter.recv()) + .await + .is_err()); + + // When account manager is dropped, event emitter is exhausted. + drop(accounts); + assert_eq!(event_emitter.recv().await?, None); + + Ok(()) + } } diff --git a/src/events.rs b/src/events.rs index 1a8a1826e..390707a32 100644 --- a/src/events.rs +++ b/src/events.rs @@ -62,6 +62,10 @@ impl Events { pub struct EventEmitter(Receiver); impl EventEmitter { + pub(crate) fn into_inner(self) -> Receiver { + self.0 + } + /// Blocking recv of an event. Return `None` if the `Sender` has been droped. pub fn recv_sync(&self) -> Option { async_std::task::block_on(self.recv())