mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 21:46:35 +03:00
refactor: strike events in rpc-client request handling, get result from queue
This commit is contained in:
@@ -9,7 +9,7 @@ import os
|
||||
import subprocess
|
||||
import sys
|
||||
from queue import Empty, Queue
|
||||
from threading import Event, Thread
|
||||
from threading import Thread
|
||||
from typing import Any, Iterator, Optional
|
||||
|
||||
|
||||
@@ -17,25 +17,6 @@ class JsonRpcError(Exception):
|
||||
"""JSON-RPC error."""
|
||||
|
||||
|
||||
class RpcFuture:
|
||||
"""RPC future waiting for RPC call result."""
|
||||
|
||||
def __init__(self, rpc: "Rpc", request_id: int, event: Event):
|
||||
self.rpc = rpc
|
||||
self.request_id = request_id
|
||||
self.event = event
|
||||
|
||||
def __call__(self):
|
||||
"""Wait for the future to return the result."""
|
||||
self.event.wait()
|
||||
response = self.rpc.request_results.pop(self.request_id)
|
||||
if "error" in response:
|
||||
raise JsonRpcError(response["error"])
|
||||
if "result" in response:
|
||||
return response["result"]
|
||||
return None
|
||||
|
||||
|
||||
class RpcMethod:
|
||||
"""RPC method."""
|
||||
|
||||
@@ -57,11 +38,17 @@ class RpcMethod:
|
||||
"params": args,
|
||||
"id": request_id,
|
||||
}
|
||||
event = Event()
|
||||
self.rpc.request_events[request_id] = event
|
||||
self.rpc.request_results[request_id] = queue = Queue()
|
||||
self.rpc.request_queue.put(request)
|
||||
|
||||
return RpcFuture(self.rpc, request_id, event)
|
||||
def rpc_future():
|
||||
"""Wait for the request to receive a result."""
|
||||
response = queue.get()
|
||||
if "error" in response:
|
||||
raise JsonRpcError(response["error"])
|
||||
return response.get("result", None)
|
||||
|
||||
return rpc_future
|
||||
|
||||
|
||||
class Rpc:
|
||||
@@ -82,10 +69,8 @@ class Rpc:
|
||||
self.process: subprocess.Popen
|
||||
self.id_iterator: Iterator[int]
|
||||
self.event_queues: dict[int, Queue]
|
||||
# Map from request ID to `threading.Event`.
|
||||
self.request_events: dict[int, Event]
|
||||
# Map from request ID to the result.
|
||||
self.request_results: dict[int, Any]
|
||||
# Map from request ID to a Queue which provides a single result
|
||||
self.request_results: dict[int, Queue]
|
||||
self.request_queue: Queue[Any]
|
||||
self.closing: bool
|
||||
self.reader_thread: Thread
|
||||
@@ -114,7 +99,6 @@ class Rpc:
|
||||
)
|
||||
self.id_iterator = itertools.count(start=1)
|
||||
self.event_queues = {}
|
||||
self.request_events = {}
|
||||
self.request_results = {}
|
||||
self.request_queue = Queue()
|
||||
self.closing = False
|
||||
@@ -149,9 +133,7 @@ class Rpc:
|
||||
response = json.loads(line)
|
||||
if "id" in response:
|
||||
response_id = response["id"]
|
||||
event = self.request_events.pop(response_id)
|
||||
self.request_results[response_id] = response
|
||||
event.set()
|
||||
self.request_results.pop(response_id).put(response)
|
||||
else:
|
||||
logging.warning("Got a response without ID: %s", response)
|
||||
except Exception:
|
||||
|
||||
Reference in New Issue
Block a user