Changes for tokio compat, upgrade to yerpc 0.3

This also changes the webserver binary to use axum in place of tide.
This commit is contained in:
Franz Heinzmann (Frando)
2022-06-29 17:18:51 +02:00
committed by Simon Laux
parent 659e48bd3f
commit 97e0e0137a
6 changed files with 415 additions and 58 deletions

View File

@@ -1,5 +1,4 @@
use anyhow::{anyhow, bail, Context, Result};
use async_std::sync::{Arc, RwLock};
use deltachat::{
chat::{get_chat_msgs, ChatId},
chatlist::Chatlist,
@@ -10,7 +9,9 @@ use deltachat::{
provider::get_provider_info,
};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::RwLock;
use yerpc::rpc;
pub use deltachat::accounts::Accounts;

View File

@@ -1,18 +1,16 @@
pub mod api;
pub use api::events;
pub use yerpc;
#[cfg(test)]
mod tests {
use super::api::{Accounts, CommandApi};
use async_channel::unbounded;
use async_std::task;
use futures::StreamExt;
use tempfile::TempDir;
use yerpc::{MessageHandle, RpcHandle};
use yerpc::{RpcClient, RpcSession};
#[async_std::test]
#[tokio::test(flavor = "multi_thread")]
async fn basic_json_rpc_functionality() -> anyhow::Result<()> {
// println!("{}", "");
let tmp_dir = TempDir::new().unwrap().path().into();
@@ -23,10 +21,10 @@ mod tests {
let (sender, mut receiver) = unbounded::<String>();
let (request_handle, mut rx) = RpcHandle::new();
let (client, mut rx) = RpcClient::new();
let session = cmd_api;
let handle = MessageHandle::new(request_handle, session);
task::spawn({
let handle = RpcSession::new(client, session);
tokio::spawn({
async move {
while let Some(message) = rx.next().await {
let message = serde_json::to_string(&message)?;
@@ -41,7 +39,7 @@ mod tests {
{
let request = r#"{"jsonrpc":"2.0","method":"add_account","params":[],"id":1}"#;
let response = r#"{"jsonrpc":"2.0","id":1,"result":1}"#;
handle.handle_message(request).await;
handle.handle_incoming(request).await;
let result = receiver.next().await;
println!("{:?}", result);
assert_eq!(result, Some(response.to_owned()));
@@ -49,7 +47,7 @@ mod tests {
{
let request = r#"{"jsonrpc":"2.0","method":"get_all_account_ids","params":[],"id":2}"#;
let response = r#"{"jsonrpc":"2.0","id":2,"result":[1]}"#;
handle.handle_message(request).await;
handle.handle_incoming(request).await;
let result = receiver.next().await;
println!("{:?}", result);
assert_eq!(result, Some(response.to_owned()));

View File

@@ -1,44 +1,43 @@
use async_std::path::PathBuf;
use async_std::task;
use tide::Request;
use yerpc::RpcHandle;
use yerpc_tide::yerpc_handler;
use axum::{extract::ws::WebSocketUpgrade, response::Response, routing::get, Extension, Router};
use std::net::SocketAddr;
use std::path::PathBuf;
use yerpc::axum::handle_ws_rpc;
use yerpc::{RpcClient, RpcSession};
mod api;
use api::events::event_to_json_rpc_notification;
use api::{Accounts, CommandApi};
#[async_std::main]
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
env_logger::init();
log::info!("Starting");
let accounts = Accounts::new(PathBuf::from("./accounts")).await.unwrap();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let path = std::env::var("DC_ACCOUNTS_PATH").unwrap_or_else(|_| "./accounts".to_string());
log::info!("Starting with accounts directory `{path}`.");
let accounts = Accounts::new(PathBuf::from(&path)).await.unwrap();
let state = CommandApi::new(accounts);
let mut app = tide::with_state(state.clone());
app.at("/ws").get(yerpc_handler(request_handler));
let app = Router::new()
.route("/ws", get(handler))
.layer(Extension(state.clone()));
let addr = SocketAddr::from(([127, 0, 0, 1], 20808));
state.accounts.read().await.start_io().await;
app.listen("127.0.0.1:20808").await?;
log::info!("JSON-RPC WebSocket server listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
Ok(())
}
async fn request_handler(
request: Request<CommandApi>,
rpc: RpcHandle,
) -> anyhow::Result<CommandApi> {
let state = request.state().clone();
task::spawn(event_loop(state.clone(), rpc));
Ok(state)
}
async fn event_loop(state: CommandApi, rpc: RpcHandle) -> anyhow::Result<()> {
let events = state.accounts.read().await.get_event_emitter().await;
while let Some(event) = events.recv().await {
// log::debug!("event {:?}", event);
let event = event_to_json_rpc_notification(event);
rpc.notify("event", Some(event)).await?;
}
Ok(())
async fn handler(ws: WebSocketUpgrade, Extension(api): Extension<CommandApi>) -> Response {
let (client, out_receiver) = RpcClient::new();
let session = RpcSession::new(client.clone(), api.clone());
tokio::spawn(async move {
let events = api.accounts.read().await.get_event_emitter().await;
while let Some(event) = events.recv().await {
let event = event_to_json_rpc_notification(event);
client.send_notification("event", Some(event)).await.ok();
}
});
handle_ws_rpc(ws, out_receiver, session).await
}