diff --git a/deltachat-rpc-client/README.md b/deltachat-rpc-client/README.md index 5672dd807..e74076abf 100644 --- a/deltachat-rpc-client/README.md +++ b/deltachat-rpc-client/README.md @@ -2,6 +2,9 @@ RPC client connects to standalone Delta Chat RPC server `deltachat-rpc-server` and provides asynchronous interface to it. +`rpc.start()` blocks until the server is initialized +and will raise an error if initialization fails +(e.g. if the accounts directory could not be used). ## Getting started diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py index 1a532c701..03cf9b945 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py @@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId from .contact import Contact from .deltachat import DeltaChat from .message import Message -from .rpc import Rpc +from .rpc import JsonRpcError, Rpc __all__ = [ "Account", @@ -19,6 +19,7 @@ __all__ = [ "Contact", "DeltaChat", "EventType", + "JsonRpcError", "Message", "SpecialContactId", "Rpc", diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index 295a8ee39..86fe720e3 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -54,10 +54,19 @@ class RpcMethod: class Rpc: """RPC client.""" - def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs): + def __init__( + self, + accounts_dir: Optional[str] = None, + rpc_server_path="deltachat-rpc-server", + _skip_ready_check=False, + **kwargs, + ): """Initialize RPC client. The 'kwargs' arguments will be passed to subprocess.Popen(). + '_skip_ready_check' is for debugging/testing only, + e.g. when using an old server that doesn't send + ready/init_error notifications on startup. """ if accounts_dir: kwargs["env"] = { @@ -67,6 +76,7 @@ class Rpc: self._kwargs = kwargs self.rpc_server_path = rpc_server_path + self._skip_ready_check = _skip_ready_check self.process: subprocess.Popen self.id_iterator: Iterator[int] self.event_queues: dict[int, Queue] @@ -79,7 +89,13 @@ class Rpc: self.events_thread: Thread def start(self) -> None: - """Start RPC server subprocess.""" + """Start RPC server subprocess and wait for successful initialization. + + This method blocks until the RPC server sends a "ready" notification. + If the server fails to initialize + (e.g., due to an invalid accounts directory), + a JsonRpcError is raised with the error message provided by the server. + """ popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} if sys.version_info >= (3, 11): # Prevent subprocess from capturing SIGINT. @@ -90,6 +106,9 @@ class Rpc: popen_kwargs.update(self._kwargs) self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + + self._wait_for_ready() + self.id_iterator = itertools.count(start=1) self.event_queues = {} self.request_results = {} @@ -102,6 +121,44 @@ class Rpc: self.events_thread = Thread(target=self.events_loop) self.events_thread.start() + def _wait_for_ready(self) -> None: + """Wait for "ready" or "init_error" notification from the server.""" + if self._skip_ready_check: + return + + # Read the first JSON-RPC notification which is + # "ready" (success) or "init_error" (e.g. bad accounts dir). + line = self.process.stdout.readline() + if not line: + return_code = self.process.wait() + if return_code != 0: + raise JsonRpcError(f"RPC server terminated with exit code {return_code}") + return + + try: + status = json.loads(line) + except json.JSONDecodeError: + raise JsonRpcError(f"RPC server sent invalid initial message: {line.decode().strip()}") from None + + if status.get("method") == "init_error": + error_msg = status.get("params", ["Unknown error"])[0] + raise JsonRpcError(f"RPC server initialization failed: {error_msg}") + + if status.get("method") != "ready": + raise JsonRpcError(f"RPC server sent unexpected initial message: {line.decode().strip()}") + + params = status.get("params", [{}])[0] + core_version = params.get("core_version", "unknown") + server_path = params.get("server_path", "unknown") + accounts_dir = params.get("accounts_dir", "unknown") + logging.info( + "RPC server ready. Core version: {}, Server path: {}, Accounts dir: {}".format( + core_version, + server_path, + accounts_dir, + ), + ) + def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True diff --git a/deltachat-rpc-client/tests/test_cross_core.py b/deltachat-rpc-client/tests/test_cross_core.py index 31d39c790..30715fe21 100644 --- a/deltachat-rpc-client/tests/test_cross_core.py +++ b/deltachat-rpc-client/tests/test_cross_core.py @@ -8,7 +8,7 @@ from deltachat_rpc_client import DeltaChat, Rpc def test_install_venv_and_use_other_core(tmp_path, get_core_python_env): python, rpc_server_path = get_core_python_env("2.24.0") subprocess.check_call([python, "-m", "pip", "install", "deltachat-rpc-server==2.24.0"]) - rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path) + rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path, _skip_ready_check=True) with rpc: dc = DeltaChat(rpc) diff --git a/deltachat-rpc-client/tests/test_something.py b/deltachat-rpc-client/tests/test_something.py index f3f33cb15..44cba11aa 100644 --- a/deltachat-rpc-client/tests/test_something.py +++ b/deltachat-rpc-client/tests/test_something.py @@ -13,7 +13,7 @@ import pytest from deltachat_rpc_client import EventType, events from deltachat_rpc_client.const import DownloadState, MessageState from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS -from deltachat_rpc_client.rpc import JsonRpcError +from deltachat_rpc_client.rpc import JsonRpcError, Rpc def test_system_info(rpc) -> None: @@ -665,6 +665,24 @@ def test_openrpc_command_line() -> None: assert "methods" in openrpc +def test_early_failure(tmp_path) -> None: + """Test that Rpc.start() raises on invalid accounts directories.""" + # A file instead of a directory. + file_path = tmp_path / "not_a_dir" + file_path.write_text("I am a file, not a directory") + rpc = Rpc(accounts_dir=str(file_path)) + with pytest.raises(JsonRpcError, match="initialization failed"): + rpc.start() + + # A non-empty directory that is not a deltachat accounts directory. + non_dc_dir = tmp_path / "invalid_dir" + non_dc_dir.mkdir() + (non_dc_dir / "some_file").write_text("content") + rpc = Rpc(accounts_dir=str(non_dc_dir)) + with pytest.raises(JsonRpcError, match="initialization failed"): + rpc.start() + + def test_provider_info(rpc) -> None: account_id = rpc.add_account() diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 2db7f0535..31fcc86ef 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -72,14 +72,7 @@ async fn main_impl() -> Result<()> { #[cfg(target_family = "unix")] let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?; - let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); - log::info!("Starting with accounts directory `{path}`."); - let writable = true; - let accounts = Accounts::new(PathBuf::from(&path), writable).await?; - - log::info!("Creating JSON-RPC API."); - let accounts = Arc::new(RwLock::new(accounts)); - let state = CommandApi::from_arc(accounts.clone()).await; + let (accounts, state) = init_accounts_and_report_status().await?; let (client, mut out_receiver) = RpcClient::new(); let session = RpcSession::new(client.clone(), state.clone()); @@ -160,3 +153,41 @@ async fn main_impl() -> Result<()> { Ok(()) } + +async fn init_accounts_and_report_status() -> Result<(Arc>, CommandApi)> { + let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); + log::info!("Starting with accounts directory `{path}`."); + let path = PathBuf::from(&path); + match Accounts::new(path.clone(), true).await { + Ok(accounts) => { + log::info!("Creating JSON-RPC API."); + let accounts = Arc::new(RwLock::new(accounts)); + let state = CommandApi::from_arc(accounts.clone()).await; + println!( + "{}", + serde_json::to_string(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "ready", + "params": [{ + "core_version": DC_VERSION_STR, + "server_path": env::current_exe()?.display().to_string(), + "accounts_dir": path.display().to_string(), + }] + }))? + ); + Ok((accounts, state)) + } + Err(err) => { + let error_msg = format!("{err:#}"); + println!( + "{}", + serde_json::to_string(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "init_error", + "params": [error_msg] + }))? + ); + Err(err) + } + } +}