From 00a6f41441e32028ce780135f0fc54bea7f5df34 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 28 Nov 2025 21:40:08 +0100 Subject: [PATCH] make fifo's work --- .../src/deltachat_rpc_client/__init__.py | 3 +- .../src/deltachat_rpc_client/rpc.py | 44 ++++++++++++++++--- deltachat-rpc-client/tests/test_rpc_fifo.py | 22 ++++++++++ 3 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 deltachat-rpc-client/tests/test_rpc_fifo.py diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py index 1a532c701..7a2254816 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py @@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId from .contact import Contact from .deltachat import DeltaChat from .message import Message -from .rpc import Rpc +from .rpc import Rpc, RpcFIFO __all__ = [ "Account", @@ -22,6 +22,7 @@ __all__ = [ "Message", "SpecialContactId", "Rpc", + "RpcFIFO", "run_bot_cli", "run_client_cli", ] diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index fe748fdab..880c7c0af 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -47,6 +47,8 @@ class RpcMethod: def rpc_future(): """Wait for the request to receive a result.""" response = queue.get() + if response is RpcShutdownError: + raise RpcShutdownError(f"no response for {request_id}/{self.name} but rpc is shutting down") if "error" in response: raise JsonRpcError(response["error"]) return response.get("result", None) @@ -54,13 +56,22 @@ class RpcMethod: return rpc_future +class RpcShutdownError(Exception): + """Raised in RPC methods if the connection to server is closing.""" + + class BaseRpc: + """Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods + from subclasses to work concretely.""" + def __init__(self): self.id_iterator: Iterator[int] self.event_queues: dict[int, Queue] # Map from request ID to a Queue which provides a single result self.request_results: dict[int, Queue] self.request_queue: Queue[Any] + self.server_stdin: io.Writer[bytes] + self.server_stdout: io.Reader[bytes] self.closing: bool self.reader_thread: Thread self.writer_thread: Thread @@ -85,8 +96,8 @@ class BaseRpc: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True self.disconnect_from_server() - self.events_thread.join() self.reader_thread.join() + self.events_thread.join() self.request_queue.put(None) self.writer_thread.join() @@ -111,6 +122,10 @@ class BaseRpc: # Log an exception if the reader loop dies. logging.exception("Exception in the reader loop") + # terminate pending rpc requests because no responses can arrive anymore + for queue in self.request_results.values(): + queue.put(RpcShutdownError) + def writer_loop(self) -> None: """Writer loop ensuring only a single thread writes requests.""" try: @@ -135,7 +150,10 @@ class BaseRpc: while True: if self.closing: return - event = self.get_next_event() + try: + event = self.get_next_event() + except RpcShutdownError: + return account_id = event["contextId"] queue = self.get_queue(account_id) event = event["event"] @@ -172,10 +190,8 @@ class Rpc(BaseRpc): The given arguments will be passed to subprocess.Popen(). """ super(Rpc, self).__init__() - self.server_stdout: io.Writer[bytes] - self.server_stdin: io.Reader[bytes] self._accounts_dir = accounts_dir - self.rpc_server_path = rpc_server_path + self.rpc_server_path: str = rpc_server_path def connect_to_server(self): popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} @@ -196,3 +212,21 @@ class Rpc(BaseRpc): def disconnect_from_server(self): self.stop_io_for_all_accounts() self.server_stdin.close() + + +class RpcFIFO(BaseRpc): + """RPC client that runs and connects to a deltachat-rpc-server through FIFO files.""" + + def __init__(self, fn_request_fifo: str, fn_response_fifo: str): + super(RpcFIFO, self).__init__() + self.fn_request_fifo = fn_request_fifo + self.fn_response_fifo = fn_response_fifo + + def connect_to_server(self): + server_stdin = open(self.fn_request_fifo, "wb") # noqa + server_stdout = open(self.fn_response_fifo, "rb") # noqa + return server_stdout, server_stdin + + def disconnect_from_server(self): + self.server_stdin.close() + self.server_stdout.close() diff --git a/deltachat-rpc-client/tests/test_rpc_fifo.py b/deltachat-rpc-client/tests/test_rpc_fifo.py new file mode 100644 index 000000000..7bf66dd27 --- /dev/null +++ b/deltachat-rpc-client/tests/test_rpc_fifo.py @@ -0,0 +1,22 @@ +import os +import platform # noqa +import subprocess + +import pytest + +from deltachat_rpc_client import DeltaChat, RpcFIFO + + +@pytest.mark.skipif("platform.system() == 'Windows'") +def test_rpc_fifo(tmp_path): + fn_request_fifo = tmp_path.joinpath("request_fifo") + fn_response_fifo = tmp_path.joinpath("response_fifo") + os.mkfifo(fn_request_fifo) + os.mkfifo(fn_response_fifo) + popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True) + + rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo) + with rpc: + dc = DeltaChat(rpc) + assert dc.rpc.get_system_info()["deltachat_core_version"] is not None + popen.wait()