mirror of
https://github.com/chatmail/core.git
synced 2026-05-01 20:36:31 +03:00
add console-subscriber to rpc-stdio server
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1515,6 +1515,7 @@ name = "deltachat-rpc-server"
|
|||||||
version = "1.148.1"
|
version = "1.148.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"console-subscriber",
|
||||||
"deltachat",
|
"deltachat",
|
||||||
"deltachat-jsonrpc",
|
"deltachat-jsonrpc",
|
||||||
"futures-lite 2.3.0",
|
"futures-lite 2.3.0",
|
||||||
@@ -1523,6 +1524,7 @@ dependencies = [
|
|||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"yerpc",
|
"yerpc",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ tokio = { workspace = true, features = ["io-std"] }
|
|||||||
tokio-util = { workspace = true }
|
tokio-util = { workspace = true }
|
||||||
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
tracing-subscriber = { workspace = true, features = ["env-filter"] }
|
||||||
yerpc = { workspace = true, features = ["anyhow_expose", "openrpc"] }
|
yerpc = { workspace = true, features = ["anyhow_expose", "openrpc"] }
|
||||||
|
console-subscriber = "0.4.0"
|
||||||
|
tracing = "0.1.40"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["vendored"]
|
default = ["vendored"]
|
||||||
|
|||||||
@@ -35,3 +35,14 @@ languages other than Rust, for example:
|
|||||||
|
|
||||||
Run `deltachat-rpc-server --version` to check the version of the server.
|
Run `deltachat-rpc-server --version` to check the version of the server.
|
||||||
Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API.
|
Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API.
|
||||||
|
|
||||||
|
|
||||||
|
## Usage with `tokio-console`
|
||||||
|
|
||||||
|
When build with `RUSTFLAGS="--cfg tokio_unstable"` console-subscriber is enabled.
|
||||||
|
That means that you can use [`tokio-console`](https://github.com/tokio-rs/console) to inspect active tokio tasks.
|
||||||
|
You can install it via `cargo install tokio-console`.
|
||||||
|
|
||||||
|
```
|
||||||
|
RUSTFLAGS="--cfg tokio_unstable" cargo run
|
||||||
|
```
|
||||||
@@ -7,11 +7,11 @@ use std::path::PathBuf;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::{anyhow, Context as _, Result};
|
use anyhow::{anyhow, Context as _, Result};
|
||||||
use deltachat::constants::DC_VERSION_STR;
|
use deltachat::{constants::DC_VERSION_STR, spawn_named_task};
|
||||||
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
||||||
use futures_lite::stream::StreamExt;
|
use futures_lite::stream::StreamExt;
|
||||||
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::{prelude::*, EnvFilter};
|
||||||
use yerpc::RpcServer as _;
|
use yerpc::RpcServer as _;
|
||||||
|
|
||||||
#[cfg(target_family = "unix")]
|
#[cfg(target_family = "unix")]
|
||||||
@@ -67,10 +67,21 @@ async fn main_impl() -> Result<()> {
|
|||||||
// Logs from `log` crate and traces from `tracing` crate
|
// Logs from `log` crate and traces from `tracing` crate
|
||||||
// are configurable with `RUST_LOG` environment variable
|
// are configurable with `RUST_LOG` environment variable
|
||||||
// and go to stderr to avoid interferring with JSON-RPC using stdout.
|
// and go to stderr to avoid interferring with JSON-RPC using stdout.
|
||||||
tracing_subscriber::fmt()
|
tracing::subscriber::set_global_default({
|
||||||
.with_env_filter(EnvFilter::from_default_env())
|
let subscribers = tracing_subscriber::Registry::default().with(
|
||||||
.with_writer(std::io::stderr)
|
tracing_subscriber::fmt::layer()
|
||||||
.init();
|
.with_writer(std::io::stderr)
|
||||||
|
.with_filter(EnvFilter::from_default_env()),
|
||||||
|
);
|
||||||
|
#[cfg(tokio_unstable)]
|
||||||
|
{
|
||||||
|
subscribers.with(console_subscriber::spawn())
|
||||||
|
}
|
||||||
|
#[cfg(not(tokio_unstable))]
|
||||||
|
{
|
||||||
|
subscribers
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||||
log::info!("Starting with accounts directory `{}`.", path);
|
log::info!("Starting with accounts directory `{}`.", path);
|
||||||
@@ -87,7 +98,7 @@ async fn main_impl() -> Result<()> {
|
|||||||
|
|
||||||
// Send task prints JSON responses to stdout.
|
// Send task prints JSON responses to stdout.
|
||||||
let cancel = main_cancel.clone();
|
let cancel = main_cancel.clone();
|
||||||
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
let send_task: JoinHandle<anyhow::Result<()>> = spawn_named_task!("send_task", async move {
|
||||||
let _cancel_guard = cancel.clone().drop_guard();
|
let _cancel_guard = cancel.clone().drop_guard();
|
||||||
loop {
|
loop {
|
||||||
let message = tokio::select! {
|
let message = tokio::select! {
|
||||||
@@ -104,24 +115,25 @@ async fn main_impl() -> Result<()> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
let cancel = main_cancel.clone();
|
let cancel = main_cancel.clone();
|
||||||
let sigterm_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
let sigterm_task: JoinHandle<anyhow::Result<()>> =
|
||||||
#[cfg(target_family = "unix")]
|
spawn_named_task!("sigterm_task", async move {
|
||||||
{
|
#[cfg(target_family = "unix")]
|
||||||
let _cancel_guard = cancel.clone().drop_guard();
|
{
|
||||||
tokio::select! {
|
let _cancel_guard = cancel.clone().drop_guard();
|
||||||
_ = cancel.cancelled() => (),
|
tokio::select! {
|
||||||
_ = sigterm.recv() => {
|
_ = cancel.cancelled() => (),
|
||||||
log::info!("got SIGTERM");
|
_ = sigterm.recv() => {
|
||||||
|
log::info!("got SIGTERM");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
let _ = cancel;
|
||||||
let _ = cancel;
|
Ok(())
|
||||||
Ok(())
|
});
|
||||||
});
|
|
||||||
|
|
||||||
// Receiver task reads JSON requests from stdin.
|
// Receiver task reads JSON requests from stdin.
|
||||||
let cancel = main_cancel.clone();
|
let cancel = main_cancel.clone();
|
||||||
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
let recv_task: JoinHandle<anyhow::Result<()>> = spawn_named_task!("recv_task", async move {
|
||||||
let _cancel_guard = cancel.clone().drop_guard();
|
let _cancel_guard = cancel.clone().drop_guard();
|
||||||
let stdin = io::stdin();
|
let stdin = io::stdin();
|
||||||
let mut lines = BufReader::new(stdin).lines();
|
let mut lines = BufReader::new(stdin).lines();
|
||||||
@@ -143,7 +155,7 @@ async fn main_impl() -> Result<()> {
|
|||||||
};
|
};
|
||||||
log::trace!("RPC recv {}", message);
|
log::trace!("RPC recv {}", message);
|
||||||
let session = session.clone();
|
let session = session.clone();
|
||||||
tokio::spawn(async move {
|
spawn_named_task!("handle_incoming", async move {
|
||||||
session.handle_incoming(&message).await;
|
session.handle_incoming(&message).await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user