feat: add unix socket support to deltachat-rpc-server

This commit is contained in:
Simon Laux
2026-01-11 06:03:43 +01:00
parent e4d2e5075d
commit fe5f59a744
2 changed files with 141 additions and 3 deletions

View File

@@ -2,9 +2,9 @@
//! Delta Chat core RPC server.
//!
//! It speaks JSON Lines over stdio.
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use std::{env, ffi::OsStr};
use anyhow::{anyhow, Context as _, Result};
use deltachat::constants::DC_VERSION_STR;
@@ -14,6 +14,8 @@ use tokio::io::{self, AsyncBufReadExt, BufReader};
use tracing_subscriber::EnvFilter;
use yerpc::RpcServer as _;
#[cfg(target_family = "unix")]
use tokio::net::UnixListener;
#[cfg(target_family = "unix")]
use tokio::signal::unix as signal_unix;
@@ -46,6 +48,7 @@ async fn main() {
async fn main_impl() -> Result<()> {
let mut args = env::args_os();
let _program_name = args.next().context("no command line arguments found")?;
let mut unix_socket = None;
if let Some(first_arg) = args.next() {
if first_arg.to_str() == Some("--version") {
if let Some(arg) = args.next() {
@@ -59,6 +62,11 @@ async fn main_impl() -> Result<()> {
}
println!("{}", CommandApi::openrpc_specification()?);
return Ok(());
} else if first_arg.to_str() == Some("--unix") {
let Some(unix_socket_path) = args.next() else {
return Err(anyhow!("Unix Socket Path is missing"));
};
unix_socket = Some(unix_socket_path)
} else {
return Err(anyhow!("Unrecognized option {first_arg:?}"));
}
@@ -99,7 +107,16 @@ async fn main_impl() -> Result<()> {
Ok(())
});
let (send_task, recv_task) = stdio_impl(state, main_cancel.clone()).await?;
let (send_task, recv_task) = if let Some(unix_socket_path) = unix_socket {
#[cfg(not(target_family = "unix"))]
{
bail!("unix sockets are only supported on unix based operating systems");
}
#[cfg(target_family = "unix")]
unix_socket_impl(&unix_socket_path, state, main_cancel.clone()).await?
} else {
stdio_impl(state, main_cancel.clone()).await?
};
main_cancel.cancelled().await;
accounts.read().await.stop_io().await;
@@ -173,3 +190,102 @@ async fn stdio_impl(
Ok((send_task, recv_task))
}
#[cfg(target_family = "unix")]
async fn unix_socket_impl(
unix_socket_path: &OsStr,
state: CommandApi,
main_cancel: CancellationToken,
) -> Result<(
JoinHandle<anyhow::Result<()>>,
JoinHandle<anyhow::Result<()>>,
)> {
let cancel = main_cancel.clone();
let listener = UnixListener::bind(unix_socket_path)?;
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let _cancel_guard = cancel.clone().drop_guard();
let cancel = main_cancel.clone();
loop {
let connection_result = tokio::select! {
_ = cancel.cancelled() => break,
_ = tokio::signal::ctrl_c() => {
log::info!("got ctrl-c event");
break;
}
connection_result = listener.accept() => connection_result
};
match connection_result {
Ok((stream, addr)) => {
log::info!("new client {addr:?}");
let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state.clone());
let (read, mut write) = stream.into_split();
let mut read_lines = BufReader::new(read).lines();
let cancel = main_cancel.clone();
let _receive_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let _cancel_guard = cancel.clone().drop_guard();
loop {
let message = tokio::select! {
_ = cancel.cancelled() => break,
_ = tokio::signal::ctrl_c() => {
log::info!("got ctrl-c event");
break;
}
message =
read_lines.next_line()
=> match message? {
None => {
log::info!("EOF reached on stdin");
break;
}
Some(message) => message,
}
};
log::trace!("RPC recv {message}");
let session = session.clone();
tokio::spawn(async move {
session.handle_incoming(&message).await;
});
}
Ok(())
});
let cancel = main_cancel.clone();
let _send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let _cancel_guard = cancel.clone().drop_guard();
loop {
use tokio::io::AsyncWriteExt;
let message = tokio::select! {
_ = cancel.cancelled() => break,
message = out_receiver.next() => match message {
None => break,
Some(message) => serde_json::to_string(&message)?,
}
};
log::trace!("RPC send {message}");
write.write_all(format!("{message}\n").as_bytes()).await?;
}
Ok(())
});
// todo handle shutdown of _send_task and _receive_task
}
Err(e) => {
log::info!("connection failed {e:#}");
}
}
}
// todo shutdown all remaining unix streams via their shutdown method
Ok(())
});
Ok((tokio::spawn(async move { Ok(()) }), recv_task))
}