diff --git a/src/events.rs b/src/events.rs index 40c24ebab..296785a23 100644 --- a/src/events.rs +++ b/src/events.rs @@ -71,7 +71,17 @@ impl EventEmitter { /// [`try_recv`]: Self::try_recv pub async fn recv(&self) -> Option { let mut lock = self.0.lock().await; - lock.recv().await.ok() + loop { + match lock.recv().await { + Err(async_broadcast::RecvError::Overflowed(_)) => { + // Some events have been lost, + // but the channel is not closed. + continue; + } + Err(async_broadcast::RecvError::Closed) => return None, + Ok(event) => return Some(event), + } + } } /// Tries to receive an event without blocking. @@ -86,8 +96,18 @@ impl EventEmitter { // to avoid blocking // in case there is a concurrent call to `recv`. let mut lock = self.0.try_lock()?; - let event = lock.try_recv()?; - Ok(event) + loop { + match lock.try_recv() { + Err(async_broadcast::TryRecvError::Overflowed(_)) => { + // Some events have been lost, + // but the channel is not closed. + continue; + } + res @ (Err(async_broadcast::TryRecvError::Empty) + | Err(async_broadcast::TryRecvError::Closed) + | Ok(_)) => return Ok(res?), + } + } } }