Cleanly terminate deltachat-rpc-server (#4234)

Do it as per "Thread safety" section in deltachat-ffi/deltachat.h. Also terminate on ctrl-c.
This commit is contained in:
iequidoo
2023-04-04 11:33:48 -04:00
committed by iequidoo
parent cecc080931
commit f1eeb1df8c
4 changed files with 51 additions and 13 deletions

View File

@@ -9,7 +9,8 @@
- Compress `mime_headers` column with HTML emails stored in database - Compress `mime_headers` column with HTML emails stored in database
- Strip BIDI characters in system messages, files, group names and contact names #3479 - Strip BIDI characters in system messages, files, group names and contact names #3479
- maybe_add_time_based_warnings(): Use release date instead of the provider DB update one - maybe_add_time_based_warnings(): Use release date instead of the provider DB update one
- Cleanly terminate deltachat-rpc-server.
Also terminate on ctrl-c.
### Fixes ### Fixes
- Fix python bindings README documentation on installing the bindings from source. - Fix python bindings README documentation on installing the bindings from source.

1
Cargo.lock generated
View File

@@ -1280,6 +1280,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-util",
"yerpc", "yerpc",
] ]

View File

@@ -20,6 +20,7 @@ log = "0.4"
serde_json = "1.0.95" serde_json = "1.0.95"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.27.0", features = ["io-std"] } tokio = { version = "1.27.0", features = ["io-std"] }
tokio-util = "0.7.7"
yerpc = { version = "0.4.0", features = ["anyhow_expose"] } yerpc = { version = "0.4.0", features = ["anyhow_expose"] }
[features] [features]

View File

@@ -3,6 +3,7 @@ use std::env;
///! ///!
///! It speaks JSON Lines over stdio. ///! It speaks JSON Lines over stdio.
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Context as _, Result}; use anyhow::{anyhow, Context as _, Result};
use deltachat::constants::DC_VERSION_STR; use deltachat::constants::DC_VERSION_STR;
@@ -10,7 +11,9 @@ use deltachat_jsonrpc::api::events::event_to_json_rpc_notification;
use deltachat_jsonrpc::api::{Accounts, CommandApi}; use deltachat_jsonrpc::api::{Accounts, CommandApi};
use futures_lite::stream::StreamExt; use futures_lite::stream::StreamExt;
use tokio::io::{self, AsyncBufReadExt, BufReader}; use tokio::io::{self, AsyncBufReadExt, BufReader};
use tokio::sync::RwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use yerpc::{RpcClient, RpcSession}; use yerpc::{RpcClient, RpcSession};
#[tokio::main(flavor = "multi_thread")] #[tokio::main(flavor = "multi_thread")]
@@ -40,24 +43,38 @@ async fn main() -> Result<()> {
let events = accounts.get_event_emitter(); let events = accounts.get_event_emitter();
log::info!("Creating JSON-RPC API."); log::info!("Creating JSON-RPC API.");
let state = CommandApi::new(accounts); let accounts = Arc::new(RwLock::new(accounts));
let state = CommandApi::from_arc(accounts.clone());
let (client, mut out_receiver) = RpcClient::new(); let (client, mut out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), state); let session = RpcSession::new(client.clone(), state.clone());
let canceler = CancellationToken::new();
// Events task converts core events to JSON-RPC notifications. // Events task converts core events to JSON-RPC notifications.
let events_task: JoinHandle<Result<()>> = tokio::spawn(async move { let events_task: JoinHandle<Result<()>> = tokio::spawn(async move {
let mut r = Ok(());
while let Some(event) = events.recv().await { while let Some(event) = events.recv().await {
let event = event_to_json_rpc_notification(event); if r.is_err() {
client.send_notification("event", Some(event)).await?; continue;
} }
let event = event_to_json_rpc_notification(event);
r = client.send_notification("event", Some(event)).await;
}
r?;
Ok(()) Ok(())
}); });
// Send task prints JSON responses to stdout. // Send task prints JSON responses to stdout.
let cancelable = canceler.clone();
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move { let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
while let Some(message) = out_receiver.next().await { loop {
let message = serde_json::to_string(&message)?; let message = tokio::select! {
_ = cancelable.cancelled() => break,
message = out_receiver.next() => match message {
None => break,
Some(message) => serde_json::to_string(&message)?,
}
};
log::trace!("RPC send {}", message); log::trace!("RPC send {}", message);
println!("{message}"); println!("{message}");
} }
@@ -68,23 +85,41 @@ async fn main() -> Result<()> {
let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move { let recv_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
let stdin = io::stdin(); let stdin = io::stdin();
let mut lines = BufReader::new(stdin).lines(); let mut lines = BufReader::new(stdin).lines();
while let Some(message) = lines.next_line().await? { loop {
let message = tokio::select! {
_ = tokio::signal::ctrl_c() => {
log::info!("got ctrl-c event");
break;
}
message = lines.next_line() => match message? {
None => {
log::info!("EOF reached on stdin");
break;
}
Some(message) => message,
}
};
log::trace!("RPC recv {}", message); log::trace!("RPC recv {}", message);
let session = session.clone(); let session = session.clone();
tokio::spawn(async move { tokio::spawn(async move {
session.handle_incoming(&message).await; session.handle_incoming(&message).await;
}); });
} }
log::info!("EOF reached on stdin");
Ok(()) Ok(())
}); });
// Wait for the end of stdin. // Wait for the end of stdin / ctrl-c.
recv_task.await??; recv_task.await?.ok();
// Shutdown the server. // See "Thread safety" section in deltachat-ffi/deltachat.h for explanation.
send_task.abort(); // NB: Events are drained by events_task.
events_task.abort(); canceler.cancel();
accounts.read().await.stop_io().await;
drop(state);
let (r0, r1) = tokio::join!(events_task, send_task);
for r in [r0, r1] {
r??;
}
Ok(()) Ok(())
} }