separate out subprocess handling into subclass

This commit is contained in:
holger krekel
2025-11-28 19:49:05 +01:00
parent 7dc717fa62
commit a77e9cb8d5

View File

@@ -10,7 +10,10 @@ import subprocess
import sys import sys
from queue import Empty, Queue from queue import Empty, Queue
from threading import Thread from threading import Thread
from typing import Any, Iterator, Optional from typing import TYPE_CHECKING, Any, Iterator, Optional
if TYPE_CHECKING:
import io
class JsonRpcError(Exception): class JsonRpcError(Exception):
@@ -51,18 +54,8 @@ class RpcMethod:
return rpc_future return rpc_future
class Rpc: class BaseRpc:
"""RPC client.""" def __init__(self):
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server"):
"""Initialize RPC client.
The given arguments will be passed to subprocess.Popen().
"""
self._accounts_dir = accounts_dir
self.rpc_server_path = rpc_server_path
self.process: subprocess.Popen
self.id_iterator: Iterator[int] self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue] self.event_queues: dict[int, Queue]
# Map from request ID to a Queue which provides a single result # Map from request ID to a Queue which provides a single result
@@ -75,7 +68,7 @@ class Rpc:
def start(self) -> None: def start(self) -> None:
"""Start RPC server subprocess.""" """Start RPC server subprocess."""
self.server_stdout, self.server_stdin = self.connect_to_subprocess() self.server_stdout, self.server_stdin = self.connect_to_server()
self.id_iterator = itertools.count(start=1) self.id_iterator = itertools.count(start=1)
self.event_queues = {} self.event_queues = {}
self.request_results = {} self.request_results = {}
@@ -88,28 +81,11 @@ class Rpc:
self.events_thread = Thread(target=self.events_loop) self.events_thread = Thread(target=self.events_loop)
self.events_thread.start() self.events_thread.start()
def connect_to_subprocess(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
if self._accounts_dir:
popen_kwargs["env"] = os.environ.copy()
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)
process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin
def close(self) -> None: def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes.""" """Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True self.closing = True
self.stop_io_for_all_accounts() self.disconnect_from_server()
self.events_thread.join() self.events_thread.join()
self.server_stdin.close()
self.reader_thread.join() self.reader_thread.join()
self.request_queue.put(None) self.request_queue.put(None)
self.writer_thread.join() self.writer_thread.join()
@@ -185,3 +161,38 @@ class Rpc:
def __getattr__(self, attr: str): def __getattr__(self, attr: str):
return RpcMethod(self, attr) return RpcMethod(self, attr)
class Rpc(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server in a subprocess."""
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"):
"""Initialize RPC client.
The given arguments will be passed to subprocess.Popen().
"""
super(Rpc, self).__init__()
self.server_stdout: io.Writer[bytes]
self.server_stdin: io.Reader[bytes]
self._accounts_dir = accounts_dir
self.rpc_server_path = rpc_server_path
def connect_to_server(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
if self._accounts_dir:
popen_kwargs["env"] = os.environ.copy()
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)
process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin
def disconnect_from_server(self):
self.stop_io_for_all_accounts()
self.server_stdin.close()