mirror of
https://github.com/chatmail/core.git
synced 2026-04-02 05:22:14 +03:00
Compare commits
1 Commits
d6dacdcd27
...
hpk/robust
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e88c7f88ee |
@@ -2,6 +2,9 @@
|
||||
|
||||
RPC client connects to standalone Delta Chat RPC server `deltachat-rpc-server`
|
||||
and provides asynchronous interface to it.
|
||||
`rpc.start()` blocks until the server is initialized
|
||||
and will raise an error if initialization fails
|
||||
(e.g. if the accounts directory could not be used).
|
||||
|
||||
## Getting started
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ from .const import EventType, SpecialContactId
|
||||
from .contact import Contact
|
||||
from .deltachat import DeltaChat
|
||||
from .message import Message
|
||||
from .rpc import Rpc
|
||||
from .rpc import JsonRpcError, Rpc
|
||||
|
||||
__all__ = [
|
||||
"Account",
|
||||
@@ -19,6 +19,7 @@ __all__ = [
|
||||
"Contact",
|
||||
"DeltaChat",
|
||||
"EventType",
|
||||
"JsonRpcError",
|
||||
"Message",
|
||||
"SpecialContactId",
|
||||
"Rpc",
|
||||
|
||||
@@ -54,10 +54,19 @@ class RpcMethod:
|
||||
class Rpc:
|
||||
"""RPC client."""
|
||||
|
||||
def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
accounts_dir: Optional[str] = None,
|
||||
rpc_server_path="deltachat-rpc-server",
|
||||
_skip_ready_check=False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Initialize RPC client.
|
||||
|
||||
The 'kwargs' arguments will be passed to subprocess.Popen().
|
||||
'_skip_ready_check' is for debugging/testing only,
|
||||
e.g. when using an old server that doesn't send
|
||||
ready/init_error notifications on startup.
|
||||
"""
|
||||
if accounts_dir:
|
||||
kwargs["env"] = {
|
||||
@@ -67,6 +76,7 @@ class Rpc:
|
||||
|
||||
self._kwargs = kwargs
|
||||
self.rpc_server_path = rpc_server_path
|
||||
self._skip_ready_check = _skip_ready_check
|
||||
self.process: subprocess.Popen
|
||||
self.id_iterator: Iterator[int]
|
||||
self.event_queues: dict[int, Queue]
|
||||
@@ -79,7 +89,13 @@ class Rpc:
|
||||
self.events_thread: Thread
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start RPC server subprocess."""
|
||||
"""Start RPC server subprocess and wait for successful initialization.
|
||||
|
||||
This method blocks until the RPC server sends a "ready" notification.
|
||||
If the server fails to initialize
|
||||
(e.g., due to an invalid accounts directory),
|
||||
a JsonRpcError is raised with the error message provided by the server.
|
||||
"""
|
||||
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
|
||||
if sys.version_info >= (3, 11):
|
||||
# Prevent subprocess from capturing SIGINT.
|
||||
@@ -90,6 +106,9 @@ class Rpc:
|
||||
|
||||
popen_kwargs.update(self._kwargs)
|
||||
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
|
||||
|
||||
self._wait_for_ready()
|
||||
|
||||
self.id_iterator = itertools.count(start=1)
|
||||
self.event_queues = {}
|
||||
self.request_results = {}
|
||||
@@ -102,6 +121,44 @@ class Rpc:
|
||||
self.events_thread = Thread(target=self.events_loop)
|
||||
self.events_thread.start()
|
||||
|
||||
def _wait_for_ready(self) -> None:
|
||||
"""Wait for "ready" or "init_error" notification from the server."""
|
||||
if self._skip_ready_check:
|
||||
return
|
||||
|
||||
# Read the first JSON-RPC notification which is
|
||||
# "ready" (success) or "init_error" (e.g. bad accounts dir).
|
||||
line = self.process.stdout.readline()
|
||||
if not line:
|
||||
return_code = self.process.wait()
|
||||
if return_code != 0:
|
||||
raise JsonRpcError(f"RPC server terminated with exit code {return_code}")
|
||||
return
|
||||
|
||||
try:
|
||||
status = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
raise JsonRpcError(f"RPC server sent invalid initial message: {line.decode().strip()}") from None
|
||||
|
||||
if status.get("method") == "init_error":
|
||||
error_msg = status.get("params", ["Unknown error"])[0]
|
||||
raise JsonRpcError(f"RPC server initialization failed: {error_msg}")
|
||||
|
||||
if status.get("method") != "ready":
|
||||
raise JsonRpcError(f"RPC server sent unexpected initial message: {line.decode().strip()}")
|
||||
|
||||
params = status.get("params", [{}])[0]
|
||||
core_version = params.get("core_version", "unknown")
|
||||
server_path = params.get("server_path", "unknown")
|
||||
accounts_dir = params.get("accounts_dir", "unknown")
|
||||
logging.info(
|
||||
"RPC server ready. Core version: {}, Server path: {}, Accounts dir: {}".format(
|
||||
core_version,
|
||||
server_path,
|
||||
accounts_dir,
|
||||
),
|
||||
)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Terminate RPC server process and wait until the reader loop finishes."""
|
||||
self.closing = True
|
||||
|
||||
@@ -8,7 +8,7 @@ from deltachat_rpc_client import DeltaChat, Rpc
|
||||
def test_install_venv_and_use_other_core(tmp_path, get_core_python_env):
|
||||
python, rpc_server_path = get_core_python_env("2.24.0")
|
||||
subprocess.check_call([python, "-m", "pip", "install", "deltachat-rpc-server==2.24.0"])
|
||||
rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path)
|
||||
rpc = Rpc(accounts_dir=tmp_path.joinpath("accounts"), rpc_server_path=rpc_server_path, _skip_ready_check=True)
|
||||
|
||||
with rpc:
|
||||
dc = DeltaChat(rpc)
|
||||
|
||||
@@ -13,7 +13,7 @@ import pytest
|
||||
from deltachat_rpc_client import EventType, events
|
||||
from deltachat_rpc_client.const import DownloadState, MessageState
|
||||
from deltachat_rpc_client.pytestplugin import E2EE_INFO_MSGS
|
||||
from deltachat_rpc_client.rpc import JsonRpcError
|
||||
from deltachat_rpc_client.rpc import JsonRpcError, Rpc
|
||||
|
||||
|
||||
def test_system_info(rpc) -> None:
|
||||
@@ -665,6 +665,24 @@ def test_openrpc_command_line() -> None:
|
||||
assert "methods" in openrpc
|
||||
|
||||
|
||||
def test_early_failure(tmp_path) -> None:
|
||||
"""Test that Rpc.start() raises on invalid accounts directories."""
|
||||
# A file instead of a directory.
|
||||
file_path = tmp_path / "not_a_dir"
|
||||
file_path.write_text("I am a file, not a directory")
|
||||
rpc = Rpc(accounts_dir=str(file_path))
|
||||
with pytest.raises(JsonRpcError, match="initialization failed"):
|
||||
rpc.start()
|
||||
|
||||
# A non-empty directory that is not a deltachat accounts directory.
|
||||
non_dc_dir = tmp_path / "invalid_dir"
|
||||
non_dc_dir.mkdir()
|
||||
(non_dc_dir / "some_file").write_text("content")
|
||||
rpc = Rpc(accounts_dir=str(non_dc_dir))
|
||||
with pytest.raises(JsonRpcError, match="initialization failed"):
|
||||
rpc.start()
|
||||
|
||||
|
||||
def test_provider_info(rpc) -> None:
|
||||
account_id = rpc.add_account()
|
||||
|
||||
|
||||
@@ -72,14 +72,7 @@ async fn main_impl() -> Result<()> {
|
||||
#[cfg(target_family = "unix")]
|
||||
let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?;
|
||||
|
||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||
log::info!("Starting with accounts directory `{path}`.");
|
||||
let writable = true;
|
||||
let accounts = Accounts::new(PathBuf::from(&path), writable).await?;
|
||||
|
||||
log::info!("Creating JSON-RPC API.");
|
||||
let accounts = Arc::new(RwLock::new(accounts));
|
||||
let state = CommandApi::from_arc(accounts.clone()).await;
|
||||
let (accounts, state) = init_accounts_and_report_status().await?;
|
||||
|
||||
let (client, mut out_receiver) = RpcClient::new();
|
||||
let session = RpcSession::new(client.clone(), state.clone());
|
||||
@@ -160,3 +153,41 @@ async fn main_impl() -> Result<()> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn init_accounts_and_report_status() -> Result<(Arc<RwLock<Accounts>>, CommandApi)> {
|
||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||
log::info!("Starting with accounts directory `{path}`.");
|
||||
let path = PathBuf::from(&path);
|
||||
match Accounts::new(path.clone(), true).await {
|
||||
Ok(accounts) => {
|
||||
log::info!("Creating JSON-RPC API.");
|
||||
let accounts = Arc::new(RwLock::new(accounts));
|
||||
let state = CommandApi::from_arc(accounts.clone()).await;
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "ready",
|
||||
"params": [{
|
||||
"core_version": DC_VERSION_STR,
|
||||
"server_path": env::current_exe()?.display().to_string(),
|
||||
"accounts_dir": path.display().to_string(),
|
||||
}]
|
||||
}))?
|
||||
);
|
||||
Ok((accounts, state))
|
||||
}
|
||||
Err(err) => {
|
||||
let error_msg = format!("{err:#}");
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "init_error",
|
||||
"params": [error_msg]
|
||||
}))?
|
||||
);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user