make fifo's work

This commit is contained in:
holger krekel
2025-11-28 21:40:08 +01:00
parent a77e9cb8d5
commit 00a6f41441
3 changed files with 63 additions and 6 deletions

View File

@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
from .contact import Contact from .contact import Contact
from .deltachat import DeltaChat from .deltachat import DeltaChat
from .message import Message from .message import Message
from .rpc import Rpc from .rpc import Rpc, RpcFIFO
__all__ = [ __all__ = [
"Account", "Account",
@@ -22,6 +22,7 @@ __all__ = [
"Message", "Message",
"SpecialContactId", "SpecialContactId",
"Rpc", "Rpc",
"RpcFIFO",
"run_bot_cli", "run_bot_cli",
"run_client_cli", "run_client_cli",
] ]

View File

@@ -47,6 +47,8 @@ class RpcMethod:
def rpc_future(): def rpc_future():
"""Wait for the request to receive a result.""" """Wait for the request to receive a result."""
response = queue.get() response = queue.get()
if response is RpcShutdownError:
raise RpcShutdownError(f"no response for {request_id}/{self.name} but rpc is shutting down")
if "error" in response: if "error" in response:
raise JsonRpcError(response["error"]) raise JsonRpcError(response["error"])
return response.get("result", None) return response.get("result", None)
@@ -54,13 +56,22 @@ class RpcMethod:
return rpc_future return rpc_future
class RpcShutdownError(Exception):
"""Raised in RPC methods if the connection to server is closing."""
class BaseRpc: class BaseRpc:
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
from subclasses to work concretely."""
def __init__(self): def __init__(self):
self.id_iterator: Iterator[int] self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue] self.event_queues: dict[int, Queue]
# Map from request ID to a Queue which provides a single result # Map from request ID to a Queue which provides a single result
self.request_results: dict[int, Queue] self.request_results: dict[int, Queue]
self.request_queue: Queue[Any] self.request_queue: Queue[Any]
self.server_stdin: io.Writer[bytes]
self.server_stdout: io.Reader[bytes]
self.closing: bool self.closing: bool
self.reader_thread: Thread self.reader_thread: Thread
self.writer_thread: Thread self.writer_thread: Thread
@@ -85,8 +96,8 @@ class BaseRpc:
"""Terminate RPC server process and wait until the reader loop finishes.""" """Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True self.closing = True
self.disconnect_from_server() self.disconnect_from_server()
self.events_thread.join()
self.reader_thread.join() self.reader_thread.join()
self.events_thread.join()
self.request_queue.put(None) self.request_queue.put(None)
self.writer_thread.join() self.writer_thread.join()
@@ -111,6 +122,10 @@ class BaseRpc:
# Log an exception if the reader loop dies. # Log an exception if the reader loop dies.
logging.exception("Exception in the reader loop") logging.exception("Exception in the reader loop")
# terminate pending rpc requests because no responses can arrive anymore
for queue in self.request_results.values():
queue.put(RpcShutdownError)
def writer_loop(self) -> None: def writer_loop(self) -> None:
"""Writer loop ensuring only a single thread writes requests.""" """Writer loop ensuring only a single thread writes requests."""
try: try:
@@ -135,7 +150,10 @@ class BaseRpc:
while True: while True:
if self.closing: if self.closing:
return return
event = self.get_next_event() try:
event = self.get_next_event()
except RpcShutdownError:
return
account_id = event["contextId"] account_id = event["contextId"]
queue = self.get_queue(account_id) queue = self.get_queue(account_id)
event = event["event"] event = event["event"]
@@ -172,10 +190,8 @@ class Rpc(BaseRpc):
The given arguments will be passed to subprocess.Popen(). The given arguments will be passed to subprocess.Popen().
""" """
super(Rpc, self).__init__() super(Rpc, self).__init__()
self.server_stdout: io.Writer[bytes]
self.server_stdin: io.Reader[bytes]
self._accounts_dir = accounts_dir self._accounts_dir = accounts_dir
self.rpc_server_path = rpc_server_path self.rpc_server_path: str = rpc_server_path
def connect_to_server(self): def connect_to_server(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
@@ -196,3 +212,21 @@ class Rpc(BaseRpc):
def disconnect_from_server(self): def disconnect_from_server(self):
self.stop_io_for_all_accounts() self.stop_io_for_all_accounts()
self.server_stdin.close() self.server_stdin.close()
class RpcFIFO(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""
def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
super(RpcFIFO, self).__init__()
self.fn_request_fifo = fn_request_fifo
self.fn_response_fifo = fn_response_fifo
def connect_to_server(self):
server_stdin = open(self.fn_request_fifo, "wb") # noqa
server_stdout = open(self.fn_response_fifo, "rb") # noqa
return server_stdout, server_stdin
def disconnect_from_server(self):
self.server_stdin.close()
self.server_stdout.close()

View File

@@ -0,0 +1,22 @@
import os
import platform # noqa
import subprocess
import pytest
from deltachat_rpc_client import DeltaChat, RpcFIFO
@pytest.mark.skipif("platform.system() == 'Windows'")
def test_rpc_fifo(tmp_path):
fn_request_fifo = tmp_path.joinpath("request_fifo")
fn_response_fifo = tmp_path.joinpath("response_fifo")
os.mkfifo(fn_request_fifo)
os.mkfifo(fn_response_fifo)
popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True)
rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo)
with rpc:
dc = DeltaChat(rpc)
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
popen.wait()