diff --git a/CHANGELOG.md b/CHANGELOG.md index b08f505d3..253d1b0e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### API-Changes ### Changes +- add JSON-RPC stdio server `deltachat-rpc-server` and use it for JSON-RPC tests #3695 ### Fixes diff --git a/Cargo.lock b/Cargo.lock index 5490cb6b2..364e794e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -986,6 +986,21 @@ dependencies = [ "yerpc", ] +[[package]] +name = "deltachat-rpc-server" +version = "1.98.0" +dependencies = [ + "anyhow", + "deltachat-jsonrpc", + "env_logger 0.9.1", + "futures-lite", + "log", + "serde", + "serde_json", + "tokio", + "yerpc", +] + [[package]] name = "deltachat_derive" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index d6674490d..ec322c03f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,8 @@ tokio = { version = "1", features = ["parking_lot", "rt-multi-thread", "macros"] members = [ "deltachat-ffi", "deltachat_derive", - "deltachat-jsonrpc" + "deltachat-jsonrpc", + "deltachat-rpc-server" ] [[example]] diff --git a/deltachat-jsonrpc/typescript/package.json b/deltachat-jsonrpc/typescript/package.json index b37b3935a..356527270 100644 --- a/deltachat-jsonrpc/typescript/package.json +++ b/deltachat-jsonrpc/typescript/package.json @@ -41,7 +41,7 @@ "prettier:check": "prettier --check **.ts", "prettier:fix": "prettier --write **.ts", "test": "run-s test:prepare test:run-coverage test:report-coverage", - "test:prepare": "cargo build --features webserver --bin deltachat-jsonrpc-server", + "test:prepare": "cargo build --package deltachat-rpc-server --bin deltachat-rpc-server", "test:report-coverage": "node report_api_coverage.mjs", "test:run": "mocha dist/test", "test:run-coverage": "COVERAGE=1 NODE_OPTIONS=--enable-source-maps c8 --include 'dist/*' -r text -r html -r json mocha dist/test" diff --git a/deltachat-jsonrpc/typescript/src/client.ts b/deltachat-jsonrpc/typescript/src/client.ts index 8da65c055..bc3e280d8 100644 --- a/deltachat-jsonrpc/typescript/src/client.ts +++ b/deltachat-jsonrpc/typescript/src/client.ts @@ -92,3 +92,34 @@ export class DeltaChat extends BaseDeltaChat { this.opts = opts; } } + +export class StdioDeltaChat extends BaseDeltaChat { + close() {} + constructor(input: any, output: any) { + const transport = new StdioTransport(input, output); + super(transport); + } +} + +export class StdioTransport extends BaseTransport { + constructor(public input: any, public output: any) { + super(); + + var buffer = ""; + this.output.on("data", (data: any) => { + buffer += data.toString(); + while (buffer.includes("\n")) { + const n = buffer.indexOf("\n"); + const line = buffer.substring(0, n); + const message = JSON.parse(line); + this._onmessage(message); + buffer = buffer.substring(n + 1); + } + }); + } + + _send(message: RPC.Message): void { + const serialized = JSON.stringify(message); + this.input.write(serialized + "\n"); + } +} diff --git a/deltachat-jsonrpc/typescript/test/basic.ts b/deltachat-jsonrpc/typescript/test/basic.ts index b4593305e..2df048d48 100644 --- a/deltachat-jsonrpc/typescript/test/basic.ts +++ b/deltachat-jsonrpc/typescript/test/basic.ts @@ -2,7 +2,7 @@ import { strictEqual } from "assert"; import chai, { assert, expect } from "chai"; import chaiAsPromised from "chai-as-promised"; chai.use(chaiAsPromised); -import { DeltaChat } from "../deltachat.js"; +import { StdioDeltaChat as DeltaChat } from "../deltachat.js"; import { RpcServerHandle, @@ -15,9 +15,7 @@ describe("basic tests", () => { before(async () => { serverHandle = await startServer(); - // make sure server is up by the time we continue - await new Promise((res) => setTimeout(res, 100)); - dc = new DeltaChat(serverHandle.url) + dc = new DeltaChat(serverHandle.stdin, serverHandle.stdout) // dc.on("ALL", (event) => { //console.log("event", event); // }); diff --git a/deltachat-jsonrpc/typescript/test/online.ts b/deltachat-jsonrpc/typescript/test/online.ts index c7f56b35e..4bf7181c9 100644 --- a/deltachat-jsonrpc/typescript/test/online.ts +++ b/deltachat-jsonrpc/typescript/test/online.ts @@ -1,5 +1,5 @@ import { assert, expect } from "chai"; -import { DeltaChat, DcEvent } from "../deltachat.js"; +import { StdioDeltaChat as DeltaChat, DcEvent } from "../deltachat.js"; import { RpcServerHandle, createTempUser, startServer } from "./test_base.js"; const EVENT_TIMEOUT = 20000; @@ -27,7 +27,7 @@ describe("online tests", function () { this.skip(); } serverHandle = await startServer(); - dc = new DeltaChat(serverHandle.url); + dc = new DeltaChat(serverHandle.stdin, serverHandle.stdout); dc.on("ALL", (contextId, { type }) => { if (type !== "Info") console.log(contextId, type); diff --git a/deltachat-jsonrpc/typescript/test/test_base.ts b/deltachat-jsonrpc/typescript/test/test_base.ts index 3d6da6faa..66bc0d72d 100644 --- a/deltachat-jsonrpc/typescript/test/test_base.ts +++ b/deltachat-jsonrpc/typescript/test/test_base.ts @@ -1,39 +1,38 @@ import { tmpdir } from "os"; import { join, resolve } from "path"; import { mkdtemp, rm } from "fs/promises"; -import { existsSync } from "fs"; import { spawn, exec } from "child_process"; import fetch from "node-fetch"; - -export const RPC_SERVER_PORT = 20808; +import { Readable, Writable } from "node:stream"; export type RpcServerHandle = { - url: string, - close: () => Promise -} + stdin: Writable; + stdout: Readable; + close: () => Promise; +}; -export async function startServer(port: number = RPC_SERVER_PORT): Promise { +export async function startServer(): Promise { const tmpDir = await mkdtemp(join(tmpdir(), "deltachat-jsonrpc-test")); - const pathToServerBinary = resolve(join(await getTargetDir(), "debug/deltachat-jsonrpc-server")); - console.log('using server binary: ' + pathToServerBinary); - - if (!existsSync(pathToServerBinary)) { - throw new Error( - "server executable does not exist, you need to build it first" + - "\nserver executable not found at " + - pathToServerBinary - ); - } + const pathToServerBinary = resolve( + join(await getTargetDir(), "debug/deltachat-rpc-server") + ); const server = spawn(pathToServerBinary, { cwd: tmpDir, env: { RUST_LOG: process.env.RUST_LOG || "info", - DC_PORT: '' + port, - RUST_MIN_STACK: "8388608" + RUST_MIN_STACK: "8388608", }, }); + + server.on("error", (err) => { + throw new Error( + "Failed to start server executable " + + pathToServerBinary + + ", make sure you built it first." + ); + }); let shouldClose = false; server.on("exit", () => { @@ -44,12 +43,10 @@ export async function startServer(port: number = RPC_SERVER_PORT): Promise { shouldClose = true; if (!server.kill()) { diff --git a/deltachat-rpc-server/Cargo.toml b/deltachat-rpc-server/Cargo.toml new file mode 100644 index 000000000..a252df2b6 --- /dev/null +++ b/deltachat-rpc-server/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "deltachat-rpc-server" +version = "1.98.0" +description = "DeltaChat JSON-RPC server" +authors = ["Delta Chat Developers (ML) "] +edition = "2021" +readme = "README.md" +license = "MPL-2.0" + +keywords = ["deltachat", "chat", "openpgp", "email", "encryption"] +categories = ["cryptography", "std", "email"] + +[[bin]] +name = "deltachat-rpc-server" + +[dependencies] +deltachat-jsonrpc = { path = "../deltachat-jsonrpc" } + +anyhow = "1" +env_logger = { version = "0.9.1" } +futures-lite = "1.12.0" +log = "0.4" +serde_json = "1.0.85" +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1.21.2", features = ["io-std"] } +yerpc = { version = "0.3.1", features = ["anyhow_expose"] } diff --git a/deltachat-rpc-server/src/bin/deltachat-rpc-server/main.rs b/deltachat-rpc-server/src/bin/deltachat-rpc-server/main.rs new file mode 100644 index 000000000..82430901d --- /dev/null +++ b/deltachat-rpc-server/src/bin/deltachat-rpc-server/main.rs @@ -0,0 +1,68 @@ +///! Delta Chat core RPC server. +///! +///! It speaks JSON Lines over stdio. +use std::path::PathBuf; + +use anyhow::Result; +use deltachat_jsonrpc::api::events::event_to_json_rpc_notification; +use deltachat_jsonrpc::api::{Accounts, CommandApi}; +use futures_lite::stream::StreamExt; +use tokio::io::{self, AsyncBufReadExt, BufReader}; +use tokio::task::JoinHandle; +use yerpc::{RpcClient, RpcSession}; + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "accounts".to_string()); + log::info!("Starting with accounts directory `{}`.", path); + let accounts = Accounts::new(PathBuf::from(&path)).await?; + let events = accounts.get_event_emitter(); + + log::info!("Creating JSON-RPC API."); + let state = CommandApi::new(accounts); + + let (client, mut out_receiver) = RpcClient::new(); + let session = RpcSession::new(client.clone(), state); + + // Events task converts core events to JSON-RPC notifications. + let events_task: JoinHandle> = tokio::spawn(async move { + while let Some(event) = events.recv().await { + let event = event_to_json_rpc_notification(event); + client.send_notification("event", Some(event)).await?; + } + Ok(()) + }); + + // Send task prints JSON responses to stdout. + let send_task: JoinHandle> = tokio::spawn(async move { + while let Some(message) = out_receiver.next().await { + let message = serde_json::to_string(&message)?; + log::trace!("RPC send {}", message); + println!("{}", message); + } + Ok(()) + }); + + // Receiver task reads JSON requests from stdin. + let recv_task: JoinHandle> = tokio::spawn(async move { + let stdin = io::stdin(); + let mut lines = BufReader::new(stdin).lines(); + while let Some(message) = lines.next_line().await? { + log::trace!("RPC recv {}", message); + session.handle_incoming(&message).await; + } + log::info!("EOF reached on stdin"); + Ok(()) + }); + + // Wait for the end of stdin. + recv_task.await??; + + // Shutdown the server. + send_task.abort(); + events_task.abort(); + + Ok(()) +} diff --git a/scripts/set_core_version.py b/scripts/set_core_version.py index e998fa283..9a46a8278 100755 --- a/scripts/set_core_version.py +++ b/scripts/set_core_version.py @@ -63,8 +63,13 @@ def main(): parser = ArgumentParser(prog="set_core_version") parser.add_argument("newversion") - toml_list = ["Cargo.toml", "deltachat-ffi/Cargo.toml", "deltachat-jsonrpc/Cargo.toml"] json_list = ["package.json", "deltachat-jsonrpc/typescript/package.json"] + toml_list = [ + "Cargo.toml", + "deltachat-ffi/Cargo.toml", + "deltachat-jsonrpc/Cargo.toml", + "deltachat-rpc-server/Cargo.toml", + ] try: opts = parser.parse_args() except SystemExit: