Compare commits

...

1 Commits

Author SHA1 Message Date
holger krekel
e88c7f88ee feat: report ready/init_error on startup via JSON-RPC notification
Why:
When the deltachat-rpc-server encounters a fatal error during early startup
(e.g., when the accounts directory is invalid, a file instead of a dir, or
otherwise inaccessible), it exits. The Python RPC client previously lacked
a structured way to wait for the server to be fully initialized or to
detect early startup failures gracefully. This led to hanging tests or
obscure broken pipe errors rather than clear initialization errors.

How:
- The RPC server now sends a JSON-RPC notification on stdout at startup:
  - "ready" with core_version, server_path, and accounts_dir on success
  - "init_error" with error message if accounts directory initialization fails
- The Python RPC client reads the first line from stdout to ensure the server is ready.
- The Python client raises JsonRpcError on init_error, enabling early
  failure detection and fast-failing rather than stalling.
- Added tests to ensure the client fails immediately on invalid dirs.
2026-02-24 17:30:07 +01:00
6 changed files with 123 additions and 13 deletions

View File

@@ -2,6 +2,9 @@
RPC client connects to standalone Delta Chat RPC server `deltachat-rpc-server`
and provides asynchronous interface to it.
`rpc.start()` blocks until the server is initialized
and will raise an error if initialization fails
(e.g. if the accounts directory could not be used).
## Getting started

View File

@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
from .contact import Contact
from .deltachat import DeltaChat
from .message import Message
from .rpc import Rpc
from .rpc import JsonRpcError, Rpc
__all__ = [
"Account",
@@ -19,6 +19,7 @@ __all__ = [
"Contact",
"DeltaChat",
"EventType",
"JsonRpcError",
"Message",
"SpecialContactId",
"Rpc",

View File

@@ -54,10 +54,19 @@ class RpcMethod:
class Rpc:
"""RPC client."""
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
def __init__(
self,
accounts_dir: Optional[str] = None,
rpc_server_path="deltachat-rpc-server",
_skip_ready_check=False,
**kwargs,
):
"""Initialize RPC client.
The 'kwargs' arguments will be passed to subprocess.Popen().
'_skip_ready_check' is for debugging/testing only,
e.g. when using an old server that doesn't send
ready/init_error notifications on startup.
"""
if accounts_dir:
kwargs["env"] = {
@@ -67,6 +76,7 @@ class Rpc:
self._kwargs = kwargs
self.rpc_server_path = rpc_server_path
self._skip_ready_check = _skip_ready_check
self.process: subprocess.Popen
self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue]
@@ -79,7 +89,13 @@ class Rpc:
self.events_thread: Thread
def start(self) -> None:
"""Start RPC server subprocess."""
"""Start RPC server subprocess and wait for successful initialization.
This method blocks until the RPC server sends a "ready" notification.
If the server fails to initialize
(e.g., due to an invalid accounts directory),
a JsonRpcError is raised with the error message provided by the server.
"""
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
@@ -90,6 +106,9 @@ class Rpc:
popen_kwargs.update(self._kwargs)
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
self._wait_for_ready()
self.id_iterator = itertools.count(start=1)
self.event_queues = {}
self.request_results = {}
@@ -102,6 +121,44 @@ class Rpc:
self.events_thread = Thread(target=self.events_loop)
self.events_thread.start()
def _wait_for_ready(self) -> None:
"""Wait for "ready" or "init_error" notification from the server."""
if self._skip_ready_check:
return
# Read the first JSON-RPC notification which is
# "ready" (success) or "init_error" (e.g. bad accounts dir).
line = self.process.stdout.readline()
if not line:
return_code = self.process.wait()
if return_code != 0:
raise JsonRpcError(f"RPC server terminated with exit code {return_code}")
return
try:
status = json.loads(line)
except json.JSONDecodeError:
raise JsonRpcError(f"RPC server sent invalid initial message: {line.decode().strip()}") from None
if status.get("method") == "init_error":
error_msg = status.get("params", ["Unknown error"])[0]
raise JsonRpcError(f"RPC server initialization failed: {error_msg}")
if status.get("method") != "ready":
raise JsonRpcError(f"RPC server sent unexpected initial message: {line.decode().strip()}")
params = status.get("params", [{}])[0]
core_version = params.get("core_version", "unknown")
server_path = params.get("server_path", "unknown")
accounts_dir = params.get("accounts_dir", "unknown")
logging.info(
"RPC server ready. Core version: {}, Server path: {}, Accounts dir: {}".format(
core_version,
server_path,
accounts_dir,
),
)
def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True

View File

@@ -8,7 +8,7 @@ from deltachat_rpc_client import DeltaChat, Rpc
def test_install_venv_and_use_other_core(tmp_path, get_core_python_env):
python, rpc_server_path = get_core_python_env("2.24.0")
subprocess.check_call([python, "-m", "pip", "install", "deltachat-rpc-server==2.24.0"])
rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path)
rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path, _skip_ready_check=True)
with rpc:
dc = DeltaChat(rpc)

