From 5856936f49c52ab4d0176d0aa64dfe4af7851353 Mon Sep 17 00:00:00 2001 From: link2xt Date: Tue, 27 Jul 2021 16:27:08 +0300 Subject: [PATCH] accounts: update EventEmitter on add_account (#2559) * accounts: update EventEmitter on add_account * accounts: do not lock waiting for EventEmitter in dc_accounts_add_account Otherwise dc_accounts_add_account blocks until an event arrives on some existing account. --- src/accounts.rs | 75 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 18 deletions(-) diff --git a/src/accounts.rs b/src/accounts.rs index 7eb0e3bff..d7efd4c0f 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use async_std::channel::{Receiver, Sender}; use async_std::fs; use async_std::path::PathBuf; use async_std::prelude::*; @@ -18,6 +19,7 @@ pub struct Accounts { dir: PathBuf, config: Config, accounts: Arc>>, + emitter: EventEmitter, } impl Accounts { @@ -52,10 +54,16 @@ impl Accounts { let config = Config::from_file(config_file).await?; let accounts = config.load_accounts().await?; + let emitter = EventEmitter::new(); + for account in accounts.values() { + emitter.add_account(account).await?; + } + Ok(Self { dir, config, accounts: Arc::new(RwLock::new(accounts)), + emitter, }) } @@ -83,6 +91,7 @@ impl Accounts { let account_config = self.config.new_account(&self.dir).await?; let ctx = Context::new(os_name, account_config.dbfile().into(), account_config.id).await?; + self.emitter.add_account(&ctx).await?; self.accounts.write().await.insert(account_config.id, ctx); Ok(account_config.id) @@ -228,30 +237,60 @@ impl Accounts { /// Unified event emitter. pub async fn get_event_emitter(&self) -> EventEmitter { - let emitters: Vec<_> = self - .accounts - .read() - .await - .iter() - .map(|(_id, a)| a.get_event_emitter()) - .collect(); - - EventEmitter(futures::stream::select_all(emitters)) + self.emitter.clone() } } -#[derive(Debug)] -pub struct EventEmitter(futures::stream::SelectAll); +#[derive(Debug, Clone)] +pub struct EventEmitter { + /// Aggregate stream of events from all accounts. + stream: Arc>>, + + /// Sender for the channel where new account emitters will be pushed. + sender: Sender, + + /// Receiver for the channel where new account emitters will be pushed. + receiver: Receiver, +} impl EventEmitter { - /// Blocking recv of an event. Return `None` if all `Sender`s have been droped. - pub fn recv_sync(&mut self) -> Option { - async_std::task::block_on(self.recv()) + pub fn new() -> Self { + let (sender, receiver) = async_std::channel::unbounded(); + Self { + stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())), + sender, + receiver, + } } - /// Async recv of an event. Return `None` if all `Sender`s have been droped. - pub async fn recv(&mut self) -> Option { - self.0.next().await + /// Blocking recv of an event. Return `None` if all `Sender`s have been droped. + pub fn recv_sync(&mut self) -> Option { + async_std::task::block_on(self.recv()).unwrap_or_default() + } + + /// Async recv of an event. Return `None` if all `Sender`s have been dropped. + pub async fn recv(&mut self) -> Result> { + let mut stream = self.stream.write().await; + loop { + match futures::future::select(self.receiver.recv(), stream.next()).await { + futures::future::Either::Left((emitter, _)) => { + stream.push(emitter?); + } + futures::future::Either::Right((ev, _)) => return Ok(ev), + } + } + } + + /// Add event emitter of a new account to the aggregate event emitter. + pub async fn add_account(&self, context: &Context) -> Result<()> { + self.sender.send(context.get_event_emitter()).await?; + Ok(()) + } +} + +impl Default for EventEmitter { + fn default() -> Self { + Self::new() } } @@ -262,7 +301,7 @@ impl async_std::stream::Stream for EventEmitter { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.0).poll_next(cx) + std::pin::Pin::new(&mut self).poll_next(cx) } }