diff --git a/deltachat-rpc-server/README.md b/deltachat-rpc-server/README.md index a5108c4c8..f6b0afa74 100644 --- a/deltachat-rpc-server/README.md +++ b/deltachat-rpc-server/README.md @@ -1,7 +1,7 @@ # Delta Chat RPC server This program provides a [JSON-RPC 2.0](https://www.jsonrpc.org/specification) interface to DeltaChat -over standard I/O. +over standard I/O or UNIX sockets. ## Install @@ -35,3 +35,25 @@ languages other than Rust, for example: Run `deltachat-rpc-server --version` to check the version of the server. Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API. + +### Usage over unix sockets + +> At this time this does not work on windows because rust does not support unix sockets on windows, yet (). + +Standard I/O is the default option, but you can also tell `deltachat-rpc-server` to use a unix socket instead. + +``` +deltachat-rpc-server --unix ./chatmail-core.sock +``` + +While this technically allows multiple processes to communicate with the same rpc server instance, +please note that there is still only event queue, so only one of these processed should read the events at a time. + +You can test it with socat: +```sh +socat - UNIX-CONNECT:./chatmail-core.sock +``` +Then paste the following jsonrpc command and press enter: +```json +{"method": "get_system_info","id": 1,"params": []} +``` diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 25d9f45d4..b8d0a5f8c 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -2,9 +2,9 @@ //! Delta Chat core RPC server. //! //! It speaks JSON Lines over stdio. -use std::env; use std::path::PathBuf; use std::sync::Arc; +use std::{env, ffi::OsStr}; use anyhow::{anyhow, Context as _, Result}; use deltachat::constants::DC_VERSION_STR; @@ -14,6 +14,8 @@ use tokio::io::{self, AsyncBufReadExt, BufReader}; use tracing_subscriber::EnvFilter; use yerpc::RpcServer as _; +#[cfg(target_family = "unix")] +use tokio::net::UnixListener; #[cfg(target_family = "unix")] use tokio::signal::unix as signal_unix; @@ -46,6 +48,7 @@ async fn main() { async fn main_impl() -> Result<()> { let mut args = env::args_os(); let _program_name = args.next().context("no command line arguments found")?; + let mut unix_socket = None; if let Some(first_arg) = args.next() { if first_arg.to_str() == Some("--version") { if let Some(arg) = args.next() { @@ -59,6 +62,11 @@ async fn main_impl() -> Result<()> { } println!("{}", CommandApi::openrpc_specification()?); return Ok(()); + } else if first_arg.to_str() == Some("--unix") { + let Some(unix_socket_path) = args.next() else { + return Err(anyhow!("Unix Socket Path is missing")); + }; + unix_socket = Some(unix_socket_path) } else { return Err(anyhow!("Unrecognized option {first_arg:?}")); } @@ -99,7 +107,16 @@ async fn main_impl() -> Result<()> { Ok(()) }); - let (send_task, recv_task) = stdio_impl(state, main_cancel.clone()).await?; + let (send_task, recv_task) = if let Some(unix_socket_path) = unix_socket { + #[cfg(not(target_family = "unix"))] + { + bail!("unix sockets are only supported on unix based operating systems"); + } + #[cfg(target_family = "unix")] + unix_socket_impl(&unix_socket_path, state, main_cancel.clone()).await? + } else { + stdio_impl(state, main_cancel.clone()).await? + }; main_cancel.cancelled().await; accounts.read().await.stop_io().await; @@ -173,3 +190,102 @@ async fn stdio_impl( Ok((send_task, recv_task)) } + +#[cfg(target_family = "unix")] +async fn unix_socket_impl( + unix_socket_path: &OsStr, + state: CommandApi, + main_cancel: CancellationToken, +) -> Result<( + JoinHandle>, + JoinHandle>, +)> { + let cancel = main_cancel.clone(); + + let listener = UnixListener::bind(unix_socket_path)?; + + let recv_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); + let cancel = main_cancel.clone(); + + loop { + let connection_result = tokio::select! { + _ = cancel.cancelled() => break, + _ = tokio::signal::ctrl_c() => { + log::info!("got ctrl-c event"); + break; + } + connection_result = listener.accept() => connection_result + }; + match connection_result { + Ok((stream, addr)) => { + log::info!("new client {addr:?}"); + + let (client, mut out_receiver) = RpcClient::new(); + let session = RpcSession::new(client.clone(), state.clone()); + + let (read, mut write) = stream.into_split(); + let mut read_lines = BufReader::new(read).lines(); + + let cancel = main_cancel.clone(); + let _receive_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); + loop { + let message = tokio::select! { + _ = cancel.cancelled() => break, + _ = tokio::signal::ctrl_c() => { + log::info!("got ctrl-c event"); + break; + } + message = + read_lines.next_line() + => match message? { + None => { + log::info!("EOF reached on stdin"); + break; + } + Some(message) => message, + } + }; + log::trace!("RPC recv {message}"); + let session = session.clone(); + tokio::spawn(async move { + session.handle_incoming(&message).await; + }); + } + Ok(()) + }); + + let cancel = main_cancel.clone(); + let _send_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); + loop { + use tokio::io::AsyncWriteExt; + + let message = tokio::select! { + _ = cancel.cancelled() => break, + message = out_receiver.next() => match message { + None => break, + Some(message) => serde_json::to_string(&message)?, + } + }; + log::trace!("RPC send {message}"); + write.write_all(format!("{message}\n").as_bytes()).await?; + } + Ok(()) + }); + + // todo handle shutdown of _send_task and _receive_task + } + Err(e) => { + log::info!("connection failed {e:#}"); + } + } + } + // todo shutdown all remaining unix streams via their shutdown method + + Ok(()) + }); + + Ok((tokio::spawn(async move { Ok(()) }), recv_task)) +}