View File

@@ -13,7 +13,7 @@ import pytest
from deltachat_rpc_client import EventType, events
from deltachat_rpc_client.const import DownloadState, MessageState
from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS
from deltachat_rpc_client.rpc import JsonRpcError
from deltachat_rpc_client.rpc import JsonRpcError, Rpc
def test_system_info(rpc) -> None:
@@ -665,6 +665,24 @@ def test_openrpc_command_line() -> None:
assert "methods" in openrpc
def test_early_failure(tmp_path) -> None:
"""Test that Rpc.start() raises on invalid accounts directories."""
# A file instead of a directory.
file_path = tmp_path / "not_a_dir"
file_path.write_text("I am a file, not a directory")
rpc = Rpc(accounts_dir=str(file_path))
with pytest.raises(JsonRpcError, match="initialization failed"):
rpc.start()
# A non-empty directory that is not a deltachat accounts directory.
non_dc_dir = tmp_path / "invalid_dir"
non_dc_dir.mkdir()
(non_dc_dir / "some_file").write_text("content")
rpc = Rpc(accounts_dir=str(non_dc_dir))
with pytest.raises(JsonRpcError, match="initialization failed"):
rpc.start()
def test_provider_info(rpc) -> None:
account_id = rpc.add_account()

View File

@@ -72,14 +72,7 @@ async fn main_impl() -> Result<()> {
#[cfg(target_family = "unix")]
let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?;
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
log::info!("Starting with accounts directory `{path}`.");
let writable = true;
let accounts = Accounts::new(PathBuf::from(&path), writable).await?;
log::info!("Creating JSON-RPC API.");
let accounts = Arc::new(RwLock::new(accounts));
let state = CommandApi::from_arc(accounts.clone()).await;
let (accounts, state) = init_accounts_and_report_status().await?;
let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state.clone());
@@ -160,3 +153,41 @@ async fn main_impl() -> Result<()> {
Ok(())
}
async fn init_accounts_and_report_status() -> Result<(Arc<RwLock<Accounts>>, CommandApi)> {
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
log::info!("Starting with accounts directory `{path}`.");
let path = PathBuf::from(&path);
match Accounts::new(path.clone(), true).await {
Ok(accounts) => {
log::info!("Creating JSON-RPC API.");
let accounts = Arc::new(RwLock::new(accounts));
let state = CommandApi::from_arc(accounts.clone()).await;
println!(
"{}",
serde_json::to_string(&serde_json::json!({
"jsonrpc": "2.0",
"method": "ready",
"params": [{
"core_version": DC_VERSION_STR,
"server_path": env::current_exe()?.display().to_string(),
"accounts_dir": path.display().to_string(),
}]
}))?
);
Ok((accounts, state))
}
Err(err) => {
let error_msg = format!("{err:#}");
println!(
"{}",
serde_json::to_string(&serde_json::json!({
"jsonrpc": "2.0",
"method": "init_error",
"params": [error_msg]
}))?
);
Err(err)
}
}
}