add python class RpcUnixSocket (does not completely work yet)

This commit is contained in:
Simon Laux
2026-01-12 16:06:19 +01:00
parent 3352b8d5f6
commit 998ab6e298
4 changed files with 316 additions and 1 deletions

View File

@@ -0,0 +1,258 @@
"""JSON-RPC client module."""
from __future__ import annotations
import itertools
import json
import logging
import os
import subprocess
import sys
import socket
from queue import Empty, Queue
from threading import Thread
from typing import TYPE_CHECKING, Any, Iterator, Optional
if TYPE_CHECKING:
import io
class JsonRpcError(Exception):
"""JSON-RPC error."""
class RpcShutdownError(JsonRpcError):
"""Raised in RPC methods if the connection to server is closing."""
class RpcMethod:
"""RPC method."""
def __init__(self, rpc: "BaseRpc", name: str):
self.rpc = rpc
self.name = name
def __call__(self, *args) -> Any:
"""Call JSON-RPC method synchronously."""
future = self.future(*args)
return future()
def future(self, *args) -> Any:
"""Call JSON-RPC method asynchronously."""
request_id = next(self.rpc.id_iterator)
request = {
"jsonrpc": "2.0",
"method": self.name,
"params": args,
"id": request_id,
}
self.rpc.request_results[request_id] = queue = Queue()
self.rpc.request_queue.put(request)
def rpc_future():
"""Wait for the request to receive a result."""
response = queue.get()
if response is None:
raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down")
if "error" in response:
raise JsonRpcError(response["error"])
return response.get("result", None)
return rpc_future
class BaseRpc:
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
from subclasses to work concretely."""
def __init__(self):
self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue]
# Map from request ID to a Queue which provides a single result
self.request_results: dict[int, Queue]
self.request_queue: Queue[Any]
self.server_stdin: io.Writer[bytes]
self.server_stdout: io.Reader[bytes]
self.closing: bool
self.reader_thread: Thread
self.writer_thread: Thread
self.events_thread: Thread
def start(self) -> None:
"""Start RPC server subprocess."""
self.server_stdout, self.server_stdin = self.connect_to_server()
self.id_iterator = itertools.count(start=1)
self.event_queues = {}
self.request_results = {}
self.request_queue = Queue()
self.closing = False
self.reader_thread = Thread(target=self.reader_loop)
self.reader_thread.start()
self.writer_thread = Thread(target=self.writer_loop)
self.writer_thread.start()
self.events_thread = Thread(target=self.events_loop)
self.events_thread.start()
def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True
self.disconnect_from_server()
self.reader_thread.join()
self.events_thread.join()
self.request_queue.put(None)
self.writer_thread.join()
def __enter__(self):
self.start()
return self
def __exit__(self, _exc_type, _exc, _tb):
self.close()
def reader_loop(self) -> None:
"""Process JSON-RPC responses from the RPC server process output."""
try:
while line := self.server_stdout.readline():
response = json.loads(line)
if "id" in response:
response_id = response["id"]
self.request_results.pop(response_id).put(response)
else:
logging.warning("Got a response without ID: %s", response)
except Exception:
# Log an exception if the reader loop dies.
logging.exception("Exception in the reader loop")
# terminate pending rpc requests because no responses can arrive anymore
for queue in self.request_results.values():
queue.put(None)
def writer_loop(self) -> None:
"""Writer loop ensuring only a single thread writes requests."""
try:
while request := self.request_queue.get():
data = (json.dumps(request) + "\n").encode()
self.server_stdin.write(data)
self.server_stdin.flush()
except Exception:
# Log an exception if the writer loop dies.
logging.exception("Exception in the writer loop")
def get_queue(self, account_id: int) -> Queue:
"""Get event queue corresponding to the given account ID."""
if account_id not in self.event_queues:
self.event_queues[account_id] = Queue()
return self.event_queues[account_id]
def events_loop(self) -> None:
"""Request new events and distributes them between queues."""
try:
while True:
if self.closing:
return
try:
event = self.get_next_event()
except RpcShutdownError:
return
account_id = event["contextId"]
queue = self.get_queue(account_id)
event = event["event"]
logging.debug("account_id=%d got an event %s", account_id, event)
queue.put(event)
except Exception:
# Log an exception if the event loop dies.
logging.exception("Exception in the event loop")
def wait_for_event(self, account_id: int) -> Optional[dict]:
"""Wait for the next event from the given account and returns it."""
queue = self.get_queue(account_id)
return queue.get()
def clear_all_events(self, account_id: int):
"""Remove all queued-up events for a given account. Useful for tests."""
queue = self.get_queue(account_id)
try:
while True:
queue.get_nowait()
except Empty:
pass
def __getattr__(self, attr: str):
return RpcMethod(self, attr)
class RpcSubprocess(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server in a subprocess."""
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"):
"""Initialize RPC client.
The given arguments will be passed to subprocess.Popen().
"""
super(RpcSubprocess, self).__init__()
self._accounts_dir = accounts_dir
self.rpc_server_path: str = rpc_server_path
def connect_to_server(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
if self._accounts_dir:
popen_kwargs["env"] = os.environ.copy()
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)
process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin
def disconnect_from_server(self):
self.stop_io_for_all_accounts()
self.server_stdin.close()
# backward compatibility
Rpc = RpcSubprocess
class RpcFIFO(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""
def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
super(RpcFIFO, self).__init__()
self.fn_request_fifo = fn_request_fifo
self.fn_response_fifo = fn_response_fifo
def connect_to_server(self):
server_stdin = open(self.fn_request_fifo, "wb") # noqa
server_stdout = open(self.fn_response_fifo, "rb") # noqa
return server_stdout, server_stdin
def disconnect_from_server(self):
self.server_stdin.close()
self.server_stdout.close()
class RpcUnixSocket(BaseRpc):
"""RPC client that connects to a deltachat-rpc-server through FIFO files."""
def __init__(self, socket_path: str):
super(RpcUnixSocket, self).__init__()
self.socket_path = socket_path
self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
def connect_to_server(self):
print(str(self.socket_path))
self.client.connect(self.socket_path)
print("c1")
writer = self.client.makefile("wb")
reader = self.client.makefile("rb")
print("c2")
assert False
return reader, writer
def disconnect_from_server(self):
self.client.close()

View File

@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
from .contact import Contact from .contact import Contact
from .deltachat import DeltaChat from .deltachat import DeltaChat
from .message import Message from .message import Message
from .rpc import Rpc, RpcFIFO from .rpc import Rpc, RpcFIFO, RpcUnixSocket
__all__ = [ __all__ = [
"Account", "Account",
@@ -23,6 +23,7 @@ __all__ = [
"SpecialContactId", "SpecialContactId",
"Rpc", "Rpc",
"RpcFIFO", "RpcFIFO",
"RpcUnixSocket",
"run_bot_cli", "run_bot_cli",
"run_client_cli", "run_client_cli",
] ]

View File

@@ -8,6 +8,7 @@ import logging
import os import os
import subprocess import subprocess
import sys import sys
import socket
from queue import Empty, Queue from queue import Empty, Queue
from threading import Thread from threading import Thread
from typing import TYPE_CHECKING, Any, Iterator, Optional from typing import TYPE_CHECKING, Any, Iterator, Optional
@@ -234,3 +235,21 @@ class RpcFIFO(BaseRpc):
def disconnect_from_server(self): def disconnect_from_server(self):
self.server_stdin.close() self.server_stdin.close()
self.server_stdout.close() self.server_stdout.close()
class RpcUnixSocket(BaseRpc):
"""RPC client that connects to a deltachat-rpc-server through FIFO files."""
def __init__(self, socket_path: str):
super(RpcUnixSocket, self).__init__()
self.socket_path = socket_path
self.client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
def connect_to_server(self):
print(str(self.socket_path))
self.client.connect(self.socket_path)
writer = self.client.makefile("wb")
reader = self.client.makefile("rb")
return reader, writer
def disconnect_from_server(self):
self.client.close()

View File

@@ -0,0 +1,37 @@
from os import environ
import platform # noqa
import signal
import subprocess
from sys import stderr
from time import sleep
import pytest
from deltachat_rpc_client import DeltaChat, RpcUnixSocket
@pytest.mark.skipif("platform.system() == 'Windows'")
def test_rpc_unix(tmp_path):
socket_file = "/tmp/chatmail.sock" # path needs to be relative or short
path = environ.get("PATH")
assert path is not None
popen = subprocess.Popen(
f"deltachat-rpc-server --unix {socket_file}",
shell=True,
env=dict(
DC_ACCOUNTS_PATH=f"{tmp_path}/accounts/test",
rust_log="info",
PATH=path
)
)
sleep(1) # wait until socket exists # TODO this should not be needed
rpc = RpcUnixSocket(socket_path=socket_file)
with rpc:
dc = DeltaChat(rpc)
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
popen.send_signal(signal.SIGINT)
popen.wait()