mirror of
https://github.com/chatmail/core.git
synced 2026-04-20 23:16:30 +03:00
This way is more compatible to JSON-RPC libraries that do not support receiving notifications from the server and allows describing event types in the OpenRPC specification. Event thread converting events to notifications in the FFI is removed, so it is now possible to construct a dc_jsonrpc_instance_t while still retrieving events via dc_event_emitter_t.
144 lines
4.9 KiB
Rust
144 lines
4.9 KiB
Rust
use std::env;
|
|
///! Delta Chat core RPC server.
|
|
///!
|
|
///! It speaks JSON Lines over stdio.
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
|
|
use anyhow::{anyhow, Context as _, Result};
|
|
use deltachat::constants::DC_VERSION_STR;
|
|
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
|
use futures_lite::stream::StreamExt;
|
|
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
|
|
|
#[cfg(target_family = "unix")]
|
|
use tokio::signal::unix as signal_unix;
|
|
|
|
use tokio::sync::RwLock;
|
|
use tokio::task::JoinHandle;
|
|
use tokio_util::sync::CancellationToken;
|
|
use yerpc::{RpcClient, RpcSession};
|
|
|
|
#[tokio::main(flavor = "multi_thread")]
|
|
async fn main() {
|
|
let r = main_impl().await;
|
|
// From tokio documentation:
|
|
// "For technical reasons, stdin is implemented by using an ordinary blocking read on a separate
|
|
// thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang
|
|
// until the user presses enter."
|
|
std::process::exit(if r.is_ok() { 0 } else { 1 });
|
|
}
|
|
|
|
async fn main_impl() -> Result<()> {
|
|
let mut args = env::args_os();
|
|
let _program_name = args.next().context("no command line arguments found")?;
|
|
if let Some(first_arg) = args.next() {
|
|
if first_arg.to_str() == Some("--version") {
|
|
if let Some(arg) = args.next() {
|
|
return Err(anyhow!("Unrecognized argument {:?}", arg));
|
|
}
|
|
eprintln!("{}", &*DC_VERSION_STR);
|
|
return Ok(());
|
|
} else {
|
|
return Err(anyhow!("Unrecognized option {:?}", first_arg));
|
|
}
|
|
}
|
|
if let Some(arg) = args.next() {
|
|
return Err(anyhow!("Unrecognized argument {:?}", arg));
|
|
}
|
|
|
|
// Install signal handlers early so that the shutdown is graceful starting from here.
|
|
let _ctrl_c = tokio::signal::ctrl_c();
|
|
#[cfg(target_family = "unix")]
|
|
let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?;
|
|
|
|
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
|
|
|
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
|
log::info!("Starting with accounts directory `{}`.", path);
|
|
let accounts = Accounts::new(PathBuf::from(&path)).await?;
|
|
|
|
log::info!("Creating JSON-RPC API.");
|
|
let accounts = Arc::new(RwLock::new(accounts));
|
|
let state = CommandApi::from_arc(accounts.clone());
|
|
|
|
let (client, mut out_receiver) = RpcClient::new();
|
|
let session = RpcSession::new(client.clone(), state.clone());
|
|
let main_cancel = CancellationToken::new();
|
|
|
|
// Send task prints JSON responses to stdout.
|
|
let cancel = main_cancel.clone();
|
|
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
|
let _cancel_guard = cancel.clone().drop_guard();
|
|
loop {
|
|
let message = tokio::select! {
|
|
_ = cancel.cancelled() => break,
|
|
message = out_receiver.next() => match message {
|
|
None => break,
|
|
Some(message) => serde_json::to_string(&message)?,
|
|
}
|
|
};
|
|
log::trace!("RPC send {}", message);
|
|
println!("{message}");
|
|
}
|
|
Ok(())
|
|
});
|
|
|
|
let cancel = main_cancel.clone();
|
|
let sigterm_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
|
#[cfg(target_family = "unix")]
|
|
{
|
|
let _cancel_guard = cancel.clone().drop_guard();
|
|
tokio::select! {
|
|
_ = cancel.cancelled() => (),
|
|
_ = sigterm.recv() => {
|
|
log::info!("got SIGTERM");
|
|
}
|
|
}
|
|
}
|
|
let _ = cancel;
|
|
Ok(())
|
|
});
|
|
|
|
// Receiver task reads JSON requests from stdin.
|
|
let cancel = main_cancel.clone();
|
|
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
|
let _cancel_guard = cancel.clone().drop_guard();
|
|
let stdin = io::stdin();
|
|
let mut lines = BufReader::new(stdin).lines();
|
|
|
|
loop {
|
|
let message = tokio::select! {
|
|
_ = cancel.cancelled() => break,
|
|
_ = tokio::signal::ctrl_c() => {
|
|
log::info!("got ctrl-c event");
|
|
break;
|
|
}
|
|
message = lines.next_line() => match message? {
|
|
None => {
|
|
log::info!("EOF reached on stdin");
|
|
break;
|
|
}
|
|
Some(message) => message,
|
|
}
|
|
};
|
|
log::trace!("RPC recv {}", message);
|
|
let session = session.clone();
|
|
tokio::spawn(async move {
|
|
session.handle_incoming(&message).await;
|
|
});
|
|
}
|
|
Ok(())
|
|
});
|
|
|
|
main_cancel.cancelled().await;
|
|
accounts.read().await.stop_io().await;
|
|
drop(accounts);
|
|
drop(state);
|
|
send_task.await??;
|
|
sigterm_task.await??;
|
|
recv_task.await??;
|
|
|
|
Ok(())
|
|
}
|