diff --git a/CHANGELOG.md b/CHANGELOG.md index b5d78e1c3..07b14f765 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,8 @@ - Compress `mime_headers` column with HTML emails stored in database - Strip BIDI characters in system messages, files, group names and contact names #3479 - maybe_add_time_based_warnings(): Use release date instead of the provider DB update one - +- Cleanly terminate deltachat-rpc-server. + Also terminate on ctrl-c. ### Fixes - Fix python bindings README documentation on installing the bindings from source. diff --git a/Cargo.lock b/Cargo.lock index 2bebe7620..a2748fc7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1280,6 +1280,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-util", "yerpc", ] diff --git a/deltachat-rpc-server/Cargo.toml b/deltachat-rpc-server/Cargo.toml index d808da059..f66223ade 100644 --- a/deltachat-rpc-server/Cargo.toml +++ b/deltachat-rpc-server/Cargo.toml @@ -20,6 +20,7 @@ log = "0.4" serde_json = "1.0.95" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1.27.0", features = ["io-std"] } +tokio-util = "0.7.7" yerpc = { version = "0.4.0", features = ["anyhow_expose"] } [features] diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 1e957a38b..87df5fdf4 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -3,6 +3,7 @@ use std::env; ///! ///! It speaks JSON Lines over stdio. use std::path::PathBuf; +use std::sync::Arc; use anyhow::{anyhow, Context as _, Result}; use deltachat::constants::DC_VERSION_STR; @@ -10,7 +11,9 @@ use deltachat_jsonrpc::api::events::event_to_json_rpc_notification; use deltachat_jsonrpc::api::{Accounts, CommandApi}; use futures_lite::stream::StreamExt; use tokio::io::{self, AsyncBufReadExt, BufReader}; +use tokio::sync::RwLock; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use yerpc::{RpcClient, RpcSession}; #[tokio::main(flavor = "multi_thread")] @@ -40,24 +43,38 @@ async fn main() -> Result<()> { let events = accounts.get_event_emitter(); log::info!("Creating JSON-RPC API."); - let state = CommandApi::new(accounts); + let accounts = Arc::new(RwLock::new(accounts)); + let state = CommandApi::from_arc(accounts.clone()); let (client, mut out_receiver) = RpcClient::new(); - let session = RpcSession::new(client.clone(), state); + let session = RpcSession::new(client.clone(), state.clone()); + let canceler = CancellationToken::new(); // Events task converts core events to JSON-RPC notifications. let events_task: JoinHandle> = tokio::spawn(async move { + let mut r = Ok(()); while let Some(event) = events.recv().await { + if r.is_err() { + continue; + } let event = event_to_json_rpc_notification(event); - client.send_notification("event", Some(event)).await?; + r = client.send_notification("event", Some(event)).await; } + r?; Ok(()) }); // Send task prints JSON responses to stdout. + let cancelable = canceler.clone(); let send_task: JoinHandle> = tokio::spawn(async move { - while let Some(message) = out_receiver.next().await { - let message = serde_json::to_string(&message)?; + loop { + let message = tokio::select! { + _ = cancelable.cancelled() => break, + message = out_receiver.next() => match message { + None => break, + Some(message) => serde_json::to_string(&message)?, + } + }; log::trace!("RPC send {}", message); println!("{message}"); } @@ -68,23 +85,41 @@ async fn main() -> Result<()> { let recv_task: JoinHandle> = tokio::spawn(async move { let stdin = io::stdin(); let mut lines = BufReader::new(stdin).lines(); - while let Some(message) = lines.next_line().await? { + loop { + let message = tokio::select! { + _ = tokio::signal::ctrl_c() => { + log::info!("got ctrl-c event"); + break; + } + message = lines.next_line() => match message? { + None => { + log::info!("EOF reached on stdin"); + break; + } + Some(message) => message, + } + }; log::trace!("RPC recv {}", message); let session = session.clone(); tokio::spawn(async move { session.handle_incoming(&message).await; }); } - log::info!("EOF reached on stdin"); Ok(()) }); - // Wait for the end of stdin. - recv_task.await??; + // Wait for the end of stdin / ctrl-c. + recv_task.await?.ok(); - // Shutdown the server. - send_task.abort(); - events_task.abort(); + // See "Thread safety" section in deltachat-ffi/deltachat.h for explanation. + // NB: Events are drained by events_task. + canceler.cancel(); + accounts.read().await.stop_io().await; + drop(state); + let (r0, r1) = tokio::join!(events_task, send_task); + for r in [r0, r1] { + r??; + } Ok(()) }