Gracefully terminate deltachat-rpc-server on ctrl-c and SIGTERM (#4323)

This commit is contained in:
iequidoo
2023-04-13 22:17:22 -04:00
committed by iequidoo
parent a8b790a5db
commit 4aae48b0a1
2 changed files with 50 additions and 12 deletions

View File

@@ -11,7 +11,7 @@
- maybe_add_time_based_warnings(): Use release date instead of the provider DB update one
- Remove confusing log line "ignoring unsolicited response Recent(…)" #3934
- Cleanly terminate deltachat-rpc-server.
Also terminate on ctrl-c.
I.e. on EOF in stdin.
- Refactorings #4317
- Add JSON-RPC API `can_send()`.
- New `dc_get_next_msgs()` and `dc_wait_next_msgs()` C APIs.
@@ -24,6 +24,7 @@
- New Python bindings APIs `Message.is_from_self()` and `Message.is_from_device()`.
- Remove metadata from avatars and JPEG images before sending #4037
- Reduce + recode images to JPEG if they are > 500K in size #4037
- Gracefully terminate deltachat-rpc-server on ctrl-c and SIGTERM.
### Fixes
- Fix python bindings README documentation on installing the bindings from source.

View File

@@ -11,13 +11,26 @@ use deltachat_jsonrpc::api::events::event_to_json_rpc_notification;
use deltachat_jsonrpc::api::{Accounts, CommandApi};
use futures_lite::stream::StreamExt;
use tokio::io::{self, AsyncBufReadExt, BufReader};
#[cfg(target_family = "unix")]
use tokio::signal::unix as signal_unix;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use yerpc::{RpcClient, RpcSession};
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
async fn main() {
let r = main_impl().await;
// From tokio documentation:
// "For technical reasons, stdin is implemented by using an ordinary blocking read on a separate
// thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang
// until the user presses enter."
std::process::exit(if r.is_ok() { 0 } else { 1 });
}
async fn main_impl() -> Result<()> {
let mut args = env::args_os();
let _program_name = args.next().context("no command line arguments found")?;
if let Some(first_arg) = args.next() {
@@ -35,6 +48,11 @@ async fn main() -> Result<()> {
return Err(anyhow!("Unrecognized argument {:?}", arg));
}
// Install signal handlers early so that the shutdown is graceful starting from here.
let _ctrl_c = tokio::signal::ctrl_c();
#[cfg(target_family = "unix")]
let mut sigterm = signal_unix::signal(signal_unix::SignalKind::terminate())?;
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
@@ -48,10 +66,12 @@ async fn main() -> Result<()> {
let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state.clone());
let canceler = CancellationToken::new();
let main_cancel = CancellationToken::new();
// Events task converts core events to JSON-RPC notifications.
let cancel = main_cancel.clone();
let events_task: JoinHandle<Result<()>> = tokio::spawn(async move {
let _cancel_guard = cancel.clone().drop_guard();
let mut r = Ok(());
while let Some(event) = events.recv().await {
if r.is_err() {
@@ -60,16 +80,16 @@ async fn main() -> Result<()> {
let event = event_to_json_rpc_notification(event);
r = client.send_notification("event", Some(event)).await;
}
r?;
Ok(())
});
// Send task prints JSON responses to stdout.
let cancelable = canceler.clone();
let cancel = main_cancel.clone();
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let _cancel_guard = cancel.clone().drop_guard();
loop {
let message = tokio::select! {
_ = cancelable.cancelled() => break,
_ = cancel.cancelled() => break,
message = out_receiver.next() => match message {
None => break,
Some(message) => serde_json::to_string(&message)?,
@@ -81,12 +101,32 @@ async fn main() -> Result<()> {
Ok(())
});
let cancel = main_cancel.clone();
let sigterm_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
#[cfg(target_family = "unix")]
{
let _cancel_guard = cancel.clone().drop_guard();
tokio::select! {
_ = cancel.cancelled() => (),
_ = sigterm.recv() => {
log::info!("got SIGTERM");
}
}
}
let _ = cancel;
Ok(())
});
// Receiver task reads JSON requests from stdin.
let cancel = main_cancel.clone();
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let _cancel_guard = cancel.clone().drop_guard();
let stdin = io::stdin();
let mut lines = BufReader::new(stdin).lines();
loop {
let message = tokio::select! {
_ = cancel.cancelled() => break,
_ = tokio::signal::ctrl_c() => {
log::info!("got ctrl-c event");
break;
@@ -108,17 +148,14 @@ async fn main() -> Result<()> {
Ok(())
});
// Wait for the end of stdin / ctrl-c.
recv_task.await?.ok();
// See "Thread safety" section in deltachat-ffi/deltachat.h for explanation.
// NB: Events are drained by events_task.
canceler.cancel();
main_cancel.cancelled().await;
accounts.read().await.stop_io().await;
drop(accounts);
drop(state);
let (r0, r1) = tokio::join!(events_task, send_task);
for r in [r0, r1] {
let (r0, r1, r2, r3) = tokio::join!(events_task, send_task, sigterm_task, recv_task);
for r in [r0, r1, r2, r3] {
r??;
}