From 4aae48b0a1d9d1104ce5958d8a6508a81c6ad9a0 Mon Sep 17 00:00:00 2001 From: iequidoo Date: Thu, 13 Apr 2023 22:17:22 -0400 Subject: [PATCH] Gracefully terminate deltachat-rpc-server on ctrl-c and SIGTERM (#4323) --- CHANGELOG.md | 3 +- deltachat-rpc-server/src/main.rs | 59 ++++++++++++++++++++++++++------ 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57281bbaf..d0074a528 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ - maybe_add_time_based_warnings(): Use release date instead of the provider DB update one - Remove confusing log line "ignoring unsolicited response Recent(…)" #3934 - Cleanly terminate deltachat-rpc-server. - Also terminate on ctrl-c. + I.e. on EOF in stdin. - Refactorings #4317 - Add JSON-RPC API `can_send()`. - New `dc_get_next_msgs()` and `dc_wait_next_msgs()` C APIs. @@ -24,6 +24,7 @@ - New Python bindings APIs `Message.is_from_self()` and `Message.is_from_device()`. - Remove metadata from avatars and JPEG images before sending #4037 - Reduce + recode images to JPEG if they are > 500K in size #4037 +- Gracefully terminate deltachat-rpc-server on ctrl-c and SIGTERM. ### Fixes - Fix python bindings README documentation on installing the bindings from source. diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index a4e99fe2c..419f4deb4 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -11,13 +11,26 @@ use deltachat_jsonrpc::api::events::event_to_json_rpc_notification; use deltachat_jsonrpc::api::{Accounts, CommandApi}; use futures_lite::stream::StreamExt; use tokio::io::{self, AsyncBufReadExt, BufReader}; + +#[cfg(target_family = "unix")] +use tokio::signal::unix as signal_unix; + use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use yerpc::{RpcClient, RpcSession}; #[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<()> { +async fn main() { + let r = main_impl().await; + // From tokio documentation: + // "For technical reasons, stdin is implemented by using an ordinary blocking read on a separate + // thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang + // until the user presses enter." + std::process::exit(if r.is_ok() { 0 } else { 1 }); +} + +async fn main_impl() -> Result<()> { let mut args = env::args_os(); let _program_name = args.next().context("no command line arguments found")?; if let Some(first_arg) = args.next() { @@ -35,6 +48,11 @@ async fn main() -> Result<()> { return Err(anyhow!("Unrecognized argument {:?}", arg)); } + // Install signal handlers early so that the shutdown is graceful starting from here. + let _ctrl_c = tokio::signal::ctrl_c(); + #[cfg(target_family = "unix")] + let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?; + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); @@ -48,10 +66,12 @@ async fn main() -> Result<()> { let (client, mut out_receiver) = RpcClient::new(); let session = RpcSession::new(client.clone(), state.clone()); - let canceler = CancellationToken::new(); + let main_cancel = CancellationToken::new(); // Events task converts core events to JSON-RPC notifications. + let cancel = main_cancel.clone(); let events_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); let mut r = Ok(()); while let Some(event) = events.recv().await { if r.is_err() { @@ -60,16 +80,16 @@ async fn main() -> Result<()> { let event = event_to_json_rpc_notification(event); r = client.send_notification("event", Some(event)).await; } - r?; Ok(()) }); // Send task prints JSON responses to stdout. - let cancelable = canceler.clone(); + let cancel = main_cancel.clone(); let send_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); loop { let message = tokio::select! { - _ = cancelable.cancelled() => break, + _ = cancel.cancelled() => break, message = out_receiver.next() => match message { None => break, Some(message) => serde_json::to_string(&message)?, @@ -81,12 +101,32 @@ async fn main() -> Result<()> { Ok(()) }); + let cancel = main_cancel.clone(); + let sigterm_task: JoinHandle> = tokio::spawn(async move { + #[cfg(target_family = "unix")] + { + let _cancel_guard = cancel.clone().drop_guard(); + tokio::select! { + _ = cancel.cancelled() => (), + _ = sigterm.recv() => { + log::info!("got SIGTERM"); + } + } + } + let _ = cancel; + Ok(()) + }); + // Receiver task reads JSON requests from stdin. + let cancel = main_cancel.clone(); let recv_task: JoinHandle> = tokio::spawn(async move { + let _cancel_guard = cancel.clone().drop_guard(); let stdin = io::stdin(); let mut lines = BufReader::new(stdin).lines(); + loop { let message = tokio::select! { + _ = cancel.cancelled() => break, _ = tokio::signal::ctrl_c() => { log::info!("got ctrl-c event"); break; @@ -108,17 +148,14 @@ async fn main() -> Result<()> { Ok(()) }); - // Wait for the end of stdin / ctrl-c. - recv_task.await?.ok(); - // See "Thread safety" section in deltachat-ffi/deltachat.h for explanation. // NB: Events are drained by events_task. - canceler.cancel(); + main_cancel.cancelled().await; accounts.read().await.stop_io().await; drop(accounts); drop(state); - let (r0, r1) = tokio::join!(events_task, send_task); - for r in [r0, r1] { + let (r0, r1, r2, r3) = tokio::join!(events_task, send_task, sigterm_task, recv_task); + for r in [r0, r1, r2, r3] { r??; }