mirror of
https://github.com/chatmail/core.git
synced 2026-04-17 13:36:30 +03:00
feat: report ready/init_error on startup via JSON-RPC notification
Why: When the deltachat-rpc-server encounters a fatal error during early startup (e.g., when the accounts directory is invalid, a file instead of a dir, or otherwise inaccessible), it exits. The Python RPC client previously lacked a structured way to wait for the server to be fully initialized or to detect early startup failures gracefully. This led to hanging tests or obscure broken pipe errors rather than clear initialization errors. How: - The RPC server now sends a JSON-RPC notification on stdout at startup: - "ready" with core_version, server_path, and accounts_dir on success - "init_error" with error message if accounts directory initialization fails - The Python RPC client reads the first line from stdout to ensure the server is ready. - The Python client raises JsonRpcError on init_error, enabling early failure detection and fast-failing rather than stalling. - Added tests to ensure the client fails immediately on invalid dirs.
This commit is contained in:
@@ -2,6 +2,9 @@
|
||||
|
||||
RPC client connects to standalone Delta Chat RPC server `deltachat-rpc-server`
|
||||
and provides asynchronous interface to it.
|
||||
`rpc.start()` blocks until the server is initialized
|
||||
and will raise an error if initialization fails
|
||||
(e.g. if the accounts directory could not be used).
|
||||
|
||||
## Getting started
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
|
||||
from .contact import Contact
|
||||
from .deltachat import DeltaChat
|
||||
from .message import Message
|
||||
from .rpc import Rpc
|
||||
from .rpc import JsonRpcError, Rpc
|
||||
|
||||
__all__ = [
|
||||
"Account",
|
||||
@@ -19,6 +19,7 @@ __all__ = [
|
||||
"Contact",
|
||||
"DeltaChat",
|
||||
"EventType",
|
||||
"JsonRpcError",
|
||||
"Message",
|
||||
"SpecialContactId",
|
||||
"Rpc",
|
||||
|
||||
@@ -54,10 +54,19 @@ class RpcMethod:
|
||||
class Rpc:
|
||||
"""RPC client."""
|
||||
|
||||
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
accounts_dir: Optional[str] = None,
|
||||
rpc_server_path="deltachat-rpc-server",
|
||||
_skip_ready_check=False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize RPC client.
|
||||
|
||||
The 'kwargs' arguments will be passed to subprocess.Popen().
|
||||
'_skip_ready_check' is for debugging/testing only,
|
||||
e.g. when using an old server that doesn't send
|
||||
ready/init_error notifications on startup.
|
||||
"""
|
||||
if accounts_dir:
|
||||
kwargs["env"] = {
|
||||
@@ -67,6 +76,7 @@ class Rpc:
|
||||
|
||||
self._kwargs = kwargs
|
||||
self.rpc_server_path = rpc_server_path
|
||||
self._skip_ready_check = _skip_ready_check
|
||||
self.process: subprocess.Popen
|
||||
self.id_iterator: Iterator[int]
|
||||
self.event_queues: dict[int, Queue]
|
||||
@@ -79,7 +89,13 @@ class Rpc:
|
||||
self.events_thread: Thread
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start RPC server subprocess."""
|
||||
"""Start RPC server subprocess and wait for successful initialization.
|
||||
|
||||
This method blocks until the RPC server sends a "ready" notification.
|
||||
If the server fails to initialize
|
||||
(e.g., due to an invalid accounts directory),
|
||||
a JsonRpcError is raised with the error message provided by the server.
|
||||
"""
|
||||
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
|
||||
if sys.version_info >= (3, 11):
|
||||
# Prevent subprocess from capturing SIGINT.
|
||||
@@ -90,6 +106,9 @@ class Rpc:
|
||||
|
||||
popen_kwargs.update(self._kwargs)
|
||||
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
|
||||
|
||||
self._wait_for_ready()
|
||||
|
||||
self.id_iterator = itertools.count(start=1)
|
||||
self.event_queues = {}
|
||||
self.request_results = {}
|
||||
@@ -102,6 +121,44 @@ class Rpc:
|
||||
self.events_thread = Thread(target=self.events_loop)
|
||||
self.events_thread.start()
|
||||
|
||||
def _wait_for_ready(self) -> None:
|
||||
"""Wait for "ready" or "init_error" notification from the server."""
|
||||
if self._skip_ready_check:
|
||||
return
|
||||
|
||||
# Read the first JSON-RPC notification which is
|
||||
# "ready" (success) or "init_error" (e.g. bad accounts dir).
|
||||
line = self.process.stdout.readline()
|
||||
if not line:
|
||||
return_code = self.process.wait()
|
||||
if return_code != 0:
|
||||
raise JsonRpcError(f"RPC server terminated with exit code {return_code}")
|
||||
return
|
||||
|
||||
try:
|
||||
status = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
raise JsonRpcError(f"RPC server sent invalid initial message: {line.decode().strip()}") from None
|
||||
|
||||
if status.get("method") == "init_error":
|
||||
error_msg = status.get("params", ["Unknown error"])[0]
|
||||
raise JsonRpcError(f"RPC server initialization failed: {error_msg}")
|
||||
|
||||
if status.get("method") != "ready":
|
||||
raise JsonRpcError(f"RPC server sent unexpected initial message: {line.decode().strip()}")
|
||||
|
||||
params = status.get("params", [{}])[0]
|
||||
core_version = params.get("core_version", "unknown")
|
||||
server_path = params.get("server_path", "unknown")
|
||||
accounts_dir = params.get("accounts_dir", "unknown")
|
||||
logging.info(
|
||||
"RPC server ready. Core version: {}, Server path: {}, Accounts dir: {}".format(
|
||||
core_version,
|
||||
server_path,
|
||||
accounts_dir,
|
||||
),
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Terminate RPC server process and wait until the reader loop finishes."""
|
||||
self.closing = True
|
||||
|
||||
@@ -8,7 +8,7 @@ from deltachat_rpc_client import DeltaChat, Rpc
|
||||
def test_install_venv_and_use_other_core(tmp_path, get_core_python_env):
|
||||
python, rpc_server_path = get_core_python_env("2.24.0")
|
||||
subprocess.check_call([python, "-m", "pip", "install", "deltachat-rpc-server==2.24.0"])
|
||||
rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path)
|
||||
rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path, _skip_ready_check=True)
|
||||
|
||||
with rpc:
|
||||
dc = DeltaChat(rpc)
|
||||
|
||||
@@ -13,7 +13,7 @@ import pytest
|
||||
from deltachat_rpc_client import EventType, events
|
||||
from deltachat_rpc_client.const import DownloadState, MessageState
|
||||
from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS
|
||||
from deltachat_rpc_client.rpc import JsonRpcError
|
||||
from deltachat_rpc_client.rpc import JsonRpcError, Rpc
|
||||
|
||||
|
||||
def test_system_info(rpc) -> None:
|
||||
@@ -665,6 +665,24 @@ def test_openrpc_command_line() -> None:
|
||||
assert "methods" in openrpc
|
||||
|
||||
|
||||
def test_early_failure(tmp_path) -> None:
|
||||
"""Test that Rpc.start() raises on invalid accounts directories."""
|
||||
# A file instead of a directory.
|
||||
file_path = tmp_path / "not_a_dir"
|
||||
file_path.write_text("I am a file, not a directory")
|
||||
rpc = Rpc(accounts_dir=str(file_path))
|
||||
with pytest.raises(JsonRpcError, match="initialization failed"):
|
||||
rpc.start()
|
||||
|
||||
# A non-empty directory that is not a deltachat accounts directory.
|
||||
non_dc_dir = tmp_path / "invalid_dir"
|
||||
non_dc_dir.mkdir()
|
||||
(non_dc_dir / "some_file").write_text("content")
|
||||
rpc = Rpc(accounts_dir=str(non_dc_dir))
|
||||
with pytest.raises(JsonRpcError, match="initialization failed"):
|
||||
rpc.start()
|
||||
|
||||
|
||||
def test_provider_info(rpc) -> None:
|
||||
account_id = rpc.add_account()
|
||||
|
||||
|
||||
@@ -72,14 +72,7 @@ async fn main_impl() -> Result<()> {
|
||||
#[cfg(target_family = "unix")]
|
||||
let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?;
|
||||
|
||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||
log::info!("Starting with accounts directory `{path}`.");
|
||||
let writable = true;
|
||||
let accounts = Accounts::new(PathBuf::from(&path), writable).await?;
|
||||
|
||||
log::info!("Creating JSON-RPC API.");
|
||||
let accounts = Arc::new(RwLock::new(accounts));
|
||||
let state = CommandApi::from_arc(accounts.clone()).await;
|
||||
let (accounts, state) = init_accounts_and_report_status().await?;
|
||||
|
||||
let (client, mut out_receiver) = RpcClient::new();
|
||||
let session = RpcSession::new(client.clone(), state.clone());
|
||||
@@ -160,3 +153,41 @@ async fn main_impl() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn init_accounts_and_report_status() -> Result<(Arc<RwLock<Accounts>>, CommandApi)> {
|
||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||
log::info!("Starting with accounts directory `{path}`.");
|
||||
let path = PathBuf::from(&path);
|
||||
match Accounts::new(path.clone(), true).await {
|
||||
Ok(accounts) => {
|
||||
log::info!("Creating JSON-RPC API.");
|
||||
let accounts = Arc::new(RwLock::new(accounts));
|
||||
let state = CommandApi::from_arc(accounts.clone()).await;
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "ready",
|
||||
"params": [{
|
||||
"core_version": DC_VERSION_STR,
|
||||
"server_path": env::current_exe()?.display().to_string(),
|
||||
"accounts_dir": path.display().to_string(),
|
||||
}]
|
||||
}))?
|
||||
);
|
||||
Ok((accounts, state))
|
||||
}
|
||||
Err(err) => {
|
||||
let error_msg = format!("{err:#}");
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "init_error",
|
||||
"params": [error_msg]
|
||||
}))?
|
||||
);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user