mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 09:26:29 +03:00
JSON-RPC: retrieve events via long polling
This way is more compatible to JSON-RPC libraries that do not support receiving notifications from the server and allows describing event types in the OpenRPC specification. Event thread converting events to notifications in the FFI is removed, so it is now possible to construct a dc_jsonrpc_instance_t while still retrieving events via dc_event_emitter_t.
This commit is contained in:
@@ -2,6 +2,12 @@
|
|||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
### Changes
|
||||||
|
- JSON-RPC: Use long polling instead of server-sent notifications to retrieve events.
|
||||||
|
This better corresponds to JSON-RPC 2.0 server-client distinction
|
||||||
|
and is expected to simplify writing new bindings
|
||||||
|
because dispatching events can be done on higher level.
|
||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
- JSON-RPC: do not print to stdout on failure to find an account.
|
- JSON-RPC: do not print to stdout on failure to find an account.
|
||||||
|
|
||||||
|
|||||||
@@ -4967,7 +4967,6 @@ pub unsafe extern "C" fn dc_accounts_get_event_emitter(
|
|||||||
#[cfg(feature = "jsonrpc")]
|
#[cfg(feature = "jsonrpc")]
|
||||||
mod jsonrpc {
|
mod jsonrpc {
|
||||||
use deltachat_jsonrpc::api::CommandApi;
|
use deltachat_jsonrpc::api::CommandApi;
|
||||||
use deltachat_jsonrpc::events::event_to_json_rpc_notification;
|
|
||||||
use deltachat_jsonrpc::yerpc::{OutReceiver, RpcClient, RpcSession};
|
use deltachat_jsonrpc::yerpc::{OutReceiver, RpcClient, RpcSession};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -4975,7 +4974,6 @@ mod jsonrpc {
|
|||||||
pub struct dc_jsonrpc_instance_t {
|
pub struct dc_jsonrpc_instance_t {
|
||||||
receiver: OutReceiver,
|
receiver: OutReceiver,
|
||||||
handle: RpcSession<CommandApi>,
|
handle: RpcSession<CommandApi>,
|
||||||
event_thread: JoinHandle<Result<(), anyhow::Error>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[no_mangle]
|
#[no_mangle]
|
||||||
@@ -4988,28 +4986,12 @@ mod jsonrpc {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let account_manager = &*account_manager;
|
let account_manager = &*account_manager;
|
||||||
let events = block_on(account_manager.read()).get_event_emitter();
|
|
||||||
let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone());
|
let cmd_api = deltachat_jsonrpc::api::CommandApi::from_arc(account_manager.inner.clone());
|
||||||
|
|
||||||
let (request_handle, receiver) = RpcClient::new();
|
let (request_handle, receiver) = RpcClient::new();
|
||||||
let handle = RpcSession::new(request_handle.clone(), cmd_api);
|
let handle = RpcSession::new(request_handle, cmd_api);
|
||||||
|
|
||||||
let event_thread = spawn(async move {
|
let instance = dc_jsonrpc_instance_t { receiver, handle };
|
||||||
while let Some(event) = events.recv().await {
|
|
||||||
let event = event_to_json_rpc_notification(event);
|
|
||||||
request_handle
|
|
||||||
.send_notification("event", Some(event))
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
let res: Result<(), anyhow::Error> = Ok(());
|
|
||||||
res
|
|
||||||
});
|
|
||||||
|
|
||||||
let instance = dc_jsonrpc_instance_t {
|
|
||||||
receiver,
|
|
||||||
handle,
|
|
||||||
event_thread,
|
|
||||||
};
|
|
||||||
|
|
||||||
Box::into_raw(Box::new(instance))
|
Box::into_raw(Box::new(instance))
|
||||||
}
|
}
|
||||||
@@ -5020,7 +5002,6 @@ mod jsonrpc {
|
|||||||
eprintln!("ignoring careless call to dc_jsonrpc_unref()");
|
eprintln!("ignoring careless call to dc_jsonrpc_unref()");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
(*jsonrpc_instance).event_thread.abort();
|
|
||||||
drop(Box::from_raw(jsonrpc_instance));
|
drop(Box::from_raw(jsonrpc_instance));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,19 +1,28 @@
|
|||||||
use deltachat::{Event, EventType};
|
use deltachat::{Event as CoreEvent, EventType as CoreEventType};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::{json, Value};
|
|
||||||
use typescript_type_def::TypeDef;
|
use typescript_type_def::TypeDef;
|
||||||
|
|
||||||
pub fn event_to_json_rpc_notification(event: Event) -> Value {
|
#[derive(Serialize, TypeDef)]
|
||||||
let id: JSONRPCEventType = event.typ.into();
|
pub struct Event {
|
||||||
json!({
|
/// Event payload.
|
||||||
"event": id,
|
event: EventType,
|
||||||
"contextId": event.id,
|
|
||||||
})
|
/// Account ID.
|
||||||
|
context_id: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<CoreEvent> for Event {
|
||||||
|
fn from(event: CoreEvent) -> Self {
|
||||||
|
Event {
|
||||||
|
event: event.typ.into(),
|
||||||
|
context_id: event.id,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, TypeDef)]
|
#[derive(Serialize, TypeDef)]
|
||||||
#[serde(tag = "type", rename = "Event")]
|
#[serde(tag = "type")]
|
||||||
pub enum JSONRPCEventType {
|
pub enum EventType {
|
||||||
/// The library-user may write an informational string to the log.
|
/// The library-user may write an informational string to the log.
|
||||||
///
|
///
|
||||||
/// This event should *not* be reported to the end-user using a popup or something like
|
/// This event should *not* be reported to the end-user using a popup or something like
|
||||||
@@ -286,27 +295,27 @@ pub enum JSONRPCEventType {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<EventType> for JSONRPCEventType {
|
impl From<CoreEventType> for EventType {
|
||||||
fn from(event: EventType) -> Self {
|
fn from(event: CoreEventType) -> Self {
|
||||||
use JSONRPCEventType::*;
|
use EventType::*;
|
||||||
match event {
|
match event {
|
||||||
EventType::Info(msg) => Info { msg },
|
CoreEventType::Info(msg) => Info { msg },
|
||||||
EventType::SmtpConnected(msg) => SmtpConnected { msg },
|
CoreEventType::SmtpConnected(msg) => SmtpConnected { msg },
|
||||||
EventType::ImapConnected(msg) => ImapConnected { msg },
|
CoreEventType::ImapConnected(msg) => ImapConnected { msg },
|
||||||
EventType::SmtpMessageSent(msg) => SmtpMessageSent { msg },
|
CoreEventType::SmtpMessageSent(msg) => SmtpMessageSent { msg },
|
||||||
EventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg },
|
CoreEventType::ImapMessageDeleted(msg) => ImapMessageDeleted { msg },
|
||||||
EventType::ImapMessageMoved(msg) => ImapMessageMoved { msg },
|
CoreEventType::ImapMessageMoved(msg) => ImapMessageMoved { msg },
|
||||||
EventType::ImapInboxIdle => ImapInboxIdle,
|
CoreEventType::ImapInboxIdle => ImapInboxIdle,
|
||||||
EventType::NewBlobFile(file) => NewBlobFile { file },
|
CoreEventType::NewBlobFile(file) => NewBlobFile { file },
|
||||||
EventType::DeletedBlobFile(file) => DeletedBlobFile { file },
|
CoreEventType::DeletedBlobFile(file) => DeletedBlobFile { file },
|
||||||
EventType::Warning(msg) => Warning { msg },
|
CoreEventType::Warning(msg) => Warning { msg },
|
||||||
EventType::Error(msg) => Error { msg },
|
CoreEventType::Error(msg) => Error { msg },
|
||||||
EventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg },
|
CoreEventType::ErrorSelfNotInGroup(msg) => ErrorSelfNotInGroup { msg },
|
||||||
EventType::MsgsChanged { chat_id, msg_id } => MsgsChanged {
|
CoreEventType::MsgsChanged { chat_id, msg_id } => MsgsChanged {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::ReactionsChanged {
|
CoreEventType::ReactionsChanged {
|
||||||
chat_id,
|
chat_id,
|
||||||
msg_id,
|
msg_id,
|
||||||
contact_id,
|
contact_id,
|
||||||
@@ -315,92 +324,76 @@ impl From<EventType> for JSONRPCEventType {
|
|||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
contact_id: contact_id.to_u32(),
|
contact_id: contact_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::IncomingMsg { chat_id, msg_id } => IncomingMsg {
|
CoreEventType::IncomingMsg { chat_id, msg_id } => IncomingMsg {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch {
|
CoreEventType::IncomingMsgBunch { msg_ids } => IncomingMsgBunch {
|
||||||
msg_ids: msg_ids.into_iter().map(|id| id.to_u32()).collect(),
|
msg_ids: msg_ids.into_iter().map(|id| id.to_u32()).collect(),
|
||||||
},
|
},
|
||||||
EventType::MsgsNoticed(chat_id) => MsgsNoticed {
|
CoreEventType::MsgsNoticed(chat_id) => MsgsNoticed {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::MsgDelivered { chat_id, msg_id } => MsgDelivered {
|
CoreEventType::MsgDelivered { chat_id, msg_id } => MsgDelivered {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::MsgFailed { chat_id, msg_id } => MsgFailed {
|
CoreEventType::MsgFailed { chat_id, msg_id } => MsgFailed {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::MsgRead { chat_id, msg_id } => MsgRead {
|
CoreEventType::MsgRead { chat_id, msg_id } => MsgRead {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::ChatModified(chat_id) => ChatModified {
|
CoreEventType::ChatModified(chat_id) => ChatModified {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::ChatEphemeralTimerModified { chat_id, timer } => {
|
CoreEventType::ChatEphemeralTimerModified { chat_id, timer } => {
|
||||||
ChatEphemeralTimerModified {
|
ChatEphemeralTimerModified {
|
||||||
chat_id: chat_id.to_u32(),
|
chat_id: chat_id.to_u32(),
|
||||||
timer: timer.to_u32(),
|
timer: timer.to_u32(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EventType::ContactsChanged(contact) => ContactsChanged {
|
CoreEventType::ContactsChanged(contact) => ContactsChanged {
|
||||||
contact_id: contact.map(|c| c.to_u32()),
|
contact_id: contact.map(|c| c.to_u32()),
|
||||||
},
|
},
|
||||||
EventType::LocationChanged(contact) => LocationChanged {
|
CoreEventType::LocationChanged(contact) => LocationChanged {
|
||||||
contact_id: contact.map(|c| c.to_u32()),
|
contact_id: contact.map(|c| c.to_u32()),
|
||||||
},
|
},
|
||||||
EventType::ConfigureProgress { progress, comment } => {
|
CoreEventType::ConfigureProgress { progress, comment } => {
|
||||||
ConfigureProgress { progress, comment }
|
ConfigureProgress { progress, comment }
|
||||||
}
|
}
|
||||||
EventType::ImexProgress(progress) => ImexProgress { progress },
|
CoreEventType::ImexProgress(progress) => ImexProgress { progress },
|
||||||
EventType::ImexFileWritten(path) => ImexFileWritten {
|
CoreEventType::ImexFileWritten(path) => ImexFileWritten {
|
||||||
path: path.to_str().unwrap_or_default().to_owned(),
|
path: path.to_str().unwrap_or_default().to_owned(),
|
||||||
},
|
},
|
||||||
EventType::SecurejoinInviterProgress {
|
CoreEventType::SecurejoinInviterProgress {
|
||||||
contact_id,
|
contact_id,
|
||||||
progress,
|
progress,
|
||||||
} => SecurejoinInviterProgress {
|
} => SecurejoinInviterProgress {
|
||||||
contact_id: contact_id.to_u32(),
|
contact_id: contact_id.to_u32(),
|
||||||
progress,
|
progress,
|
||||||
},
|
},
|
||||||
EventType::SecurejoinJoinerProgress {
|
CoreEventType::SecurejoinJoinerProgress {
|
||||||
contact_id,
|
contact_id,
|
||||||
progress,
|
progress,
|
||||||
} => SecurejoinJoinerProgress {
|
} => SecurejoinJoinerProgress {
|
||||||
contact_id: contact_id.to_u32(),
|
contact_id: contact_id.to_u32(),
|
||||||
progress,
|
progress,
|
||||||
},
|
},
|
||||||
EventType::ConnectivityChanged => ConnectivityChanged,
|
CoreEventType::ConnectivityChanged => ConnectivityChanged,
|
||||||
EventType::SelfavatarChanged => SelfavatarChanged,
|
CoreEventType::SelfavatarChanged => SelfavatarChanged,
|
||||||
EventType::WebxdcStatusUpdate {
|
CoreEventType::WebxdcStatusUpdate {
|
||||||
msg_id,
|
msg_id,
|
||||||
status_update_serial,
|
status_update_serial,
|
||||||
} => WebxdcStatusUpdate {
|
} => WebxdcStatusUpdate {
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
status_update_serial: status_update_serial.to_u32(),
|
status_update_serial: status_update_serial.to_u32(),
|
||||||
},
|
},
|
||||||
EventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
|
CoreEventType::WebxdcInstanceDeleted { msg_id } => WebxdcInstanceDeleted {
|
||||||
msg_id: msg_id.to_u32(),
|
msg_id: msg_id.to_u32(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
#[test]
|
|
||||||
fn generate_events_ts_types_definition() {
|
|
||||||
let events = {
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
let options = typescript_type_def::DefinitionFileOptions {
|
|
||||||
root_namespace: None,
|
|
||||||
..typescript_type_def::DefinitionFileOptions::default()
|
|
||||||
};
|
|
||||||
typescript_type_def::write_definition_file::<_, JSONRPCEventType>(&mut buf, options)
|
|
||||||
.unwrap();
|
|
||||||
String::from_utf8(buf).unwrap()
|
|
||||||
};
|
|
||||||
std::fs::write("typescript/generated/events.ts", events).unwrap();
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -49,6 +49,7 @@ use types::message::MessageObject;
|
|||||||
use types::provider_info::ProviderInfo;
|
use types::provider_info::ProviderInfo;
|
||||||
use types::webxdc::WebxdcMessageInfo;
|
use types::webxdc::WebxdcMessageInfo;
|
||||||
|
|
||||||
|
use self::events::Event;
|
||||||
use self::types::message::MessageLoadResult;
|
use self::types::message::MessageLoadResult;
|
||||||
use self::types::{
|
use self::types::{
|
||||||
chat::{BasicChat, JSONRPCChatVisibility, MuteDuration},
|
chat::{BasicChat, JSONRPCChatVisibility, MuteDuration},
|
||||||
@@ -165,6 +166,16 @@ impl CommandApi {
|
|||||||
get_info()
|
get_info()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the next event.
|
||||||
|
async fn get_next_event(&self) -> Result<Event> {
|
||||||
|
let event_emitter = self.accounts.read().await.get_event_emitter();
|
||||||
|
event_emitter
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
.map(|event| event.into())
|
||||||
|
.context("event channel is closed")
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
// Account Management
|
// Account Management
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ use yerpc::axum::handle_ws_rpc;
|
|||||||
use yerpc::{RpcClient, RpcSession};
|
use yerpc::{RpcClient, RpcSession};
|
||||||
|
|
||||||
mod api;
|
mod api;
|
||||||
use api::events::event_to_json_rpc_notification;
|
|
||||||
use api::{Accounts, CommandApi};
|
use api::{Accounts, CommandApi};
|
||||||
|
|
||||||
const DEFAULT_PORT: u16 = 20808;
|
const DEFAULT_PORT: u16 = 20808;
|
||||||
@@ -44,12 +43,5 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
async fn handler(ws: WebSocketUpgrade, Extension(api): Extension<CommandApi>) -> Response {
|
async fn handler(ws: WebSocketUpgrade, Extension(api): Extension<CommandApi>) -> Response {
|
||||||
let (client, out_receiver) = RpcClient::new();
|
let (client, out_receiver) = RpcClient::new();
|
||||||
let session = RpcSession::new(client.clone(), api.clone());
|
let session = RpcSession::new(client.clone(), api.clone());
|
||||||
tokio::spawn(async move {
|
|
||||||
let events = api.accounts.read().await.get_event_emitter();
|
|
||||||
while let Some(event) = events.recv().await {
|
|
||||||
let event = event_to_json_rpc_notification(event);
|
|
||||||
client.send_notification("event", Some(event)).await.ok();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
handle_ws_rpc(ws, out_receiver, session).await
|
handle_ws_rpc(ws, out_receiver, session).await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
import * as T from "../generated/types.js";
|
import * as T from "../generated/types.js";
|
||||||
import * as RPC from "../generated/jsonrpc.js";
|
import * as RPC from "../generated/jsonrpc.js";
|
||||||
import { RawClient } from "../generated/client.js";
|
import { RawClient } from "../generated/client.js";
|
||||||
import { Event } from "../generated/events.js";
|
|
||||||
import { WebsocketTransport, BaseTransport, Request } from "yerpc";
|
import { WebsocketTransport, BaseTransport, Request } from "yerpc";
|
||||||
import { TinyEmitter } from "@deltachat/tiny-emitter";
|
import { TinyEmitter } from "@deltachat/tiny-emitter";
|
||||||
|
|
||||||
@@ -36,27 +35,30 @@ export class BaseDeltaChat<
|
|||||||
rpc: RawClient;
|
rpc: RawClient;
|
||||||
account?: T.Account;
|
account?: T.Account;
|
||||||
private contextEmitters: { [key: number]: TinyEmitter<ContextEvents> } = {};
|
private contextEmitters: { [key: number]: TinyEmitter<ContextEvents> } = {};
|
||||||
|
|
||||||
|
private eventTask: Promise<void>;
|
||||||
|
|
||||||
constructor(public transport: Transport) {
|
constructor(public transport: Transport) {
|
||||||
super();
|
super();
|
||||||
this.rpc = new RawClient(this.transport);
|
this.rpc = new RawClient(this.transport);
|
||||||
this.transport.on("request", (request: Request) => {
|
this.eventTask = this.eventLoop();
|
||||||
const method = request.method;
|
}
|
||||||
if (method === "event") {
|
|
||||||
const event = request.params! as DCWireEvent<Event>;
|
|
||||||
//@ts-ignore
|
|
||||||
this.emit(event.event.type, event.contextId, event.event as any);
|
|
||||||
this.emit("ALL", event.contextId, event.event as any);
|
|
||||||
|
|
||||||
if (this.contextEmitters[event.contextId]) {
|
async eventLoop(): Promise<void> {
|
||||||
this.contextEmitters[event.contextId].emit(
|
while (true) {
|
||||||
|
const event = await this.rpc.getNextEvent();
|
||||||
|
this.emit(event.event.type, event.context_id, event.event as any);
|
||||||
|
this.emit("ALL", event.context_id, event.event as any);
|
||||||
|
|
||||||
|
if (this.contextEmitters[event.context_id]) {
|
||||||
|
this.contextEmitters[event.context_id].emit(
|
||||||
event.event.type,
|
event.event.type,
|
||||||
//@ts-ignore
|
//@ts-ignore
|
||||||
event.event as any
|
event.event as any
|
||||||
);
|
);
|
||||||
this.contextEmitters[event.contextId].emit("ALL", event.event);
|
this.contextEmitters[event.context_id].emit("ALL", event.event as any);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async listAccounts(): Promise<T.Account[]> {
|
async listAccounts(): Promise<T.Account[]> {
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
export * as RPC from "../generated/jsonrpc.js";
|
export * as RPC from "../generated/jsonrpc.js";
|
||||||
export * as T from "../generated/types.js";
|
export * as T from "../generated/types.js";
|
||||||
export * from "../generated/events.js";
|
|
||||||
export { RawClient } from "../generated/client.js";
|
export { RawClient } from "../generated/client.js";
|
||||||
export * from "./client.js";
|
export * from "./client.js";
|
||||||
export * as yerpc from "yerpc";
|
export * as yerpc from "yerpc";
|
||||||
|
|||||||
@@ -23,7 +23,9 @@ class Rpc:
|
|||||||
self.event_queues: Dict[int, asyncio.Queue]
|
self.event_queues: Dict[int, asyncio.Queue]
|
||||||
# Map from request ID to `asyncio.Future` returning the response.
|
# Map from request ID to `asyncio.Future` returning the response.
|
||||||
self.request_events: Dict[int, asyncio.Future]
|
self.request_events: Dict[int, asyncio.Future]
|
||||||
|
self.closing: bool
|
||||||
self.reader_task: asyncio.Task
|
self.reader_task: asyncio.Task
|
||||||
|
self.events_task: asyncio.Task
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
self.process = await asyncio.create_subprocess_exec(
|
self.process = await asyncio.create_subprocess_exec(
|
||||||
@@ -35,10 +37,15 @@ class Rpc:
|
|||||||
self.id = 0
|
self.id = 0
|
||||||
self.event_queues = {}
|
self.event_queues = {}
|
||||||
self.request_events = {}
|
self.request_events = {}
|
||||||
|
self.closing = False
|
||||||
self.reader_task = asyncio.create_task(self.reader_loop())
|
self.reader_task = asyncio.create_task(self.reader_loop())
|
||||||
|
self.events_task = asyncio.create_task(self.events_loop())
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
"""Terminate RPC server process and wait until the reader loop finishes."""
|
"""Terminate RPC server process and wait until the reader loop finishes."""
|
||||||
|
self.closing = True
|
||||||
|
await self.stop_io_for_all_accounts()
|
||||||
|
await self.events_task
|
||||||
self.process.terminate()
|
self.process.terminate()
|
||||||
await self.reader_task
|
await self.reader_task
|
||||||
|
|
||||||
@@ -58,21 +65,28 @@ class Rpc:
|
|||||||
if "id" in response:
|
if "id" in response:
|
||||||
fut = self.request_events.pop(response["id"])
|
fut = self.request_events.pop(response["id"])
|
||||||
fut.set_result(response)
|
fut.set_result(response)
|
||||||
elif response["method"] == "event":
|
|
||||||
# An event notification.
|
|
||||||
params = response["params"]
|
|
||||||
account_id = params["contextId"]
|
|
||||||
if account_id not in self.event_queues:
|
|
||||||
self.event_queues[account_id] = asyncio.Queue()
|
|
||||||
await self.event_queues[account_id].put(params["event"])
|
|
||||||
else:
|
else:
|
||||||
print(response)
|
print(response)
|
||||||
|
|
||||||
|
async def get_queue(self, account_id: int) -> asyncio.Queue:
|
||||||
|
if account_id not in self.event_queues:
|
||||||
|
self.event_queues[account_id] = asyncio.Queue()
|
||||||
|
return self.event_queues[account_id]
|
||||||
|
|
||||||
|
async def events_loop(self) -> None:
|
||||||
|
"""Requests new events and distributes them between queues."""
|
||||||
|
while True:
|
||||||
|
if self.closing:
|
||||||
|
return
|
||||||
|
event = await self.get_next_event()
|
||||||
|
account_id = event["context_id"]
|
||||||
|
queue = await self.get_queue(account_id)
|
||||||
|
await queue.put(event["event"])
|
||||||
|
|
||||||
async def wait_for_event(self, account_id: int) -> Optional[dict]:
|
async def wait_for_event(self, account_id: int) -> Optional[dict]:
|
||||||
"""Waits for the next event from the given account and returns it."""
|
"""Waits for the next event from the given account and returns it."""
|
||||||
if account_id in self.event_queues:
|
queue = await self.get_queue(account_id)
|
||||||
return await self.event_queues[account_id].get()
|
return await queue.get()
|
||||||
return None
|
|
||||||
|
|
||||||
def __getattr__(self, attr: str):
|
def __getattr__(self, attr: str):
|
||||||
async def method(*args, **kwargs) -> Any:
|
async def method(*args, **kwargs) -> Any:
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use anyhow::{anyhow, Context as _, Result};
|
use anyhow::{anyhow, Context as _, Result};
|
||||||
use deltachat::constants::DC_VERSION_STR;
|
use deltachat::constants::DC_VERSION_STR;
|
||||||
use deltachat_jsonrpc::api::events::event_to_json_rpc_notification;
|
|
||||||
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
use deltachat_jsonrpc::api::{Accounts, CommandApi};
|
||||||
use futures_lite::stream::StreamExt;
|
use futures_lite::stream::StreamExt;
|
||||||
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
use tokio::io::{self, AsyncBufReadExt, BufReader};
|
||||||
@@ -58,7 +57,6 @@ async fn main_impl() -> Result<()> {
|
|||||||
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string());
|
||||||
log::info!("Starting with accounts directory `{}`.", path);
|
log::info!("Starting with accounts directory `{}`.", path);
|
||||||
let accounts = Accounts::new(PathBuf::from(&path)).await?;
|
let accounts = Accounts::new(PathBuf::from(&path)).await?;
|
||||||
let events = accounts.get_event_emitter();
|
|
||||||
|
|
||||||
log::info!("Creating JSON-RPC API.");
|
log::info!("Creating JSON-RPC API.");
|
||||||
let accounts = Arc::new(RwLock::new(accounts));
|
let accounts = Arc::new(RwLock::new(accounts));
|
||||||
@@ -68,21 +66,6 @@ async fn main_impl() -> Result<()> {
|
|||||||
let session = RpcSession::new(client.clone(), state.clone());
|
let session = RpcSession::new(client.clone(), state.clone());
|
||||||
let main_cancel = CancellationToken::new();
|
let main_cancel = CancellationToken::new();
|
||||||
|
|
||||||
// Events task converts core events to JSON-RPC notifications.
|
|
||||||
let cancel = main_cancel.clone();
|
|
||||||
let events_task: JoinHandle<Result<()>> = tokio::spawn(async move {
|
|
||||||
let _cancel_guard = cancel.clone().drop_guard();
|
|
||||||
let mut r = Ok(());
|
|
||||||
while let Some(event) = events.recv().await {
|
|
||||||
if r.is_err() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let event = event_to_json_rpc_notification(event);
|
|
||||||
r = client.send_notification("event", Some(event)).await;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
// Send task prints JSON responses to stdout.
|
// Send task prints JSON responses to stdout.
|
||||||
let cancel = main_cancel.clone();
|
let cancel = main_cancel.clone();
|
||||||
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
let send_task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
|
||||||
@@ -148,16 +131,13 @@ async fn main_impl() -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
// See "Thread safety" section in deltachat-ffi/deltachat.h for explanation.
|
|
||||||
// NB: Events are drained by events_task.
|
|
||||||
main_cancel.cancelled().await;
|
main_cancel.cancelled().await;
|
||||||
accounts.read().await.stop_io().await;
|
accounts.read().await.stop_io().await;
|
||||||
drop(accounts);
|
drop(accounts);
|
||||||
drop(state);
|
drop(state);
|
||||||
let (r0, r1, r2, r3) = tokio::join!(events_task, send_task, sigterm_task, recv_task);
|
send_task.await??;
|
||||||
for r in [r0, r1, r2, r3] {
|
sigterm_task.await??;
|
||||||
r??;
|
recv_task.await??;
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user