From e88c7f88eefbdca3cbf583b4f4d50add074b338f Mon Sep 17 00:00:00 2001 From: holger krekel Date: Tue, 24 Feb 2026 12:00:12 +0100 Subject: [PATCH] feat: report ready/init_error on startup via JSON-RPC notification Why: When the deltachat-rpc-server encounters a fatal error during early startup (e.g., when the accounts directory is invalid, a file instead of a dir, or otherwise inaccessible), it exits. The Python RPC client previously lacked a structured way to wait for the server to be fully initialized or to detect early startup failures gracefully. This led to hanging tests or obscure broken pipe errors rather than clear initialization errors. How: - The RPC server now sends a JSON-RPC notification on stdout at startup: - "ready" with core_version, server_path, and accounts_dir on success - "init_error" with error message if accounts directory initialization fails - The Python RPC client reads the first line from stdout to ensure the server is ready. - The Python client raises JsonRpcError on init_error, enabling early failure detection and fast-failing rather than stalling. - Added tests to ensure the client fails immediately on invalid dirs. --- deltachat-rpc-client/README.md | 3 + .../src/deltachat_rpc_client/__init__.py | 3 +- .../src/deltachat_rpc_client/rpc.py | 61 ++++++++++++++++++- deltachat-rpc-client/tests/test_cross_core.py | 2 +- deltachat-rpc-client/tests/test_something.py | 20 +++++- deltachat-rpc-server/src/main.rs | 47 +++++++++++--- 6 files changed, 123 insertions(+), 13 deletions(-) diff --git a/deltachat-rpc-client/README.md b/deltachat-rpc-client/README.md index 5672dd807..e74076abf 100644 --- a/deltachat-rpc-client/README.md +++ b/deltachat-rpc-client/README.md @@ -2,6 +2,9 @@ RPC client connects to standalone Delta Chat RPC server `deltachat-rpc-server` and provides asynchronous interface to it. +`rpc.start()` blocks until the server is initialized +and will raise an error if initialization fails +(e.g. if the accounts directory could not be used). ## Getting started diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py index 1a532c701..03cf9b945 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py @@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId from .contact import Contact from .deltachat import DeltaChat from .message import Message -from .rpc import Rpc +from .rpc import JsonRpcError, Rpc __all__ = [ "Account", @@ -19,6 +19,7 @@ __all__ = [ "Contact", "DeltaChat", "EventType", + "JsonRpcError", "Message", "SpecialContactId", "Rpc", diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index 295a8ee39..86fe720e3 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -54,10 +54,19 @@ class RpcMethod: class Rpc: """RPC client.""" - def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs): + def __init__( + self, + accounts_dir: Optional[str] = None, + rpc_server_path="deltachat-rpc-server", + _skip_ready_check=False, + **kwargs, + ): """Initialize RPC client. The 'kwargs' arguments will be passed to subprocess.Popen(). + '_skip_ready_check' is for debugging/testing only, + e.g. when using an old server that doesn't send + ready/init_error notifications on startup. """ if accounts_dir: kwargs["env"] = { @@ -67,6 +76,7 @@ class Rpc: self._kwargs = kwargs self.rpc_server_path = rpc_server_path + self._skip_ready_check = _skip_ready_check self.process: subprocess.Popen self.id_iterator: Iterator[int] self.event_queues: dict[int, Queue] @@ -79,7 +89,13 @@ class Rpc: self.events_thread: Thread def start(self) -> None: - """Start RPC server subprocess.""" + """Start RPC server subprocess and wait for successful initialization. + + This method blocks until the RPC server sends a "ready" notification. + If the server fails to initialize + (e.g., due to an invalid accounts directory), + a JsonRpcError is raised with the error message provided by the server. + """ popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} if sys.version_info >= (3, 11): # Prevent subprocess from capturing SIGINT. @@ -90,6 +106,9 @@ class Rpc: popen_kwargs.update(self._kwargs) self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + + self._wait_for_ready() + self.id_iterator = itertools.count(start=1) self.event_queues = {} self.request_results = {} @@ -102,6 +121,44 @@ class Rpc: self.events_thread = Thread(target=self.events_loop) self.events_thread.start() + def _wait_for_ready(self) -> None: + """Wait for "ready" or "init_error" notification from the server.""" + if self._skip_ready_check: + return + + # Read the first JSON-RPC notification which is + # "ready" (success) or "init_error" (e.g. bad accounts dir). + line = self.process.stdout.readline() + if not line: + return_code = self.process.wait() + if return_code != 0: + raise JsonRpcError(f"RPC server terminated with exit code {return_code}") + return + + try: + status = json.loads(line) + except json.JSONDecodeError: + raise JsonRpcError(f"RPC server sent invalid initial message: {line.decode().strip()}") from None + + if status.get("method") == "init_error": + error_msg = status.get("params", ["Unknown error"])[0] + raise JsonRpcError(f"RPC server initialization failed: {error_msg}") + + if status.get("method") != "ready": + raise JsonRpcError(f"RPC server sent unexpected initial message: {line.decode().strip()}") + + params = status.get("params", [{}])[0] + core_version = params.get("core_version", "unknown") + server_path = params.get("server_path", "unknown") + accounts_dir = params.get("accounts_dir", "unknown") + logging.info( + "RPC server ready. Core version: {}, Server path: {}, Accounts dir: {}".format( + core_version, + server_path, + accounts_dir, + ), + ) + def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True diff --git a/deltachat-rpc-client/tests/test_cross_core.py b/deltachat-rpc-client/tests/test_cross_core.py index 31d39c790..30715fe21 100644 --- a/deltachat-rpc-client/tests/test_cross_core.py +++ b/deltachat-rpc-client/tests/test_cross_core.py @@ -8,7 +8,7 @@ from deltachat_rpc_client import DeltaChat, Rpc def test_install_venv_and_use_other_core(tmp_path, get_core_python_env): python, rpc_server_path = get_core_python_env("2.24.0") subprocess.check_call([python, "-m", "pip", "install", "deltachat-rpc-server==2.24.0"]) - rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path) + rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path, _skip_ready_check=True) with rpc: dc = DeltaChat(rpc) diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index f3f33cb15..44cba11aa 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -13,7 +13,7 @@ import pytest from deltachat_rpc_client import EventType, events from deltachat_rpc_client.const import DownloadState, MessageState from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS -from deltachat_rpc_client.rpc import JsonRpcError +from deltachat_rpc_client.rpc import JsonRpcError, Rpc def test_system_info(rpc) -> None: @@ -665,6 +665,24 @@ def test_openrpc_command_line() -> None: assert "methods" in openrpc +def test_early_failure(tmp_path) -> None: + """Test that Rpc.start() raises on invalid accounts directories.""" + # A file instead of a directory. + file_path = tmp_path / "not_a_dir" + file_path.write_text("I am a file, not a directory") + rpc = Rpc(accounts_dir=str(file_path)) + with pytest.raises(JsonRpcError, match="initialization failed"): + rpc.start() + + # A non-empty directory that is not a deltachat accounts directory. + non_dc_dir = tmp_path / "invalid_dir" + non_dc_dir.mkdir() + (non_dc_dir / "some_file").write_text("content") + rpc = Rpc(accounts_dir=str(non_dc_dir)) + with pytest.raises(JsonRpcError, match="initialization failed"): + rpc.start() + + def test_provider_info(rpc) -> None: account_id = rpc.add_account() diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 2db7f0535..31fcc86ef 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -72,14 +72,7 @@ async fn main_impl() -> Result<()> { #[cfg(target_family = "unix")] let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?; - let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); - log::info!("Starting with accounts directory `{path}`."); - let writable = true; - let accounts = Accounts::new(PathBuf::from(&path), writable).await?; - - log::info!("Creating JSON-RPC API."); - let accounts = Arc::new(RwLock::new(accounts)); - let state = CommandApi::from_arc(accounts.clone()).await; + let (accounts, state) = init_accounts_and_report_status().await?; let (client, mut out_receiver) = RpcClient::new(); let session = RpcSession::new(client.clone(), state.clone()); @@ -160,3 +153,41 @@ async fn main_impl() -> Result<()> { Ok(()) } + +async fn init_accounts_and_report_status() -> Result<(Arc>, CommandApi)> { + let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); + log::info!("Starting with accounts directory `{path}`."); + let path = PathBuf::from(&path); + match Accounts::new(path.clone(), true).await { + Ok(accounts) => { + log::info!("Creating JSON-RPC API."); + let accounts = Arc::new(RwLock::new(accounts)); + let state = CommandApi::from_arc(accounts.clone()).await; + println!( + "{}", + serde_json::to_string(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "ready", + "params": [{ + "core_version": DC_VERSION_STR, + "server_path": env::current_exe()?.display().to_string(), + "accounts_dir": path.display().to_string(), + }] + }))? + ); + Ok((accounts, state)) + } + Err(err) => { + let error_msg = format!("{err:#}"); + println!( + "{}", + serde_json::to_string(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "init_error", + "params": [error_msg] + }))? + ); + Err(err) + } + } +}