From de7d6753a93d4152fdbe27fa34b92fe06d52d253 Mon Sep 17 00:00:00 2001 From: Simon Laux Date: Thu, 3 Oct 2024 10:34:43 +0200 Subject: [PATCH] add console-subscriber to rpc-stdio server --- Cargo.lock | 2 ++ deltachat-rpc-server/Cargo.toml | 2 ++ deltachat-rpc-server/README.md | 11 +++++++ deltachat-rpc-server/src/main.rs | 54 +++++++++++++++++++------------- 4 files changed, 48 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index def401d62..029c6fe2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1515,6 +1515,7 @@ name = "deltachat-rpc-server" version = "1.148.1" dependencies = [ "anyhow", + "console-subscriber", "deltachat", "deltachat-jsonrpc", "futures-lite 2.3.0", @@ -1523,6 +1524,7 @@ dependencies = [ "serde_json", "tokio", "tokio-util", + "tracing", "tracing-subscriber", "yerpc", ] diff --git a/deltachat-rpc-server/Cargo.toml b/deltachat-rpc-server/Cargo.toml index 62c0c78ed..48c17eaf1 100644 --- a/deltachat-rpc-server/Cargo.toml +++ b/deltachat-rpc-server/Cargo.toml @@ -22,6 +22,8 @@ tokio = { workspace = true, features = ["io-std"] } tokio-util = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } yerpc = { workspace = true, features = ["anyhow_expose", "openrpc"] } +console-subscriber = "0.4.0" +tracing = "0.1.40" [features] default = ["vendored"] diff --git a/deltachat-rpc-server/README.md b/deltachat-rpc-server/README.md index 32ea83ac7..59ce420fd 100644 --- a/deltachat-rpc-server/README.md +++ b/deltachat-rpc-server/README.md @@ -35,3 +35,14 @@ languages other than Rust, for example: Run `deltachat-rpc-server --version` to check the version of the server. Run `deltachat-rpc-server --openrpc` to get [OpenRPC](https://open-rpc.org/) specification of the provided JSON-RPC API. + + +## Usage with `tokio-console` + +When build with `RUSTFLAGS="--cfg tokio_unstable"` console-subscriber is enabled. +That means that you can use [`tokio-console`](https://github.com/tokio-rs/console) to inspect active tokio tasks. +You can install it via `cargo install tokio-console`. + +``` +RUSTFLAGS="--cfg tokio_unstable" cargo run +``` \ No newline at end of file diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 69c37646d..fc0196b68 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -7,11 +7,11 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::{anyhow, Context as _, Result}; -use deltachat::constants::DC_VERSION_STR; +use deltachat::{constants::DC_VERSION_STR, spawn_named_task}; use deltachat_jsonrpc::api::{Accounts, CommandApi}; use futures_lite::stream::StreamExt; use tokio::io::{self, AsyncBufReadExt, BufReader}; -use tracing_subscriber::EnvFilter; +use tracing_subscriber::{prelude::*, EnvFilter}; use yerpc::RpcServer as _; #[cfg(target_family = "unix")] @@ -67,10 +67,21 @@ async fn main_impl() -> Result<()> { // Logs from `log` crate and traces from `tracing` crate // are configurable with `RUST_LOG` environment variable // and go to stderr to avoid interferring with JSON-RPC using stdout. - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .with_writer(std::io::stderr) - .init(); + tracing::subscriber::set_global_default({ + let subscribers = tracing_subscriber::Registry::default().with( + tracing_subscriber::fmt::layer() + .with_writer(std::io::stderr) + .with_filter(EnvFilter::from_default_env()), + ); + #[cfg(tokio_unstable)] + { + subscribers.with(console_subscriber::spawn()) + } + #[cfg(not(tokio_unstable))] + { + subscribers + } + })?; let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); log::info!("Starting with accounts directory `{}`.", path); @@ -87,7 +98,7 @@ async fn main_impl() -> Result<()> { // Send task prints JSON responses to stdout. let cancel = main_cancel.clone(); - let send_task: JoinHandle> = tokio::spawn(async move { + let send_task: JoinHandle> = spawn_named_task!("send_task", async move { let _cancel_guard = cancel.clone().drop_guard(); loop { let message = tokio::select! { @@ -104,24 +115,25 @@ async fn main_impl() -> Result<()> { }); let cancel = main_cancel.clone(); - let sigterm_task: JoinHandle> = tokio::spawn(async move { - #[cfg(target_family = "unix")] - { - let _cancel_guard = cancel.clone().drop_guard(); - tokio::select! { - _ = cancel.cancelled() => (), - _ = sigterm.recv() => { - log::info!("got SIGTERM"); + let sigterm_task: JoinHandle> = + spawn_named_task!("sigterm_task", async move { + #[cfg(target_family = "unix")] + { + let _cancel_guard = cancel.clone().drop_guard(); + tokio::select! { + _ = cancel.cancelled() => (), + _ = sigterm.recv() => { + log::info!("got SIGTERM"); + } } } - } - let _ = cancel; - Ok(()) - }); + let _ = cancel; + Ok(()) + }); // Receiver task reads JSON requests from stdin. let cancel = main_cancel.clone(); - let recv_task: JoinHandle> = tokio::spawn(async move { + let recv_task: JoinHandle> = spawn_named_task!("recv_task", async move { let _cancel_guard = cancel.clone().drop_guard(); let stdin = io::stdin(); let mut lines = BufReader::new(stdin).lines(); @@ -143,7 +155,7 @@ async fn main_impl() -> Result<()> { }; log::trace!("RPC recv {}", message); let session = session.clone(); - tokio::spawn(async move { + spawn_named_task!("handle_incoming", async move { session.handle_incoming(&message).await; }); }