accounts: update EventEmitter on add_account (#2559)

* accounts: update EventEmitter on add_account

* accounts: do not lock waiting for EventEmitter in dc_accounts_add_account

Otherwise dc_accounts_add_account blocks until an event arrives on
some existing account.
This commit is contained in:
link2xt
2021-07-27 16:27:08 +03:00
committed by GitHub
parent 532060d8b7
commit 5856936f49

View File

@@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use async_std::channel::{Receiver, Sender};
use async_std::fs;
use async_std::path::PathBuf;
use async_std::prelude::*;
@@ -18,6 +19,7 @@ pub struct Accounts {
dir: PathBuf,
config: Config,
accounts: Arc<RwLock<BTreeMap<u32, Context>>>,
emitter: EventEmitter,
}
impl Accounts {
@@ -52,10 +54,16 @@ impl Accounts {
let config = Config::from_file(config_file).await?;
let accounts = config.load_accounts().await?;
let emitter = EventEmitter::new();
for account in accounts.values() {
emitter.add_account(account).await?;
}
Ok(Self {
dir,
config,
accounts: Arc::new(RwLock::new(accounts)),
emitter,
})
}
@@ -83,6 +91,7 @@ impl Accounts {
let account_config = self.config.new_account(&self.dir).await?;
let ctx = Context::new(os_name, account_config.dbfile().into(), account_config.id).await?;
self.emitter.add_account(&ctx).await?;
self.accounts.write().await.insert(account_config.id, ctx);
Ok(account_config.id)
@@ -228,30 +237,60 @@ impl Accounts {
/// Unified event emitter.
pub async fn get_event_emitter(&self) -> EventEmitter {
let emitters: Vec<_> = self
.accounts
.read()
.await
.iter()
.map(|(_id, a)| a.get_event_emitter())
.collect();
EventEmitter(futures::stream::select_all(emitters))
self.emitter.clone()
}
}
#[derive(Debug)]
pub struct EventEmitter(futures::stream::SelectAll<crate::events::EventEmitter>);
#[derive(Debug, Clone)]
pub struct EventEmitter {
/// Aggregate stream of events from all accounts.
stream: Arc<RwLock<futures::stream::SelectAll<crate::events::EventEmitter>>>,
/// Sender for the channel where new account emitters will be pushed.
sender: Sender<crate::events::EventEmitter>,
/// Receiver for the channel where new account emitters will be pushed.
receiver: Receiver<crate::events::EventEmitter>,
}
impl EventEmitter {
/// Blocking recv of an event. Return `None` if all `Sender`s have been droped.
pub fn recv_sync(&mut self) -> Option<Event> {
async_std::task::block_on(self.recv())
pub fn new() -> Self {
let (sender, receiver) = async_std::channel::unbounded();
Self {
stream: Arc::new(RwLock::new(futures::stream::SelectAll::new())),
sender,
receiver,
}
}
/// Async recv of an event. Return `None` if all `Sender`s have been droped.
pub async fn recv(&mut self) -> Option<Event> {
self.0.next().await
/// Blocking recv of an event. Return `None` if all `Sender`s have been droped.
pub fn recv_sync(&mut self) -> Option<Event> {
async_std::task::block_on(self.recv()).unwrap_or_default()
}
/// Async recv of an event. Return `None` if all `Sender`s have been dropped.
pub async fn recv(&mut self) -> Result<Option<Event>> {
let mut stream = self.stream.write().await;
loop {
match futures::future::select(self.receiver.recv(), stream.next()).await {
futures::future::Either::Left((emitter, _)) => {
stream.push(emitter?);
}
futures::future::Either::Right((ev, _)) => return Ok(ev),
}
}
}
/// Add event emitter of a new account to the aggregate event emitter.
pub async fn add_account(&self, context: &Context) -> Result<()> {
self.sender.send(context.get_event_emitter()).await?;
Ok(())
}
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
@@ -262,7 +301,7 @@ impl async_std::stream::Stream for EventEmitter {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_next(cx)
std::pin::Pin::new(&mut self).poll_next(cx)
}
}