mirror of
https://github.com/chatmail/core.git
synced 2026-05-02 12:56:30 +03:00
Merge tag 'v1.124.0'
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
@@ -28,10 +29,16 @@ class Rpc:
|
||||
self.events_task: asyncio.Task
|
||||
|
||||
async def start(self) -> None:
|
||||
# Use buffer of 64 MiB.
|
||||
# Default limit as of Python 3.11 is 2**16 bytes, this is too low for some JSON-RPC responses,
|
||||
# such as loading large HTML message content.
|
||||
limit = 2**26
|
||||
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
"deltachat-rpc-server",
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
limit=limit,
|
||||
**self._kwargs,
|
||||
)
|
||||
self.id = 0
|
||||
@@ -57,16 +64,20 @@ class Rpc:
|
||||
await self.close()
|
||||
|
||||
async def reader_loop(self) -> None:
|
||||
while True:
|
||||
line = await self.process.stdout.readline() # noqa
|
||||
if not line: # EOF
|
||||
break
|
||||
response = json.loads(line)
|
||||
if "id" in response:
|
||||
fut = self.request_events.pop(response["id"])
|
||||
fut.set_result(response)
|
||||
else:
|
||||
print(response)
|
||||
try:
|
||||
while True:
|
||||
line = await self.process.stdout.readline() # noqa
|
||||
if not line: # EOF
|
||||
break
|
||||
response = json.loads(line)
|
||||
if "id" in response:
|
||||
fut = self.request_events.pop(response["id"])
|
||||
fut.set_result(response)
|
||||
else:
|
||||
print(response)
|
||||
except Exception:
|
||||
# Log an exception if the reader loop dies.
|
||||
logging.exception("Exception in the reader loop")
|
||||
|
||||
async def get_queue(self, account_id: int) -> asyncio.Queue:
|
||||
if account_id not in self.event_queues:
|
||||
@@ -75,13 +86,17 @@ class Rpc:
|
||||
|
||||
async def events_loop(self) -> None:
|
||||
"""Requests new events and distributes them between queues."""
|
||||
while True:
|
||||
if self.closing:
|
||||
return
|
||||
event = await self.get_next_event()
|
||||
account_id = event["contextId"]
|
||||
queue = await self.get_queue(account_id)
|
||||
await queue.put(event["event"])
|
||||
try:
|
||||
while True:
|
||||
if self.closing:
|
||||
return
|
||||
event = await self.get_next_event()
|
||||
account_id = event["contextId"]
|
||||
queue = await self.get_queue(account_id)
|
||||
await queue.put(event["event"])
|
||||
except Exception:
|
||||
# Log an exception if the event loop dies.
|
||||
logging.exception("Exception in the event loop")
|
||||
|
||||
async def wait_for_event(self, account_id: int) -> Optional[dict]:
|
||||
"""Waits for the next event from the given account and returns it."""
|
||||
|
||||
Reference in New Issue
Block a user