Emit events from account manager

Errors and warnings are emitted with a special 0 account ID.
This commit is contained in:
link2xt
2021-09-25 13:02:30 +00:00
parent 89d8b26192
commit 2309c7ca13
4 changed files with 68 additions and 34 deletions

View File

@@ -13,7 +13,7 @@ use anyhow::{ensure, Context as _, Result};
use serde::{Deserialize, Serialize};
use crate::context::Context;
use crate::events::Event;
use crate::events::{Event, EventType, Events};
/// Account manager, that can handle multiple accounts in a single place.
#[derive(Debug)]
@@ -23,12 +23,8 @@ pub struct Accounts {
accounts: BTreeMap<u32, Context>,
emitter: EventEmitter,
/// Sender side of the fake event channel.
///
/// We never send any events over this channel, but hold it during the account manager lifetime
/// to prevent `EventEmitter` from returning `None` as long as account manager is alive, even if
/// it holds no accounts which could emit events.
fake_sender: Sender<crate::events::Event>,
/// Event channel to emit account manager errors.
events: Events,
}
impl Accounts {
@@ -65,9 +61,9 @@ impl Accounts {
let emitter = EventEmitter::new();
// Fake event stream to prevent event emitter from closing.
let (fake_sender, fake_receiver) = channel::bounded(1);
emitter.sender.send(fake_receiver).await?;
let events = Events::default();
emitter.sender.send(events.get_emitter()).await?;
for account in accounts.values() {
emitter.add_account(account).await?;
@@ -78,7 +74,7 @@ impl Accounts {
config,
accounts,
emitter,
fake_sender,
events,
})
}
@@ -262,6 +258,11 @@ impl Accounts {
}
}
/// Emits a single event.
pub fn emit_event(&self, event: EventType) {
self.events.emit(Event { id: 0, typ: event })
}
/// Returns unified event emitter.
pub async fn get_event_emitter(&self) -> EventEmitter {
self.emitter.clone()
@@ -272,13 +273,13 @@ impl Accounts {
#[derive(Debug, Clone)]
pub struct EventEmitter {
/// Aggregate stream of events from all accounts.
stream: Arc<RwLock<futures::stream::SelectAll<Receiver<crate::events::Event>>>>,
stream: Arc<RwLock<futures::stream::SelectAll<crate::events::EventEmitter>>>,
/// Sender for the channel where new account emitters will be pushed.
sender: Sender<Receiver<crate::events::Event>>,
sender: Sender<crate::events::EventEmitter>,
/// Receiver for the channel where new account emitters will be pushed.
receiver: Receiver<Receiver<crate::events::Event>>,
receiver: Receiver<crate::events::EventEmitter>,
}
impl EventEmitter {
@@ -311,9 +312,7 @@ impl EventEmitter {
/// Add event emitter of a new account to the aggregate event emitter.
pub async fn add_account(&self, context: &Context) -> Result<()> {
self.sender
.send(context.get_event_emitter().into_inner())
.await?;
self.sender.send(context.get_event_emitter()).await?;
Ok(())
}
}

View File

@@ -62,10 +62,6 @@ impl Events {
pub struct EventEmitter(Receiver<Event>);
impl EventEmitter {
pub(crate) fn into_inner(self) -> Receiver<Event> {
self.0
}
/// Blocking recv of an event. Return `None` if the `Sender` has been droped.
pub fn recv_sync(&self) -> Option<Event> {
async_std::task::block_on(self.recv())