diff --git a/CHANGELOG.md b/CHANGELOG.md index dc145bb4a..d55022256 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## Unreleased +### Changes +- JSON-RPC: Use long polling instead of server-sent notifications to retrieve events. + This better corresponds to JSON-RPC 2.0 server-client distinction + and is expected to simplify writing new bindings + because dispatching events can be done on higher level. + ### Fixes - JSON-RPC: do not print to stdout on failure to find an account. diff --git a/deltachat-ffi/src/lib.rs b/deltachat-ffi/src/lib.rs index c567ac50d..ebbe0d119 100644 --- a/deltachat-ffi/src/lib.rs +++ b/deltachat-ffi/src/lib.rs @@ -4967,7 +4967,6 @@ pub unsafe extern "C" fn dc_accounts_get_event_emitter( #[cfg(feature = "jsonrpc")] mod jsonrpc { use deltachat_jsonrpc::api::CommandApi; - use deltachat_jsonrpc::events::event_to_json_rpc_notification; use deltachat_jsonrpc::yerpc::{OutReceiver, RpcClient, RpcSession}; use super::*; @@ -4975,7 +4974,6 @@ mod jsonrpc { pub struct dc_jsonrpc_instance_t { receiver: OutReceiver, handle: RpcSession, - event_thread: JoinHandle>, } #[no_mangle] @@ -4988,28 +4986,12 @@ mod jsonrpc { } let account_manager = &*account_manager; - let events = block_on(account_manager.read()).get_event_emitter(); let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone()); let (request_handle, receiver) = RpcClient::new(); - let handle = RpcSession::new(request_handle.clone(), cmd_api); + let handle = RpcSession::new(request_handle, cmd_api); - let event_thread = spawn(async move { - while let Some(event) = events.recv().await { - let event = event_to_json_rpc_notification(event); - request_handle - .send_notification("event", Some(event)) - .await?; - } - let res: Result<(), anyhow::Error> = Ok(()); - res - }); - - let instance = dc_jsonrpc_instance_t { - receiver, - handle, - event_thread, - }; + let instance = dc_jsonrpc_instance_t { receiver, handle }; Box::into_raw(Box::new(instance)) } @@ -5020,7 +5002,6 @@ mod jsonrpc { eprintln!("ignoring careless call to dc_jsonrpc_unref()"); return; } - (*jsonrpc_instance).event_thread.abort(); drop(Box::from_raw(jsonrpc_instance)); } diff --git a/deltachat-jsonrpc/src/api/events.rs b/deltachat-jsonrpc/src/api/events.rs index ddbbcebca..9c36e5b53 100644 --- a/deltachat-jsonrpc/src/api/events.rs +++ b/deltachat-jsonrpc/src/api/events.rs @@ -1,19 +1,28 @@ -use deltachat::{Event, EventType}; +use deltachat::{Event as CoreEvent, EventType as CoreEventType}; use serde::Serialize; -use serde_json::{json, Value}; use typescript_type_def::TypeDef; -pub fn event_to_json_rpc_notification(event: Event) -> Value { - let id: JSONRPCEventType = event.typ.into(); - json!({ - "event": id, - "contextId": event.id, - }) +#[derive(Serialize, TypeDef)] +pub struct Event { + /// Event payload. + event: EventType, + + /// Account ID. + context_id: u32, +} + +impl From for Event { + fn from(event: CoreEvent) -> Self { + Event { + event: event.typ.into(), + context_id: event.id, + } + } } #[derive(Serialize, TypeDef)] -#[serde(tag = "type", rename = "Event")] -pub enum JSONRPCEventType { +#[serde(tag = "type")] +pub enum EventType { /// The library-user may write an informational string to the log. /// /// This event should *not* be reported to the end-user using a popup or something like @@ -286,27 +295,27 @@ pub enum JSONRPCEventType { }, } -impl From for JSONRPCEventType { - fn from(event: EventType) -> Self { - use JSONRPCEventType::*; +impl From for EventType { + fn from(event: CoreEventType) -> Self { + use EventType::*; match event { - EventType::Info(msg) => Info { msg }, - EventType::SmtpConnected(msg) => SmtpConnected { msg }, - EventType::ImapConnected(msg) => ImapConnected { msg }, - EventType::SmtpMessageSent(msg) => SmtpMessageSent { msg }, - EventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg }, - EventType::ImapMessageMoved(msg) => ImapMessageMoved { msg }, - EventType::ImapInboxIdle => ImapInboxIdle, - EventType::NewBlobFile(file) => NewBlobFile { file }, - EventType::DeletedBlobFile(file) => DeletedBlobFile { file }, - EventType::Warning(msg) => Warning { msg }, - EventType::Error(msg) => Error { msg }, - EventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg }, - EventType::MsgsChanged { chat_id, msg_id } => MsgsChanged { + CoreEventType::Info(msg) => Info { msg }, + CoreEventType::SmtpConnected(msg) => SmtpConnected { msg }, + CoreEventType::ImapConnected(msg) => ImapConnected { msg }, + CoreEventType::SmtpMessageSent(msg) => SmtpMessageSent { msg }, + CoreEventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg }, + CoreEventType::ImapMessageMoved(msg) => ImapMessageMoved { msg }, + CoreEventType::ImapInboxIdle => ImapInboxIdle, + CoreEventType::NewBlobFile(file) => NewBlobFile { file }, + CoreEventType::DeletedBlobFile(file) => DeletedBlobFile { file }, + CoreEventType::Warning(msg) => Warning { msg }, + CoreEventType::Error(msg) => Error { msg }, + CoreEventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg }, + CoreEventType::MsgsChanged { chat_id, msg_id } => MsgsChanged { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::ReactionsChanged { + CoreEventType::ReactionsChanged { chat_id, msg_id, contact_id, @@ -315,92 +324,76 @@ impl From for JSONRPCEventType { msg_id: msg_id.to_u32(), contact_id: contact_id.to_u32(), }, - EventType::IncomingMsg { chat_id, msg_id } => IncomingMsg { + CoreEventType::IncomingMsg { chat_id, msg_id } => IncomingMsg { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch { + CoreEventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch { msg_ids: msg_ids.into_iter().map(|id| id.to_u32()).collect(), }, - EventType::MsgsNoticed(chat_id) => MsgsNoticed { + CoreEventType::MsgsNoticed(chat_id) => MsgsNoticed { chat_id: chat_id.to_u32(), }, - EventType::MsgDelivered { chat_id, msg_id } => MsgDelivered { + CoreEventType::MsgDelivered { chat_id, msg_id } => MsgDelivered { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::MsgFailed { chat_id, msg_id } => MsgFailed { + CoreEventType::MsgFailed { chat_id, msg_id } => MsgFailed { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::MsgRead { chat_id, msg_id } => MsgRead { + CoreEventType::MsgRead { chat_id, msg_id } => MsgRead { chat_id: chat_id.to_u32(), msg_id: msg_id.to_u32(), }, - EventType::ChatModified(chat_id) => ChatModified { + CoreEventType::ChatModified(chat_id) => ChatModified { chat_id: chat_id.to_u32(), }, - EventType::ChatEphemeralTimerModified { chat_id, timer } => { + CoreEventType::ChatEphemeralTimerModified { chat_id, timer } => { ChatEphemeralTimerModified { chat_id: chat_id.to_u32(), timer: timer.to_u32(), } } - EventType::ContactsChanged(contact) => ContactsChanged { + CoreEventType::ContactsChanged(contact) => ContactsChanged { contact_id: contact.map(|c| c.to_u32()), }, - EventType::LocationChanged(contact) => LocationChanged { + CoreEventType::LocationChanged(contact) => LocationChanged { contact_id: contact.map(|c| c.to_u32()), }, - EventType::ConfigureProgress { progress, comment } => { + CoreEventType::ConfigureProgress { progress, comment } => { ConfigureProgress { progress, comment } } - EventType::ImexProgress(progress) => ImexProgress { progress }, - EventType::ImexFileWritten(path) => ImexFileWritten { + CoreEventType::ImexProgress(progress) => ImexProgress { progress }, + CoreEventType::ImexFileWritten(path) => ImexFileWritten { path: path.to_str().unwrap_or_default().to_owned(), }, - EventType::SecurejoinInviterProgress { + CoreEventType::SecurejoinInviterProgress { contact_id, progress, } => SecurejoinInviterProgress { contact_id: contact_id.to_u32(), progress, }, - EventType::SecurejoinJoinerProgress { + CoreEventType::SecurejoinJoinerProgress { contact_id, progress, } => SecurejoinJoinerProgress { contact_id: contact_id.to_u32(), progress, }, - EventType::ConnectivityChanged => ConnectivityChanged, - EventType::SelfavatarChanged => SelfavatarChanged, - EventType::WebxdcStatusUpdate { + CoreEventType::ConnectivityChanged => ConnectivityChanged, + CoreEventType::SelfavatarChanged => SelfavatarChanged, + CoreEventType::WebxdcStatusUpdate { msg_id, status_update_serial, } => WebxdcStatusUpdate { msg_id: msg_id.to_u32(), status_update_serial: status_update_serial.to_u32(), }, - EventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted { + CoreEventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted { msg_id: msg_id.to_u32(), }, } } } - -#[cfg(test)] -#[test] -fn generate_events_ts_types_definition() { - let events = { - let mut buf = Vec::new(); - let options = typescript_type_def::DefinitionFileOptions { - root_namespace: None, - ..typescript_type_def::DefinitionFileOptions::default() - }; - typescript_type_def::write_definition_file::<_, JSONRPCEventType>(&mut buf, options) - .unwrap(); - String::from_utf8(buf).unwrap() - }; - std::fs::write("typescript/generated/events.ts", events).unwrap(); -} diff --git a/deltachat-jsonrpc/src/api/mod.rs b/deltachat-jsonrpc/src/api/mod.rs index 41dff098b..19a1ef109 100644 --- a/deltachat-jsonrpc/src/api/mod.rs +++ b/deltachat-jsonrpc/src/api/mod.rs @@ -49,6 +49,7 @@ use types::message::MessageObject; use types::provider_info::ProviderInfo; use types::webxdc::WebxdcMessageInfo; +use self::events::Event; use self::types::message::MessageLoadResult; use self::types::{ chat::{BasicChat, JSONRPCChatVisibility, MuteDuration}, @@ -165,6 +166,16 @@ impl CommandApi { get_info() } + /// Get the next event. + async fn get_next_event(&self) -> Result { + let event_emitter = self.accounts.read().await.get_event_emitter(); + event_emitter + .recv() + .await + .map(|event| event.into()) + .context("event channel is closed") + } + // --------------------------------------------- // Account Management // --------------------------------------------- diff --git a/deltachat-jsonrpc/src/webserver.rs b/deltachat-jsonrpc/src/webserver.rs index 9231069c5..df8f92135 100644 --- a/deltachat-jsonrpc/src/webserver.rs +++ b/deltachat-jsonrpc/src/webserver.rs @@ -6,7 +6,6 @@ use yerpc::axum::handle_ws_rpc; use yerpc::{RpcClient, RpcSession}; mod api; -use api::events::event_to_json_rpc_notification; use api::{Accounts, CommandApi}; const DEFAULT_PORT: u16 = 20808; @@ -44,12 +43,5 @@ async fn main() -> Result<(), std::io::Error> { async fn handler(ws: WebSocketUpgrade, Extension(api): Extension) -> Response { let (client, out_receiver) = RpcClient::new(); let session = RpcSession::new(client.clone(), api.clone()); - tokio::spawn(async move { - let events = api.accounts.read().await.get_event_emitter(); - while let Some(event) = events.recv().await { - let event = event_to_json_rpc_notification(event); - client.send_notification("event", Some(event)).await.ok(); - } - }); handle_ws_rpc(ws, out_receiver, session).await } diff --git a/deltachat-jsonrpc/typescript/src/client.ts b/deltachat-jsonrpc/typescript/src/client.ts index 9efbab964..783ed7bad 100644 --- a/deltachat-jsonrpc/typescript/src/client.ts +++ b/deltachat-jsonrpc/typescript/src/client.ts @@ -1,7 +1,6 @@ import * as T from "../generated/types.js"; import * as RPC from "../generated/jsonrpc.js"; import { RawClient } from "../generated/client.js"; -import { Event } from "../generated/events.js"; import { WebsocketTransport, BaseTransport, Request } from "yerpc"; import { TinyEmitter } from "@deltachat/tiny-emitter"; @@ -36,27 +35,30 @@ export class BaseDeltaChat< rpc: RawClient; account?: T.Account; private contextEmitters: { [key: number]: TinyEmitter } = {}; + + private eventTask: Promise; + constructor(public transport: Transport) { super(); this.rpc = new RawClient(this.transport); - this.transport.on("request", (request: Request) => { - const method = request.method; - if (method === "event") { - const event = request.params! as DCWireEvent; - //@ts-ignore - this.emit(event.event.type, event.contextId, event.event as any); - this.emit("ALL", event.contextId, event.event as any); + this.eventTask = this.eventLoop(); + } - if (this.contextEmitters[event.contextId]) { - this.contextEmitters[event.contextId].emit( - event.event.type, - //@ts-ignore - event.event as any - ); - this.contextEmitters[event.contextId].emit("ALL", event.event); - } + async eventLoop(): Promise { + while (true) { + const event = await this.rpc.getNextEvent(); + this.emit(event.event.type, event.context_id, event.event as any); + this.emit("ALL", event.context_id, event.event as any); + + if (this.contextEmitters[event.context_id]) { + this.contextEmitters[event.context_id].emit( + event.event.type, + //@ts-ignore + event.event as any + ); + this.contextEmitters[event.context_id].emit("ALL", event.event as any); } - }); + } } async listAccounts(): Promise { diff --git a/deltachat-jsonrpc/typescript/src/lib.ts b/deltachat-jsonrpc/typescript/src/lib.ts index 473d2bd33..de357a1ea 100644 --- a/deltachat-jsonrpc/typescript/src/lib.ts +++ b/deltachat-jsonrpc/typescript/src/lib.ts @@ -1,6 +1,5 @@ export * as RPC from "../generated/jsonrpc.js"; export * as T from "../generated/types.js"; -export * from "../generated/events.js"; export { RawClient } from "../generated/client.js"; export * from "./client.js"; export * as yerpc from "yerpc"; diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index f15c1a29a..57e26dd0b 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -23,7 +23,9 @@ class Rpc: self.event_queues: Dict[int, asyncio.Queue] # Map from request ID to `asyncio.Future` returning the response. self.request_events: Dict[int, asyncio.Future] + self.closing: bool self.reader_task: asyncio.Task + self.events_task: asyncio.Task async def start(self) -> None: self.process = await asyncio.create_subprocess_exec( @@ -35,10 +37,15 @@ class Rpc: self.id = 0 self.event_queues = {} self.request_events = {} + self.closing = False self.reader_task = asyncio.create_task(self.reader_loop()) + self.events_task = asyncio.create_task(self.events_loop()) async def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" + self.closing = True + await self.stop_io_for_all_accounts() + await self.events_task self.process.terminate() await self.reader_task @@ -58,21 +65,28 @@ class Rpc: if "id" in response: fut = self.request_events.pop(response["id"]) fut.set_result(response) - elif response["method"] == "event": - # An event notification. - params = response["params"] - account_id = params["contextId"] - if account_id not in self.event_queues: - self.event_queues[account_id] = asyncio.Queue() - await self.event_queues[account_id].put(params["event"]) else: print(response) + async def get_queue(self, account_id: int) -> asyncio.Queue: + if account_id not in self.event_queues: + self.event_queues[account_id] = asyncio.Queue() + return self.event_queues[account_id] + + async def events_loop(self) -> None: + """Requests new events and distributes them between queues.""" + while True: + if self.closing: + return + event = await self.get_next_event() + account_id = event["context_id"] + queue = await self.get_queue(account_id) + await queue.put(event["event"]) + async def wait_for_event(self, account_id: int) -> Optional[dict]: """Waits for the next event from the given account and returns it.""" - if account_id in self.event_queues: - return await self.event_queues[account_id].get() - return None + queue = await self.get_queue(account_id) + return await queue.get() def __getattr__(self, attr: str): async def method(*args, **kwargs) -> Any: diff --git a/deltachat-rpc-server/src/main.rs b/deltachat-rpc-server/src/main.rs index 419f4deb4..a635dddb5 100644 --- a/deltachat-rpc-server/src/main.rs +++ b/deltachat-rpc-server/src/main.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use anyhow::{anyhow, Context as _, Result}; use deltachat::constants::DC_VERSION_STR; -use deltachat_jsonrpc::api::events::event_to_json_rpc_notification; use deltachat_jsonrpc::api::{Accounts, CommandApi}; use futures_lite::stream::StreamExt; use tokio::io::{self, AsyncBufReadExt, BufReader}; @@ -58,7 +57,6 @@ async fn main_impl() -> Result<()> { let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); log::info!("Starting with accounts directory `{}`.", path); let accounts = Accounts::new(PathBuf::from(&path)).await?; - let events = accounts.get_event_emitter(); log::info!("Creating JSON-RPC API."); let accounts = Arc::new(RwLock::new(accounts)); @@ -68,21 +66,6 @@ async fn main_impl() -> Result<()> { let session = RpcSession::new(client.clone(), state.clone()); let main_cancel = CancellationToken::new(); - // Events task converts core events to JSON-RPC notifications. - let cancel = main_cancel.clone(); - let events_task: JoinHandle> = tokio::spawn(async move { - let _cancel_guard = cancel.clone().drop_guard(); - let mut r = Ok(()); - while let Some(event) = events.recv().await { - if r.is_err() { - continue; - } - let event = event_to_json_rpc_notification(event); - r = client.send_notification("event", Some(event)).await; - } - Ok(()) - }); - // Send task prints JSON responses to stdout. let cancel = main_cancel.clone(); let send_task: JoinHandle> = tokio::spawn(async move { @@ -148,16 +131,13 @@ async fn main_impl() -> Result<()> { Ok(()) }); - // See "Thread safety" section in deltachat-ffi/deltachat.h for explanation. - // NB: Events are drained by events_task. main_cancel.cancelled().await; accounts.read().await.stop_io().await; drop(accounts); drop(state); - let (r0, r1, r2, r3) = tokio::join!(events_task, send_task, sigterm_task, recv_task); - for r in [r0, r1, r2, r3] { - r??; - } + send_task.await??; + sigterm_task.await??; + recv_task.await??; Ok(()) }