mirror of
https://github.com/chatmail/core.git
synced 2026-04-28 02:46:29 +03:00
feat(rpc): add startup health-check and propagate server errors
Rpc.start() now calls get_system_info() after launching the server to verify it started successfully. If the server exits early (e.g. due to an invalid accounts directory), the core error message from stderr is captured and included in the raised JsonRpcError. The reader_loop now unblocks pending RPC requests when the server closes stdout, so callers never hang on a dead server. Export JsonRpcError from the deltachat_rpc_client package. Add test_early_failure verifying that Rpc.start() raises with the actual core error message for invalid accounts directories.
This commit is contained in:
@@ -2,6 +2,9 @@
|
||||
|
||||
RPC client connects to standalone Delta Chat RPC server `deltachat-rpc-server`
|
||||
and provides asynchronous interface to it.
|
||||
`rpc.start()` performs a health-check RPC call to verify the server
|
||||
started successfully and will raise an error if startup fails
|
||||
(e.g. if the accounts directory could not be used).
|
||||
|
||||
## Getting started
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
|
||||
from .contact import Contact
|
||||
from .deltachat import DeltaChat
|
||||
from .message import Message
|
||||
from .rpc import Rpc
|
||||
from .rpc import JsonRpcError, Rpc
|
||||
|
||||
__all__ = [
|
||||
"Account",
|
||||
@@ -19,6 +19,7 @@ __all__ = [
|
||||
"Contact",
|
||||
"DeltaChat",
|
||||
"EventType",
|
||||
"JsonRpcError",
|
||||
"Message",
|
||||
"SpecialContactId",
|
||||
"Rpc",
|
||||
|
||||
@@ -54,7 +54,12 @@ class RpcMethod:
|
||||
class Rpc:
|
||||
"""RPC client."""
|
||||
|
||||
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
accounts_dir: Optional[str] = None,
|
||||
rpc_server_path="deltachat-rpc-server",
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize RPC client.
|
||||
|
||||
The 'kwargs' arguments will be passed to subprocess.Popen().
|
||||
@@ -79,8 +84,15 @@ class Rpc:
|
||||
self.events_thread: Thread
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start RPC server subprocess."""
|
||||
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
|
||||
"""Start RPC server subprocess and wait for successful initialization.
|
||||
|
||||
This method blocks until the RPC server responds to an initial
|
||||
health-check RPC call (get_system_info).
|
||||
If the server fails to start
|
||||
(e.g., due to an invalid accounts directory),
|
||||
a JsonRpcError is raised.
|
||||
"""
|
||||
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE, "stderr": subprocess.PIPE}
|
||||
if sys.version_info >= (3, 11):
|
||||
# Prevent subprocess from capturing SIGINT.
|
||||
popen_kwargs["process_group"] = 0
|
||||
@@ -90,6 +102,7 @@ class Rpc:
|
||||
|
||||
popen_kwargs.update(self._kwargs)
|
||||
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
|
||||
|
||||
self.id_iterator = itertools.count(start=1)
|
||||
self.event_queues = {}
|
||||
self.request_results = {}
|
||||
@@ -102,6 +115,22 @@ class Rpc:
|
||||
self.events_thread = Thread(target=self.events_loop)
|
||||
self.events_thread.start()
|
||||
|
||||
# Perform a health-check RPC call to ensure the server started
|
||||
# successfully and the accounts directory is usable.
|
||||
try:
|
||||
system_info = self.get_system_info()
|
||||
except (JsonRpcError, Exception) as e:
|
||||
# The reader_loop already saw EOF on stdout, so the process
|
||||
# has exited and stderr is available.
|
||||
stderr = self.process.stderr.read().decode(errors="replace").strip()
|
||||
if stderr:
|
||||
raise JsonRpcError(f"RPC server failed to start: {stderr}") from e
|
||||
raise JsonRpcError(f"RPC server startup check failed: {e}") from e
|
||||
logging.info(
|
||||
"RPC server ready. Core version: %s",
|
||||
system_info.get("deltachat_core_version", "unknown"),
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Terminate RPC server process and wait until the reader loop finishes."""
|
||||
self.closing = True
|
||||
@@ -132,6 +161,10 @@ class Rpc:
|
||||
except Exception:
|
||||
# Log an exception if the reader loop dies.
|
||||
logging.exception("Exception in the reader loop")
|
||||
finally:
|
||||
# Unblock any pending requests when the server closes stdout.
|
||||
for _request_id, queue in self.request_results.items():
|
||||
queue.put({"error": {"code": -32000, "message": "RPC server closed"}})
|
||||
|
||||
def writer_loop(self) -> None:
|
||||
"""Writer loop ensuring only a single thread writes requests."""
|
||||
@@ -140,7 +173,6 @@ class Rpc:
|
||||
data = (json.dumps(request) + "\n").encode()
|
||||
self.process.stdin.write(data)
|
||||
self.process.stdin.flush()
|
||||
|
||||
except Exception:
|
||||
# Log an exception if the writer loop dies.
|
||||
logging.exception("Exception in the writer loop")
|
||||
|
||||
@@ -13,7 +13,7 @@ import pytest
|
||||
from deltachat_rpc_client import EventType, events
|
||||
from deltachat_rpc_client.const import DownloadState, MessageState
|
||||
from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS
|
||||
from deltachat_rpc_client.rpc import JsonRpcError
|
||||
from deltachat_rpc_client.rpc import JsonRpcError, Rpc
|
||||
|
||||
|
||||
def test_system_info(rpc) -> None:
|
||||
@@ -665,6 +665,24 @@ def test_openrpc_command_line() -> None:
|
||||
assert "methods" in openrpc
|
||||
|
||||
|
||||
def test_early_failure(tmp_path) -> None:
|
||||
"""Test that Rpc.start() raises on invalid accounts directories."""
|
||||
# A file instead of a directory.
|
||||
file_path = tmp_path / "not_a_dir"
|
||||
file_path.write_text("I am a file, not a directory")
|
||||
rpc = Rpc(accounts_dir=str(file_path))
|
||||
with pytest.raises(JsonRpcError, match="(?i)directory"):
|
||||
rpc.start()
|
||||
|
||||
# A non-empty directory that is not a deltachat accounts directory.
|
||||
non_dc_dir = tmp_path / "invalid_dir"
|
||||
non_dc_dir.mkdir()
|
||||
(non_dc_dir / "some_file").write_text("content")
|
||||
rpc = Rpc(accounts_dir=str(non_dc_dir))
|
||||
with pytest.raises(JsonRpcError, match="invalid_dir"):
|
||||
rpc.start()
|
||||
|
||||
|
||||
def test_provider_info(rpc) -> None:
|
||||
account_id = rpc.add_account()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user