Compare commits

...

6 Commits

Author SHA1 Message Date
holger krekel
51e58583e0 better naming 2025-11-30 14:52:01 +01:00
holger krekel
5542dcfa0a address link2xt feedback 2025-11-30 14:44:05 +01:00
holger krekel
00a6f41441 make fifo's work 2025-11-30 14:44:05 +01:00
holger krekel
a77e9cb8d5 separate out subprocess handling into subclass 2025-11-30 14:44:05 +01:00
holger krekel
7dc717fa62 strike unncessary kwargs to Rpc 2025-11-30 14:44:05 +01:00
holger krekel
8b9f5c7795 move subprocess creation into own function 2025-11-30 14:44:05 +01:00
4 changed files with 113 additions and 42 deletions

View File

@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
from .contact import Contact
from .deltachat import DeltaChat
from .message import Message
from .rpc import Rpc
from .rpc import Rpc, RpcFIFO
__all__ = [
"Account",
@@ -22,6 +22,7 @@ __all__ = [
"Message",
"SpecialContactId",
"Rpc",
"RpcFIFO",
"run_bot_cli",
"run_client_cli",
]

View File

@@ -47,7 +47,6 @@ class AttrDict(dict):
def run_client_cli(
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
) -> None:
"""Run a simple command line app, using the given hooks.
@@ -55,13 +54,12 @@ def run_client_cli(
"""
from .client import Client
_run_cli(Client, hooks, argv, **kwargs)
_run_cli(Client, hooks, argv)
def run_bot_cli(
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
) -> None:
"""Run a simple bot command line using the given hooks.
@@ -69,14 +67,13 @@ def run_bot_cli(
"""
from .client import Bot
_run_cli(Bot, hooks, argv, **kwargs)
_run_cli(Bot, hooks, argv)
def _run_cli(
client_type: Type["Client"],
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
) -> None:
from .deltachat import DeltaChat
from .rpc import Rpc
@@ -94,7 +91,7 @@ def _run_cli(
parser.add_argument("--password", action="store", help="password", default=os.getenv("DELTACHAT_PASSWORD"))
args = parser.parse_args(argv[1:])
with Rpc(accounts_dir=args.accounts_dir, **kwargs) as rpc:
with Rpc(accounts_dir=args.accounts_dir) as rpc:
deltachat = DeltaChat(rpc)
core_version = (deltachat.get_system_info()).deltachat_core_version
accounts = deltachat.get_all_accounts()

View File

@@ -10,17 +10,24 @@ import subprocess
import sys
from queue import Empty, Queue
from threading import Thread
from typing import Any, Iterator, Optional
from typing import TYPE_CHECKING, Any, Iterator, Optional
if TYPE_CHECKING:
import io
class JsonRpcError(Exception):
"""JSON-RPC error."""
class RpcShutdownError(JsonRpcError):
"""Raised in RPC methods if the connection to server is closing."""
class RpcMethod:
"""RPC method."""
def __init__(self, rpc: "Rpc", name: str):
def __init__(self, rpc: "BaseRpc", name: str):
self.rpc = rpc
self.name = name
@@ -44,6 +51,8 @@ class RpcMethod:
def rpc_future():
"""Wait for the request to receive a result."""
response = queue.get()
if response is None:
raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down")
if "error" in response:
raise JsonRpcError(response["error"])
return response.get("result", None)
@@ -51,28 +60,18 @@ class RpcMethod:
return rpc_future
class Rpc:
"""RPC client."""
class BaseRpc:
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
from subclasses to work concretely."""
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
"""Initialize RPC client.
The given arguments will be passed to subprocess.Popen().
"""
if accounts_dir:
kwargs["env"] = {
**kwargs.get("env", os.environ),
"DC_ACCOUNTS_PATH": str(accounts_dir),
}
self._kwargs = kwargs
self.rpc_server_path = rpc_server_path
self.process: subprocess.Popen
def __init__(self):
self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue]
# Map from request ID to a Queue which provides a single result
self.request_results: dict[int, Queue]
self.request_queue: Queue[Any]
self.server_stdin: io.Writer[bytes]
self.server_stdout: io.Reader[bytes]
self.closing: bool
self.reader_thread: Thread
self.writer_thread: Thread
@@ -80,16 +79,7 @@ class Rpc:
def start(self) -> None:
"""Start RPC server subprocess."""
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
popen_kwargs.update(self._kwargs)
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
self.server_stdout, self.server_stdin = self.connect_to_server()
self.id_iterator = itertools.count(start=1)
self.event_queues = {}
self.request_results = {}
@@ -105,10 +95,9 @@ class Rpc:
def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True
self.stop_io_for_all_accounts()
self.events_thread.join()
self.process.stdin.close()
self.disconnect_from_server()
self.reader_thread.join()
self.events_thread.join()
self.request_queue.put(None)
self.writer_thread.join()
@@ -122,7 +111,7 @@ class Rpc:
def reader_loop(self) -> None:
"""Process JSON-RPC responses from the RPC server process output."""
try:
while line := self.process.stdout.readline():
while line := self.server_stdout.readline():
response = json.loads(line)
if "id" in response:
response_id = response["id"]
@@ -133,13 +122,17 @@ class Rpc:
# Log an exception if the reader loop dies.
logging.exception("Exception in the reader loop")
# terminate pending rpc requests because no responses can arrive anymore
for queue in self.request_results.values():
queue.put(None)
def writer_loop(self) -> None:
"""Writer loop ensuring only a single thread writes requests."""
try:
while request := self.request_queue.get():
data = (json.dumps(request) + "\n").encode()
self.process.stdin.write(data)
self.process.stdin.flush()
self.server_stdin.write(data)
self.server_stdin.flush()
except Exception:
# Log an exception if the writer loop dies.
@@ -157,7 +150,10 @@ class Rpc:
while True:
if self.closing:
return
event = self.get_next_event()
try:
event = self.get_next_event()
except RpcShutdownError:
return
account_id = event["contextId"]
queue = self.get_queue(account_id)
event = event["event"]
@@ -183,3 +179,58 @@ class Rpc:
def __getattr__(self, attr: str):
return RpcMethod(self, attr)
class RpcSubprocess(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server in a subprocess."""
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"):
"""Initialize RPC client.
The given arguments will be passed to subprocess.Popen().
"""
super(RpcSubprocess, self).__init__()
self._accounts_dir = accounts_dir
self.rpc_server_path: str = rpc_server_path
def connect_to_server(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
if self._accounts_dir:
popen_kwargs["env"] = os.environ.copy()
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)
process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin
def disconnect_from_server(self):
self.stop_io_for_all_accounts()
self.server_stdin.close()
# backward compatibility
Rpc = RpcSubprocess
class RpcFIFO(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""
def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
super(RpcFIFO, self).__init__()
self.fn_request_fifo = fn_request_fifo
self.fn_response_fifo = fn_response_fifo
def connect_to_server(self):
server_stdin = open(self.fn_request_fifo, "wb") # noqa
server_stdout = open(self.fn_response_fifo, "rb") # noqa
return server_stdout, server_stdin
def disconnect_from_server(self):
self.server_stdin.close()
self.server_stdout.close()

View File

@@ -0,0 +1,22 @@
import os
import platform # noqa
import subprocess
import pytest
from deltachat_rpc_client import DeltaChat, RpcFIFO
@pytest.mark.skipif("platform.system() == 'Windows'")
def test_rpc_fifo(tmp_path):
fn_request_fifo = tmp_path.joinpath("request_fifo")
fn_response_fifo = tmp_path.joinpath("response_fifo")
os.mkfifo(fn_request_fifo)
os.mkfifo(fn_response_fifo)
popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True)
rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo)
with rpc:
dc = DeltaChat(rpc)
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
popen.wait()