mirror of
https://github.com/chatmail/core.git
synced 2026-04-18 22:16:30 +03:00
fix: ignore event channel overflows
async-broadcast returns Overflowed error once if channel overflow happened. Public APIs such as get_next_event JSON-RPC method are only expecting an error if the channel is closed, so we should not propagate overflow error outside. In particular, Delta Chat Desktop stop receiving events completely if an error is returned once. If overflow happens, we should ignore it and try again until we get an event or an error because the channel is closed (in case of recv()) or empty (in case of try_recv()).
This commit is contained in:
@@ -71,7 +71,17 @@ impl EventEmitter {
|
||||
/// [`try_recv`]: Self::try_recv
|
||||
pub async fn recv(&self) -> Option<Event> {
|
||||
let mut lock = self.0.lock().await;
|
||||
lock.recv().await.ok()
|
||||
loop {
|
||||
match lock.recv().await {
|
||||
Err(async_broadcast::RecvError::Overflowed(_)) => {
|
||||
// Some events have been lost,
|
||||
// but the channel is not closed.
|
||||
continue;
|
||||
}
|
||||
Err(async_broadcast::RecvError::Closed) => return None,
|
||||
Ok(event) => return Some(event),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to receive an event without blocking.
|
||||
@@ -86,8 +96,18 @@ impl EventEmitter {
|
||||
// to avoid blocking
|
||||
// in case there is a concurrent call to `recv`.
|
||||
let mut lock = self.0.try_lock()?;
|
||||
let event = lock.try_recv()?;
|
||||
Ok(event)
|
||||
loop {
|
||||
match lock.try_recv() {
|
||||
Err(async_broadcast::TryRecvError::Overflowed(_)) => {
|
||||
// Some events have been lost,
|
||||
// but the channel is not closed.
|
||||
continue;
|
||||
}
|
||||
res @ (Err(async_broadcast::TryRecvError::Empty)
|
||||
| Err(async_broadcast::TryRecvError::Closed)
|
||||
| Ok(_)) => return Ok(res?),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user