diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index 38fcc4fdc..cdc2c99a0 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -3576,7 +3576,7 @@ pub unsafe extern "C" fn dc_accounts_get_next_event( if emitter.is_null() { return ptr::null_mut(); } - let emitter = &*emitter; + let emitter = &mut *emitter; emitter .recv_sync() diff --git a/src/accounts.rs b/src/accounts.rs index 7c23a6346..1c4ba734e 100644 --- a/src/accounts.rs +++ b/src/accounts.rs @@ -1,10 +1,8 @@ use std::collections::BTreeMap; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::task::{Context as TaskContext, Poll}; use async_std::fs; use async_std::path::PathBuf; +use async_std::prelude::*; use async_std::sync::{Arc, RwLock}; use uuid::Uuid; @@ -225,61 +223,42 @@ impl Accounts { /// Unified event emitter. pub async fn get_event_emitter(&self) -> EventEmitter { - let emitters = self + let emitters: Vec<_> = self .accounts .read() .await .iter() - .map(|(id, a)| EmitterWrapper { - id: *id, - emitter: a.get_event_emitter(), - done: AtomicBool::new(false), - }) + .map(|(_id, a)| a.get_event_emitter()) .collect(); - EventEmitter(emitters) + EventEmitter(futures::stream::select_all(emitters)) } } +#[derive(Debug)] +pub struct EventEmitter(futures::stream::SelectAll); + impl EventEmitter { - /// Blocking recv of an event. Return `None` if the `Sender` has been droped. - pub fn recv_sync(&self) -> Option { + /// Blocking recv of an event. Return `None` if all `Sender`s have been droped. + pub fn recv_sync(&mut self) -> Option { async_std::task::block_on(self.recv()) } - /// Async recv of an event. Return `None` if the `Sender` has been droped. - pub async fn recv(&self) -> Option { - futures::future::poll_fn(|cx| Pin::new(self).recv_poll(cx)).await - } - - fn recv_poll(self: Pin<&Self>, _cx: &mut TaskContext<'_>) -> Poll> { - for e in &*self.0 { - if e.done.load(Ordering::Acquire) { - // skip emitters that are already done - continue; - } - - match e.emitter.try_recv() { - Ok(event) => return Poll::Ready(Some(event)), - Err(async_std::sync::TryRecvError::Disconnected) => { - e.done.store(true, Ordering::Release); - } - Err(async_std::sync::TryRecvError::Empty) => {} - } - } - - Poll::Pending + /// Async recv of an event. Return `None` if all `Sender`s have been droped. + pub async fn recv(&mut self) -> Option { + self.0.next().await } } -#[derive(Debug)] -pub struct EventEmitter(Vec); +impl async_std::stream::Stream for EventEmitter { + type Item = Event; -#[derive(Debug)] -struct EmitterWrapper { - id: u32, - emitter: crate::events::EventEmitter, - done: AtomicBool, + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.0).poll_next(cx) + } } pub const CONFIG_NAME: &str = "accounts.toml"; diff --git a/src/events.rs b/src/events.rs index f884a7589..643fab3a2 100644 --- a/src/events.rs +++ b/src/events.rs @@ -58,12 +58,18 @@ impl EventEmitter { /// Async recv of an event. Return `None` if the `Sender` has been droped. pub async fn recv(&self) -> Option { - // TODO: change once we can use async channels internally. self.0.recv().await.ok() } +} - pub fn try_recv(&self) -> Result { - self.0.try_recv() +impl async_std::stream::Stream for EventEmitter { + type Item = Event; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.0).poll_next(cx) } }