accounts: keep event emitter from closing when there are no accounts (#2636)

This commit is contained in:
link2xt
2021-08-29 18:43:58 +03:00
committed by GitHub
parent 7be0583628
commit 6a60ae2f09
2 changed files with 51 additions and 6 deletions

View File

@@ -2,7 +2,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use async_std::channel::{Receiver, Sender}; use async_std::channel::{self, Receiver, Sender};
use async_std::fs; use async_std::fs;
use async_std::path::PathBuf; use async_std::path::PathBuf;
use async_std::prelude::*; use async_std::prelude::*;
@@ -22,6 +22,13 @@ pub struct Accounts {
config: Config, config: Config,
accounts: Arc<RwLock<BTreeMap<u32, Context>>>, accounts: Arc<RwLock<BTreeMap<u32, Context>>>,
emitter: EventEmitter, emitter: EventEmitter,
/// Sender side of the fake event channel.
///
/// We never send any events over this channel, but hold it during the account manager lifetime
/// to prevent `EventEmitter` from returning `None` as long as account manager is alive, even if
/// it holds no accounts which could emit events.
fake_sender: Sender<crate::events::Event>,
} }
impl Accounts { impl Accounts {
@@ -57,6 +64,11 @@ impl Accounts {
let accounts = config.load_accounts().await?; let accounts = config.load_accounts().await?;
let emitter = EventEmitter::new(); let emitter = EventEmitter::new();
// Fake event stream to prevent event emitter from closing.
let (fake_sender, fake_receiver) = channel::bounded(1);
emitter.sender.send(fake_receiver).await?;
for account in accounts.values() { for account in accounts.values() {
emitter.add_account(account).await?; emitter.add_account(account).await?;
} }
@@ -66,6 +78,7 @@ impl Accounts {
config, config,
accounts: Arc::new(RwLock::new(accounts)), accounts: Arc::new(RwLock::new(accounts)),
emitter, emitter,
fake_sender,
}) })
} }
@@ -263,18 +276,18 @@ impl Accounts {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct EventEmitter { pub struct EventEmitter {
/// Aggregate stream of events from all accounts. /// Aggregate stream of events from all accounts.
stream: Arc<RwLock<futures::stream::SelectAll<crate::events::EventEmitter>>>, stream: Arc<RwLock<futures::stream::SelectAll<Receiver<crate::events::Event>>>>,
/// Sender for the channel where new account emitters will be pushed. /// Sender for the channel where new account emitters will be pushed.
sender: Sender<crate::events::EventEmitter>, sender: Sender<Receiver<crate::events::Event>>,
/// Receiver for the channel where new account emitters will be pushed. /// Receiver for the channel where new account emitters will be pushed.
receiver: Receiver<crate::events::EventEmitter>, receiver: Receiver<Receiver<crate::events::Event>>,
} }
impl EventEmitter { impl EventEmitter {
pub fn new() -> Self { pub fn new() -> Self {
let (sender, receiver) = async_std::channel::unbounded(); let (sender, receiver) = channel::unbounded();
Self { Self {
stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())), stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())),
sender, sender,
@@ -302,7 +315,9 @@ impl EventEmitter {
/// Add event emitter of a new account to the aggregate event emitter. /// Add event emitter of a new account to the aggregate event emitter.
pub async fn add_account(&self, context: &Context) -> Result<()> { pub async fn add_account(&self, context: &Context) -> Result<()> {
self.sender.send(context.get_event_emitter()).await?; self.sender
.send(context.get_event_emitter().into_inner())
.await?;
Ok(()) Ok(())
} }
} }
@@ -702,4 +717,30 @@ mod tests {
Ok(()) Ok(())
} }
#[async_std::test]
async fn test_no_accounts_event_emitter() -> Result<()> {
let dir = tempfile::tempdir().unwrap();
let p: PathBuf = dir.path().join("accounts").into();
let accounts = Accounts::new("my_os".into(), p.clone()).await?;
// Make sure there are no accounts.
assert_eq!(accounts.accounts.read().await.len(), 0);
// Create event emitter.
let mut event_emitter = accounts.get_event_emitter().await;
// Test that event emitter does not return `None` immediately.
let duration = std::time::Duration::from_millis(1);
assert!(async_std::future::timeout(duration, event_emitter.recv())
.await
.is_err());
// When account manager is dropped, event emitter is exhausted.
drop(accounts);
assert_eq!(event_emitter.recv().await?, None);
Ok(())
}
} }

View File

@@ -62,6 +62,10 @@ impl Events {
pub struct EventEmitter(Receiver<Event>); pub struct EventEmitter(Receiver<Event>);
impl EventEmitter { impl EventEmitter {
pub(crate) fn into_inner(self) -> Receiver<Event> {
self.0
}
/// Blocking recv of an event. Return `None` if the `Sender` has been droped. /// Blocking recv of an event. Return `None` if the `Sender` has been droped.
pub fn recv_sync(&self) -> Option<Event> { pub fn recv_sync(&self) -> Option<Event> {
async_std::task::block_on(self.recv()) async_std::task::block_on(self.recv())