diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index 76801e851..fe748fdab 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -10,7 +10,10 @@ import subprocess import sys from queue import Empty, Queue from threading import Thread -from typing import Any, Iterator, Optional +from typing import TYPE_CHECKING, Any, Iterator, Optional + +if TYPE_CHECKING: + import io class JsonRpcError(Exception): @@ -51,18 +54,8 @@ class RpcMethod: return rpc_future -class Rpc: - """RPC client.""" - - def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server"): - """Initialize RPC client. - - The given arguments will be passed to subprocess.Popen(). - """ - self._accounts_dir = accounts_dir - - self.rpc_server_path = rpc_server_path - self.process: subprocess.Popen +class BaseRpc: + def __init__(self): self.id_iterator: Iterator[int] self.event_queues: dict[int, Queue] # Map from request ID to a Queue which provides a single result @@ -75,7 +68,7 @@ class Rpc: def start(self) -> None: """Start RPC server subprocess.""" - self.server_stdout, self.server_stdin = self.connect_to_subprocess() + self.server_stdout, self.server_stdin = self.connect_to_server() self.id_iterator = itertools.count(start=1) self.event_queues = {} self.request_results = {} @@ -88,28 +81,11 @@ class Rpc: self.events_thread = Thread(target=self.events_loop) self.events_thread.start() - def connect_to_subprocess(self): - popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} - if sys.version_info >= (3, 11): - # Prevent subprocess from capturing SIGINT. - popen_kwargs["process_group"] = 0 - else: - # `process_group` is not supported before Python 3.11. - popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 - - if self._accounts_dir: - popen_kwargs["env"] = os.environ.copy() - popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir) - - process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) - return process.stdout, process.stdin - def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True - self.stop_io_for_all_accounts() + self.disconnect_from_server() self.events_thread.join() - self.server_stdin.close() self.reader_thread.join() self.request_queue.put(None) self.writer_thread.join() @@ -185,3 +161,38 @@ class Rpc: def __getattr__(self, attr: str): return RpcMethod(self, attr) + + +class Rpc(BaseRpc): + """RPC client that runs and connects to a deltachat-rpc-server in a subprocess.""" + + def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"): + """Initialize RPC client. + + The given arguments will be passed to subprocess.Popen(). + """ + super(Rpc, self).__init__() + self.server_stdout: io.Writer[bytes] + self.server_stdin: io.Reader[bytes] + self._accounts_dir = accounts_dir + self.rpc_server_path = rpc_server_path + + def connect_to_server(self): + popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} + if sys.version_info >= (3, 11): + # Prevent subprocess from capturing SIGINT. + popen_kwargs["process_group"] = 0 + else: + # `process_group` is not supported before Python 3.11. + popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 + + if self._accounts_dir: + popen_kwargs["env"] = os.environ.copy() + popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir) + + process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + return process.stdout, process.stdin + + def disconnect_from_server(self): + self.stop_io_for_all_accounts() + self.server_stdin.close()