fix: avoid manual poll impl for accounts events

This commit is contained in:
Friedel Ziegelmayer
2020-09-29 14:00:10 +02:00
committed by GitHub
parent 60a8b47ad0
commit 7786a4ced4
3 changed files with 30 additions and 45 deletions

View File

@@ -1,10 +1,8 @@
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context as TaskContext, Poll};
use async_std::fs;
use async_std::path::PathBuf;
use async_std::prelude::*;
use async_std::sync::{Arc, RwLock};
use uuid::Uuid;
@@ -225,61 +223,42 @@ impl Accounts {
/// Unified event emitter.
pub async fn get_event_emitter(&self) -> EventEmitter {
let emitters = self
let emitters: Vec<_> = self
.accounts
.read()
.await
.iter()
.map(|(id, a)| EmitterWrapper {
id: *id,
emitter: a.get_event_emitter(),
done: AtomicBool::new(false),
})
.map(|(_id, a)| a.get_event_emitter())
.collect();
EventEmitter(emitters)
EventEmitter(futures::stream::select_all(emitters))
}
}
#[derive(Debug)]
pub struct EventEmitter(futures::stream::SelectAll<crate::events::EventEmitter>);
impl EventEmitter {
/// Blocking recv of an event. Return `None` if the `Sender` has been droped.
pub fn recv_sync(&self) -> Option<Event> {
/// Blocking recv of an event. Return `None` if all `Sender`s have been droped.
pub fn recv_sync(&mut self) -> Option<Event> {
async_std::task::block_on(self.recv())
}
/// Async recv of an event. Return `None` if the `Sender` has been droped.
pub async fn recv(&self) -> Option<Event> {
futures::future::poll_fn(|cx| Pin::new(self).recv_poll(cx)).await
}
fn recv_poll(self: Pin<&Self>, _cx: &mut TaskContext<'_>) -> Poll<Option<Event>> {
for e in &*self.0 {
if e.done.load(Ordering::Acquire) {
// skip emitters that are already done
continue;
}
match e.emitter.try_recv() {
Ok(event) => return Poll::Ready(Some(event)),
Err(async_std::sync::TryRecvError::Disconnected) => {
e.done.store(true, Ordering::Release);
}
Err(async_std::sync::TryRecvError::Empty) => {}
}
}
Poll::Pending
/// Async recv of an event. Return `None` if all `Sender`s have been droped.
pub async fn recv(&mut self) -> Option<Event> {
self.0.next().await
}
}
#[derive(Debug)]
pub struct EventEmitter(Vec<EmitterWrapper>);
impl async_std::stream::Stream for EventEmitter {
type Item = Event;
#[derive(Debug)]
struct EmitterWrapper {
id: u32,
emitter: crate::events::EventEmitter,
done: AtomicBool,
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_next(cx)
}
}
pub const CONFIG_NAME: &str = "accounts.toml";

View File

@@ -58,12 +58,18 @@ impl EventEmitter {
/// Async recv of an event. Return `None` if the `Sender` has been droped.
pub async fn recv(&self) -> Option<Event> {
// TODO: change once we can use async channels internally.
self.0.recv().await.ok()
}
}
pub fn try_recv(&self) -> Result<Event, async_std::sync::TryRecvError> {
self.0.try_recv()
impl async_std::stream::Stream for EventEmitter {
type Item = Event;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.0).poll_next(cx)
}
}