move subprocess creation into own function

This commit is contained in:
holger krekel
2025-11-28 18:58:10 +01:00
parent e4e19b57b3
commit 8b9f5c7795

View File

@@ -80,16 +80,7 @@ class Rpc:
def start(self) -> None:
"""Start RPC server subprocess."""
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
popen_kwargs.update(self._kwargs)
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
self.server_stdout, self.server_stdin = self.connect_to_subprocess()
self.id_iterator = itertools.count(start=1)
self.event_queues = {}
self.request_results = {}
@@ -102,12 +93,25 @@ class Rpc:
self.events_thread = Thread(target=self.events_loop)
self.events_thread.start()
def connect_to_subprocess(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509
popen_kwargs.update(self._kwargs)
process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin
def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True
self.stop_io_for_all_accounts()
self.events_thread.join()
self.process.stdin.close()
self.server_stdin.close()
self.reader_thread.join()
self.request_queue.put(None)
self.writer_thread.join()
@@ -122,7 +126,7 @@ class Rpc:
def reader_loop(self) -> None:
"""Process JSON-RPC responses from the RPC server process output."""
try:
while line := self.process.stdout.readline():
while line := self.server_stdout.readline():
response = json.loads(line)
if "id" in response:
response_id = response["id"]
@@ -138,8 +142,8 @@ class Rpc:
try:
while request := self.request_queue.get():
data = (json.dumps(request) + "\n").encode()
self.process.stdin.write(data)
self.process.stdin.flush()
self.server_stdin.write(data)
self.server_stdin.flush()
except Exception:
# Log an exception if the writer loop dies.