From 514074de8bcd22c3181317e19238131d328ef3a1 Mon Sep 17 00:00:00 2001 From: link2xt Date: Thu, 20 Apr 2023 10:03:13 +0000 Subject: [PATCH] JSON-RPC: retrieve events via long polling This way is more compatible to JSON-RPC libraries that do not support receiving notifications from the server and allows describing event types in the OpenRPC specification. Event thread converting events to notifications in the FFI is removed, so it is now possible to construct a dc_jsonrpc_instance_t while still retrieving events via dc_event_emitter_t. --- CHANGELOG.md | 6 + deltachat-ffi/src/lib.rs | 23 +--- deltachat-jsonrpc/src/api/events.rs | 117 ++++++++---------- deltachat-jsonrpc/src/api/mod.rs | 11 ++ deltachat-jsonrpc/src/webserver.rs | 8 -- deltachat-jsonrpc/typescript/src/client.ts | 36 +++--- deltachat-jsonrpc/typescript/src/lib.ts | 1 - .../src/deltachat_rpc_client/rpc.py | 34 +++-- deltachat-rpc-server/src/main.rs | 26 +--- 9 files changed, 120 insertions(+), 142 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dc145bb4a..d55022256 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## Unreleased +### Changes +- JSON-RPC: Use long polling instead of server-sent notifications to retrieve events. + This better corresponds to JSON-RPC 2.0 server-client distinction + and is expected to simplify writing new bindings + because dispatching events can be done on higher level. + ### Fixes - JSON-RPC: do not print to stdout on failure to find an account. diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index c567ac50d..ebbe0d119 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4967,7 +4967,6 @@ pub unsafe extern "C" fn dc_accounts_get_event_emitter( #[cfg(feature = "jsonrpc")] mod jsonrpc { use deltachat_jsonrpc::api::CommandApi; - use deltachat_jsonrpc::events::event_to_json_rpc_notification; use deltachat_jsonrpc::yerpc::{OutReceiver, RpcClient, RpcSession}; use super::*; @@ -4975,7 +4974,6 @@ mod jsonrpc { pub struct dc_jsonrpc_instance_t { receiver: OutReceiver, handle: RpcSession, - event_thread: JoinHandle>, } #[no_mangle] @@ -4988,28 +4986,12 @@ mod jsonrpc { } let account_manager = &*account_manager; - let events = block_on(account_manager.read()).get_event_emitter(); let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone()); let (request_handle, receiver) = RpcClient::new(); - let handle = RpcSession::new(request_handle.clone(), cmd_api); + let handle = RpcSession::new(request_handle, cmd_api); - let event_thread = spawn(async move { - while let Some(event) = events.recv().await { - let event = event_to_json_rpc_notification(event); - request_handle - .send_notification("event", Some(event)) - .await?; - } - let res: Result<(), anyhow::Error> = Ok(()); - res - }); - - let instance = dc_jsonrpc_instance_t { - receiver, - handle, - event_thread, - }; + let instance = dc_jsonrpc_instance_t { receiver, handle }; Box::into_raw(Box::new(instance)) } @@ -5020,7 +5002,6 @@ mod jsonrpc { eprintln!("ignoring careless call to dc_jsonrpc_unref()"); return; } - (*jsonrpc_instance).event_thread.abort(); drop(Box::from_raw(jsonrpc_instance)); } diff --git a/deltachat-jsonrpc/src/api/events.rs b/deltachat-jsonrpc/src/api/events.rs index ddbbcebca..9c36e5b53 100644 --- a/deltachat-jsonrpc/src/api/events.rs +++ b/deltachat-jsonrpc/src/api/events.rs @@ -1,19 +1,28 @@ -use deltachat::{Event, EventType}; +use deltachat::{Event as CoreEvent, EventType as CoreEventType}; use serde::Serialize; -use serde_json::{json, Value}; use typescript_type_def::TypeDef; -pub fn event_to_json_rpc_notification(event: Event) -> Value { - let id: JSONRPCEventType = event.typ.into(); - json!({ - "event": id, - "contextId": event.id, - }) +#[derive(Serialize, TypeDef)] +pub struct Event { + /// Event payload. + event: EventType, + + /// Account ID. + context_id: u32, +} + +impl From for Event { + fn from(event: CoreEvent) -> Self { + Event { + event: event.typ.into(), + context_id: event.id, + } + } } #[derive(Serialize, TypeDef)] -#[serde(tag = "type", rename = "Event")] -pub enum JSONRPCEventType { +#[serde(tag = "type")] +pub enum EventType { /// The library-user may write an informational string to the log. /// /// This event should *not* be reported to the end-user using a popup or something like @@ -286,27 +295,27 @@ pub enum JSONRPCEventType { }, } -impl From for JSONRPCEventType { - fn from(event: EventType) -> Self { - use JSONRPCEventType::*; +impl From for EventType { + fn from(event: CoreEventType) -> Self { + use EventType::*; match event { - EventType::Info(msg) => Info { msg }, - EventType::SmtpConnected(msg) => SmtpConnected { msg }, - EventType::ImapConnected(msg) => ImapConnected { msg }, - EventType::SmtpMessageSent(msg) => SmtpMessageSent { msg }, - EventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg }, - EventType::ImapMessageMoved(msg) => ImapMessageMoved { msg }, - EventType::ImapInboxIdle => ImapInboxIdle, - EventType::NewBlobFile(file) => NewBlobFile { file }, - EventType::DeletedBlobFile(file) => DeletedBlobFile { file }, - EventType::Warning(msg) => Warning { msg }, - EventType::Error(msg) => Error { msg }, - EventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg }, - EventType::MsgsChanged { chat_id, msg_id } => MsgsChanged { + CoreEventType::Info(msg) => Info { msg }, + CoreEventType::SmtpConnected(msg) => SmtpConnected { msg }, + CoreEventType::ImapConnected(msg) => ImapConnected { msg }, + CoreEventType::SmtpMessageSent(msg) => SmtpMessageSent { msg }, + CoreEventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg }, + CoreEventType::ImapMessageMoved(msg) => ImapMessageMoved { msg }, + CoreEventType::ImapInboxIdle => ImapInboxIdle, + CoreEventType::NewBlobFile(file) => NewBlobFile { file }, + CoreEventType::DeletedBlobFile(file) => DeletedBlobFile { file }, + CoreEventType::Warning(msg) => Warning { msg }, + CoreEventType::Error(msg) => Error { msg }, + CoreEventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg }, + CoreEventType::MsgsChanged { chat_id, msg_id } => MsgsChanged { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::ReactionsChanged { + CoreEventType::ReactionsChanged { chat_id, msg_id, contact_id, @@ -315,92 +324,76 @@ impl From for JSONRPCEventType { msg_id: msg_id.to_u32(), contact_id: contact_id.to_u32(), }, - EventType::IncomingMsg { chat_id, msg_id } => IncomingMsg { + CoreEventType::IncomingMsg { chat_id, msg_id } => IncomingMsg { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch { + CoreEventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch { msg_ids: msg_ids.into_iter().map(|id| id.to_u32()).collect(), }, - EventType::MsgsNoticed(chat_id) => MsgsNoticed { + CoreEventType::MsgsNoticed(chat_id) => MsgsNoticed { chat_id: chat_id.to_u32(), }, - EventType::MsgDelivered { chat_id, msg_id } => MsgDelivered { + CoreEventType::MsgDelivered { chat_id, msg_id } => MsgDelivered { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::MsgFailed { chat_id, msg_id } => MsgFailed { + CoreEventType::MsgFailed { chat_id, msg_id } => MsgFailed { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::MsgRead { chat_id, msg_id } => MsgRead { + CoreEventType::MsgRead { chat_id, msg_id } => MsgRead { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::ChatModified(chat_id) => ChatModified { + CoreEventType::ChatModified(chat_id) => ChatModified { chat_id: chat_id.to_u32(), }, - EventType::ChatEphemeralTimerModified { chat_id, timer } => { + CoreEventType::ChatEphemeralTimerModified { chat_id, timer } => { ChatEphemeralTimerModified { chat_id: chat_id.to_u32(), timer: timer.to_u32(), } } - EventType::ContactsChanged(contact) => ContactsChanged { + CoreEventType::ContactsChanged(contact) => ContactsChanged { contact_id: contact.map(|c| c.to_u32()), }, - EventType::LocationChanged(contact) => LocationChanged { + CoreEventType::LocationChanged(contact) => LocationChanged { contact_id: contact.map(|c| c.to_u32()), }, - EventType::ConfigureProgress { progress, comment } => { + CoreEventType::ConfigureProgress { progress, comment } => { ConfigureProgress { progress, comment } } - EventType::ImexProgress(progress) => ImexProgress { progress }, - EventType::ImexFileWritten(path) => ImexFileWritten { + CoreEventType::ImexProgress(progress) => ImexProgress { progress }, + CoreEventType::ImexFileWritten(path) => ImexFileWritten { path: path.to_str().unwrap_or_default().to_owned(), }, - EventType::SecurejoinInviterProgress { + CoreEventType::SecurejoinInviterProgress { contact_id, progress, } => SecurejoinInviterProgress { contact_id: contact_id.to_u32(), progress, }, - EventType::SecurejoinJoinerProgress { + CoreEventType::SecurejoinJoinerProgress { contact_id, progress, } => SecurejoinJoinerProgress { contact_id: contact_id.to_u32(), progress, }, - EventType::ConnectivityChanged => ConnectivityChanged, - EventType::SelfavatarChanged => SelfavatarChanged, - EventType::WebxdcStatusUpdate { + CoreEventType::ConnectivityChanged => ConnectivityChanged, + CoreEventType::SelfavatarChanged => SelfavatarChanged, + CoreEventType::WebxdcStatusUpdate { msg_id, status_update_serial, } => WebxdcStatusUpdate { msg_id: msg_id.to_u32(), status_update_serial: status_update_serial.to_u32(), }, - EventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted { + CoreEventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted { msg_id: msg_id.to_u32(), }, } } } - -#[cfg(test)] -#[test] -fn generate_events_ts_types_definition() { - let events = { - let mut buf = Vec::new(); - let options = typescript_type_def::DefinitionFileOptions { - root_namespace: None, - ..typescript_type_def::DefinitionFileOptions::default() - }; - typescript_type_def::write_definition_file::<_, JSONRPCEventType>(&mut buf, options) - .unwrap(); - String::from_utf8(buf).unwrap() - }; - std::fs::write("typescript/generated/events.ts", events).unwrap(); -} diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index 41dff098b..19a1ef109 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -49,6 +49,7 @@ use types::message::MessageObject; use types::provider_info::ProviderInfo; use types::webxdc::WebxdcMessageInfo; +use self::events::Event; use self::types::message::MessageLoadResult; use self::types::{ chat::{BasicChat, JSONRPCChatVisibility, MuteDuration}, @@ -165,6 +166,16 @@ impl CommandApi { get_info() } + /// Get the next event. + async fn get_next_event(&self) -> Result { + let event_emitter = self.accounts.read().await.get_event_emitter(); + event_emitter + .recv() + .await + .map(|event| event.into()) + .context("event channel is closed") + } + // --------------------------------------------- // Account Management // --------------------------------------------- diff --git a/deltachat-jsonrpc/src/webserver.rs b/deltachat-jsonrpc/src/webserver.rs index 9231069c5..df8f92135 100644 --- a/deltachat-jsonrpc/src/webserver.rs +++ b/deltachat-jsonrpc/src/webserver.rs @@ -6,7 +6,6 @@ use yerpc::axum::handle_ws_rpc; use yerpc::{RpcClient, RpcSession}; mod api; -use api::events::event_to_json_rpc_notification; use api::{Accounts, CommandApi}; const DEFAULT_PORT: u16 = 20808; @@ -44,12 +43,5 @@ async fn main() -> Result<(), std::io::Error> { async fn handler(ws: WebSocketUpgrade, Extension(api): Extension) -> Response { let (client, out_receiver) = RpcClient::new(); let session = RpcSession::new(client.clone(), api.clone()); - tokio::spawn(async move { - let events = api.accounts.read().await.get_event_emitter(); - while let Some(event) = events.recv().await { - let event = event_to_json_rpc_notification(event); - client.send_notification("event", Some(event)).await.ok(); - } - }); handle_ws_rpc(ws, out_receiver, session).await } diff --git a/deltachat-jsonrpc/typescript/src/client.ts b/deltachat-jsonrpc/typescript/src/client.ts index 9efbab964..783ed7bad 100644 --- a/deltachat-jsonrpc/typescript/src/client.ts +++ b/deltachat-jsonrpc/typescript/src/client.ts @@ -1,7 +1,6 @@ import * as T from "../generated/types.js"; import * as RPC from "../generated/jsonrpc.js"; import { RawClient } from "../generated/client.js"; -import { Event } from "../generated/events.js"; import { WebsocketTransport, BaseTransport, Request } from "yerpc"; import { TinyEmitter } from "@deltachat/tiny-emitter"; @@ -36,27 +35,30 @@ export class BaseDeltaChat< rpc: RawClient; account?: T.Account; private contextEmitters: { [key: number]: TinyEmitter } = {}; + + private eventTask: Promise; + constructor(public transport: Transport) { super(); this.rpc = new RawClient(this.transport); - this.transport.on("request", (request: Request) => { - const method = request.method; - if (method === "event") { - const event = request.params! as DCWireEvent; - //@ts-ignore - this.emit(event.event.type, event.contextId, event.event as any); - this.emit("ALL", event.contextId, event.event as any); + this.eventTask = this.eventLoop(); + } - if (this.contextEmitters[event.contextId]) { - this.contextEmitters[event.contextId].emit( - event.event.type, - //@ts-ignore - event.event as any - ); - this.contextEmitters[event.contextId].emit("ALL", event.event); - } + async eventLoop(): Promise { + while (true) { + const event = await this.rpc.getNextEvent(); + this.emit(event.event.type, event.context_id, event.event as any); + this.emit("ALL", event.context_id, event.event as any); + + if (this.contextEmitters[event.context_id]) { + this.contextEmitters[event.context_id].emit( + event.event.type, + //@ts-ignore + event.event as any + ); + this.contextEmitters[event.context_id].emit("ALL", event.event as any); } - }); + } } async listAccounts(): Promise { diff --git a/deltachat-jsonrpc/typescript/src/lib.ts b/deltachat-jsonrpc/typescript/src/lib.ts index 473d2bd33..de357a1ea 100644 --- a/deltachat-jsonrpc/typescript/src/lib.ts +++ b/deltachat-jsonrpc/typescript/src/lib.ts @@ -1,6 +1,5 @@ export * as RPC from "../generated/jsonrpc.js"; export * as T from "../generated/types.js"; -export * from "../generated/events.js"; export { RawClient } from "../generated/client.js"; export * from "./client.js"; export * as yerpc from "yerpc"; diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index f15c1a29a..57e26dd0b 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -23,7 +23,9 @@ class Rpc: self.event_queues: Dict[int, asyncio.Queue] # Map from request ID to `asyncio.Future` returning the response. self.request_events: Dict[int, asyncio.Future] + self.closing: bool self.reader_task: asyncio.Task + self.events_task: asyncio.Task async def start(self) -> None: self.process = await asyncio.create_subprocess_exec( @@ -35,10 +37,15 @@ class Rpc: self.id = 0 self.event_queues = {} self.request_events = {} + self.closing = False self.reader_task = asyncio.create_task(self.reader_loop()) + self.events_task = asyncio.create_task(self.events_loop()) async def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" + self.closing = True + await self.stop_io_for_all_accounts() + await self.events_task self.process.terminate() await self.reader_task @@ -58,21 +65,28 @@ class Rpc: if "id" in response: fut = self.request_events.pop(response["id"]) fut.set_result(response) - elif response["method"] == "event": - # An event notification. - params = response["params"] - account_id = params["contextId"] - if account_id not in self.event_queues: - self.event_queues[account_id] = asyncio.Queue() - await self.event_queues[account_id].put(params["event"]) else: print(response) + async def get_queue(self, account_id: int) -> asyncio.Queue: + if account_id not in self.event_queues: + self.event_queues[account_id] = asyncio.Queue() + return self.event_queues[account_id] + + async def events_loop(self) -> None: + """Requests new events and distributes them between queues.""" + while True: + if self.closing: + return + event = await self.get_next_event() + account_id = event["context_id"] + queue = await self.get_queue(account_id) + await queue.put(event["event"]) + async def wait_for_event(self, account_id: int) -> Optional[dict]: """Waits for the next event from the given account and returns it.""" - if account_id in self.event_queues: - return await self.event_queues[account_id].get() - return None + queue = await self.get_queue(account_id) + return await queue.get() def __getattr__(self, attr: str): async def method(*args, **kwargs) -> Any: diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 419f4deb4..a635dddb5 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use anyhow::{anyhow, Context as _, Result}; use deltachat::constants::DC_VERSION_STR; -use deltachat_jsonrpc::api::events::event_to_json_rpc_notification; use deltachat_jsonrpc::api::{Accounts, CommandApi}; use futures_lite::stream::StreamExt; use tokio::io::{self, AsyncBufReadExt, BufReader}; @@ -58,7 +57,6 @@ async fn main_impl() -> Result<()> { let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); log::info!("Starting with accounts directory `{}`.", path); let accounts = Accounts::new(PathBuf::from(&path)).await?; - let events = accounts.get_event_emitter(); log::info!("Creating JSON-RPC API."); let accounts = Arc::new(RwLock::new(accounts)); @@ -68,21 +66,6 @@ async fn main_impl() -> Result<()> { let session = RpcSession::new(client.clone(), state.clone()); let main_cancel = CancellationToken::new(); - // Events task converts core events to JSON-RPC notifications. - let cancel = main_cancel.clone(); - let events_task: JoinHandle> = tokio::spawn(async move { - let _cancel_guard = cancel.clone().drop_guard(); - let mut r = Ok(()); - while let Some(event) = events.recv().await { - if r.is_err() { - continue; - } - let event = event_to_json_rpc_notification(event); - r = client.send_notification("event", Some(event)).await; - } - Ok(()) - }); - // Send task prints JSON responses to stdout. let cancel = main_cancel.clone(); let send_task: JoinHandle> = tokio::spawn(async move { @@ -148,16 +131,13 @@ async fn main_impl() -> Result<()> { Ok(()) }); - // See "Thread safety" section in deltachat-ffi/deltachat.h for explanation. - // NB: Events are drained by events_task. main_cancel.cancelled().await; accounts.read().await.stop_io().await; drop(accounts); drop(state); - let (r0, r1, r2, r3) = tokio::join!(events_task, send_task, sigterm_task, recv_task); - for r in [r0, r1, r2, r3] { - r??; - } + send_task.await??; + sigterm_task.await??; + recv_task.await??; Ok(()) }