mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
perf: batched event reception
This commit is contained in:
@@ -194,6 +194,16 @@ impl CommandApi {
|
||||
.context("event channel is closed")
|
||||
}
|
||||
|
||||
/// Waits for at least one event and return a batch of events.
|
||||
async fn get_next_event_batch(&self) -> Vec<Event> {
|
||||
self.event_emitter
|
||||
.recv_batch()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|event| event.into())
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ---------------------------------------------
|
||||
// Account Management
|
||||
// ---------------------------------------------
|
||||
|
||||
@@ -53,18 +53,19 @@ export class BaseDeltaChat<
|
||||
*/
|
||||
async eventLoop(): Promise<void> {
|
||||
while (true) {
|
||||
const event = await this.rpc.getNextEvent();
|
||||
//@ts-ignore
|
||||
this.emit(event.event.kind, event.contextId, event.event);
|
||||
this.emit("ALL", event.contextId, event.event);
|
||||
for (const event of await this.rpc.getNextEventBatch()) {
|
||||
//@ts-ignore
|
||||
this.emit(event.event.kind, event.contextId, event.event);
|
||||
this.emit("ALL", event.contextId, event.event);
|
||||
|
||||
if (this.contextEmitters[event.contextId]) {
|
||||
this.contextEmitters[event.contextId].emit(
|
||||
event.event.kind,
|
||||
//@ts-ignore
|
||||
event.event as any,
|
||||
);
|
||||
this.contextEmitters[event.contextId].emit("ALL", event.event as any);
|
||||
if (this.contextEmitters[event.contextId]) {
|
||||
this.contextEmitters[event.contextId].emit(
|
||||
event.event.kind,
|
||||
//@ts-ignore
|
||||
event.event as any,
|
||||
);
|
||||
this.contextEmitters[event.contextId].emit("ALL", event.event as any);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,15 +154,15 @@ class Rpc:
|
||||
def events_loop(self) -> None:
|
||||
"""Request new events and distributes them between queues."""
|
||||
try:
|
||||
while True:
|
||||
while events := self.get_next_event_batch():
|
||||
for event in events:
|
||||
account_id = event["contextId"]
|
||||
queue = self.get_queue(account_id)
|
||||
payload = event["event"]
|
||||
logging.debug("account_id=%d got an event %s", account_id, payload)
|
||||
queue.put(payload)
|
||||
if self.closing:
|
||||
return
|
||||
event = self.get_next_event()
|
||||
account_id = event["contextId"]
|
||||
queue = self.get_queue(account_id)
|
||||
event = event["event"]
|
||||
logging.debug("account_id=%d got an event %s", account_id, event)
|
||||
queue.put(event)
|
||||
except Exception:
|
||||
# Log an exception if the event loop dies.
|
||||
logging.exception("Exception in the event loop")
|
||||
|
||||
@@ -107,6 +107,39 @@ impl EventEmitter {
|
||||
| Ok(_)) => Ok(res?),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits until there is at least one event available
|
||||
/// and then returns a vector of at least one event.
|
||||
///
|
||||
/// Returns empty vector if the sender has been dropped.
|
||||
pub async fn recv_batch(&self) -> Vec<Event> {
|
||||
let mut lock = self.0.lock().await;
|
||||
let mut res = match lock.recv_direct().await {
|
||||
Err(async_broadcast::RecvError::Overflowed(n)) => vec![Event {
|
||||
id: 0,
|
||||
typ: EventType::EventChannelOverflow { n },
|
||||
}],
|
||||
Err(async_broadcast::RecvError::Closed) => return Vec::new(),
|
||||
Ok(event) => vec![event],
|
||||
};
|
||||
|
||||
// Return up to 100 events in a single batch
|
||||
// to have a limit on used memory if events arrive too fast.
|
||||
for _ in 0..100 {
|
||||
match lock.try_recv() {
|
||||
Err(async_broadcast::TryRecvError::Overflowed(n)) => res.push(Event {
|
||||
id: 0,
|
||||
typ: EventType::EventChannelOverflow { n },
|
||||
}),
|
||||
Ok(event) => res.push(event),
|
||||
Err(async_broadcast::TryRecvError::Empty)
|
||||
| Err(async_broadcast::TryRecvError::Closed) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
/// The event emitted by a [`Context`] from an [`EventEmitter`].
|
||||
|
||||
Reference in New Issue
Block a user