From b3239823d04f33e49efcae61a28daab5863e89a0 Mon Sep 17 00:00:00 2001 From: Slavasil Date: Sun, 10 May 2026 23:00:33 +0300 Subject: [PATCH] implement plugin command execution - change .proto file for plugins - add global usage guide for all commands - handle commands not included in the built-in set --- protobuf/plugin.proto | 11 +- src/commands/args.rs | 4 +- src/commands/mod.rs | 33 +++++- src/{config.rs => config/mod.rs} | 11 +- src/config/util.rs | 84 ++++++++++++++ src/main.rs | 181 +++++++++++++++++++++++++++++-- src/plugin/mod.rs | 30 +++-- src/plugin/stdio.rs | 115 ++++++++++++++++++-- src/ssh.rs | 2 +- 9 files changed, 431 insertions(+), 40 deletions(-) rename src/{config.rs => config/mod.rs} (91%) create mode 100644 src/config/util.rs diff --git a/protobuf/plugin.proto b/protobuf/plugin.proto index 34033c8..b891ae7 100644 --- a/protobuf/plugin.proto +++ b/protobuf/plugin.proto @@ -18,6 +18,7 @@ message PluginCommandListRequest {} message PluginExecuteRequest { string command_id = 1; + string issuer_id = 2; repeated string arg_vector = 5; } @@ -26,7 +27,7 @@ message Response { oneof res { PluginInitializeResponse initialize_res = 10; PluginCommandListResponse command_list_res = 11; - PluginExecuteResponse execute_res = 12; + CommandReply cmd_reply = 12; } } @@ -48,6 +49,10 @@ message CommandSpec { string description = 4; } -message PluginExecuteResponse { - +message CommandReply { + bool edit = 1; + oneof reply { + string text = 2; + bool end = 8; + } } diff --git a/src/commands/args.rs b/src/commands/args.rs index 0fa7f94..9a04a15 100644 --- a/src/commands/args.rs +++ b/src/commands/args.rs @@ -2,9 +2,9 @@ pub(super) fn append_port_if_needed(input: &str, default_port: u16) -> String { match input.rsplit_once(':') { Some((_, port)) => match port.parse::() { Ok(_) => input.to_owned(), - Err(_) => format!("{}:{}", input.to_owned(), default_port.to_string()), + Err(_) => format!("{}:{}", input.to_owned(), default_port), }, - None => format!("{}:{}", input.to_owned(), default_port.to_string()), + None => format!("{}:{}", input.to_owned(), default_port), } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 0c816bc..9d1b2f2 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,6 +1,6 @@ pub(crate) mod args; -use std::{fmt::Debug, fs::File, io::Read, ops::Add, path::PathBuf, sync::Arc, time::Duration}; +use std::{fs::File, io::Read, ops::Add, path::PathBuf, sync::Arc, time::Duration}; use crate::{commands::args::append_port_if_needed, ssh}; use anyhow::{Context as _, Result as AnyhowResult}; @@ -13,15 +13,15 @@ use deltachat::{ }; use eui48::MacAddress; use russh::{ - client::{AuthResult, Handle}, + client::AuthResult, keys::PrivateKeyWithHashAlg, }; use tokio::{ sync::Mutex, - time::{Instant, error::Elapsed, timeout, timeout_at}, + time::{Instant, timeout, timeout_at}, }; -use crate::{AUTH_REQUIRED, BotContext, config::BotConfig, data_path, ssh::ClientHandler}; +use crate::{AUTH_REQUIRED, BotContext, config::BotConfig, data_path}; pub async fn echo(dchat_ctx: Arc>, msg: Message) -> AnyhowResult<()> { let chat_id = msg.get_chat_id(); @@ -30,6 +30,31 @@ pub async fn echo(dchat_ctx: Arc>, msg: Message) -> AnyhowResult< Ok(()) } +pub async fn global_usage(ctx: Arc>) -> String { + let ctx_lock = ctx.lock().await; + let config = &ctx_lock.config; + let mut usage = String::from("DeltaChat Remote Control Bot\n\nCommands:\n"); + usage += "/auth - Authenticate (prove you are the bot owner or a trusted person)\n"; + usage += &wol_usage(config); + usage += " - send Wake-on-LAN magic packet to a pre-configured (or an arbitrary) machine in the local network\n"; + usage += &ssh_unlock_disk_usage(config); + usage += " - log into a pre-configured machine via SSH (as root) and send the password to unlock a LUKS-encrypted root partition\n"; + usage += &ssh_exec_usage(config); + usage += " - log into a machine via SSH and execute one command. Waits until the process finishes or forcefully exits after 20 seconds\n"; + + usage += "\nCommands from plugins:\n"; + for cmd in ctx_lock.plugin_commands.values() { + usage += &format!( + "{} (from {}) - {}\n", + &cmd.usage, &cmd.plugin_id, &cmd.description + ); + } + + usage += "\nDeltaChat Remote Control Bot by slavasil, 2026\n"; + + usage +} + pub async fn auth_command( dchat_ctx: Arc>, ctx: Arc>, diff --git a/src/config.rs b/src/config/mod.rs similarity index 91% rename from src/config.rs rename to src/config/mod.rs index ec5427b..5e63962 100644 --- a/src/config.rs +++ b/src/config/mod.rs @@ -1,5 +1,8 @@ +pub mod util; + use eui48::MacAddress; use serde::Deserialize; +use serde_yaml::{Mapping, Value}; use std::{ collections::HashMap, fs, io, @@ -49,17 +52,23 @@ pub struct BotDeltaChatConfig { pub avatar: Option, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct PluginConfig { pub name: String, #[serde(default = "default_plugin_enabled")] pub enabled: bool, + #[serde(default = "default_plugin_config")] + pub config: Value, } fn default_plugin_enabled() -> bool { true } +fn default_plugin_config() -> Value { + Value::Mapping(Mapping::new()) +} + #[derive(Debug)] pub enum ConfigError { Io(io::Error), diff --git a/src/config/util.rs b/src/config/util.rs new file mode 100644 index 0000000..0dc60d3 --- /dev/null +++ b/src/config/util.rs @@ -0,0 +1,84 @@ +use serde_yaml::Value; + +pub(crate) fn yaml_to_json(yaml: &Value) -> String { + let mut json = String::new(); + write_yaml_to_json(yaml, &mut json); + json +} + +fn write_yaml_to_json(yaml: &Value, out: &mut String) { + match yaml { + Value::Null => { + out.push_str("null"); + } + Value::Bool(b) => { + out.push_str(if *b { "true" } else { "false" }); + } + Value::Number(n) => { + out.push_str(&n.to_string()); + } + Value::String(s) => { + out.push('"'); + for c in s.encode_utf16() { + match c { + 0x005C | 0x0022 => { + out.push('\\'); + out.push(char::from_u32(c as u32).unwrap()); + } + 0x0008 => { + out.push('\\'); + out.push('b'); + } + 0x0009 => { + out.push('\\'); + out.push('t'); + } + 0x000a => { + out.push('\\'); + out.push('n'); + } + 0x000C => { + out.push('\\'); + out.push('f'); + } + 0x0000..=0x001F | 0x0080..=0xFFFF => { + out.push_str(&format!("\\u{c:04x}")); + } + _ => { + out.push(char::from_u32(c as u32).unwrap()); + } + } + } + out.push_str(s); // TODO escape + out.push('"'); + } + Value::Tagged(_) => {} + Value::Sequence(list) => { + out.push('['); + if !list.is_empty() { + write_yaml_to_json(&list[0], out); + for i in 1..list.len() { + out.push(','); + write_yaml_to_json(&list[i], out); + } + } + out.push(']'); + } + Value::Mapping(map) => { + out.push('{'); + let mut iter = map.iter(); + if let Some(kv) = iter.next() { + write_yaml_to_json(&Value::String(yaml_to_json(kv.0)), out); + out.push(':'); + write_yaml_to_json(kv.1, out); + } + for kv in iter { + out.push(','); + write_yaml_to_json(&Value::String(yaml_to_json(kv.0)), out); + out.push(':'); + write_yaml_to_json(kv.1, out); + } + out.push('}'); + } + } +} diff --git a/src/main.rs b/src/main.rs index 9b94b8c..25e3848 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,11 +16,12 @@ mod proto { pub(crate) use deltachat_remotecontrol_bot::plugin::*; } -use anyhow::{Context as _, Result as AnyhowResult}; +use anyhow::{Context as _, Result as AnyhowResult, bail}; use clap::Parser; use config::BotConfig; use deltachat::{ - EventType, Events, chat, + EventType, Events, + chat::{self, ChatId}, config::Config, contact::ContactId, context::Context, @@ -33,6 +34,7 @@ use std::{path::PathBuf, sync::Arc}; use tokio::sync::Mutex; use crate::{ + config::PluginConfig, paths::{data_path, default_config_paths}, plugin::{LoadedPlugin, PluginCommand, try_load_plugin}, }; @@ -148,6 +150,21 @@ async fn run_bot(bot_context: Arc>) -> AnyhowResult<()> { Ok(()) } +async fn send_command_not_found( + dchat_ctx: Arc>, + bot_ctx: Arc>, + chat_id: ChatId, +) -> AnyhowResult<()> { + let dchat_ctx_lock = dchat_ctx.lock().await; + chat::send_text_msg( + &dchat_ctx_lock, + chat_id, + commands::global_usage(bot_ctx).await, + ) + .await?; + Ok(()) +} + async fn handle_message( dchat_ctx: Arc>, bot_ctx: Arc>, @@ -182,7 +199,136 @@ async fn handle_message( "/ssh-exec" => { commands::ssh_exec_command(dchat_ctx, bot_ctx, msg, &args).await?; } - _ => {} + _ => { + let ctx_lock = bot_ctx.lock().await; + let Some(cmd) = ctx_lock.plugin_cmd_aliases.get(&cmd[1..]) else { + drop(ctx_lock); + send_command_not_found(dchat_ctx, bot_ctx, msg.get_chat_id()).await?; + return Ok(()); + }; + + let Some(plugin) = ctx_lock.plugins.get(&cmd.plugin_id) else { + bail!( + "Inconsistency in plugin_cmd_aliases hashmap: command references a plugin that is not loaded" + ); + }; + let (plugin_conn, plugin_name) = { + let lock = plugin.lock().await; + let Some(conn) = lock.connection.clone() else { + bail!("Plugin disconnected"); + }; + let plugin_name = lock.plugin_id.clone(); + log::info!( + "Delegating command /{} to plugin {}", + &cmd.name, + &plugin_name + ); + (conn, plugin_name) + }; + let issuer_id = msg.get_from_id().to_string(); + let cmd_name = cmd.name.clone(); + let chat_id = msg.get_chat_id(); + let dchat_ctx = Arc::clone(&dchat_ctx); + + tokio::spawn(async move { + log::debug!("Started plugin command task"); + match plugin_conn + .execute_command( + cmd_name.clone(), + issuer_id, + commands::args::split_command_line(&text), + ) + .await + { + Ok(mut replies) => { + let mut last_message_id = None; + while let Some(reply) = replies.recv().await { + match reply { + Ok(reply) => { + log::debug!( + "Reply to /{} command from plugin {}: {:?}", + &cmd_name, + &plugin_name, + &reply + ); + if reply.edit { + if let Some(msg_id) = last_message_id { + if let Some(proto::command_reply::Reply::Text(text)) = + reply.reply + { + let dchat_ctx_lock = dchat_ctx.lock().await; + log::debug!( + "Command {} :: /{}: editing last message", + &plugin_name, + &cmd_name + ); + if let Err(e) = chat::send_edit_request( + &dchat_ctx_lock, + msg_id, + text, + ) + .await + { + log::error!("Cannot edit message: {e}"); + continue; + } + } + } else { + log::warn!( + "Plugin {} requested to edit a message but no messages sent yet", + &plugin_name + ); + } + } else { + if let Some(proto::command_reply::Reply::Text(text)) = + reply.reply + { + let dchat_ctx_lock = dchat_ctx.lock().await; + log::debug!( + "Command {} :: /{}: sending message", + &plugin_name, + &cmd_name + ); + let msg_id = match chat::send_text_msg( + &dchat_ctx_lock, + chat_id, + text, + ) + .await + { + Ok(msg_id) => msg_id, + Err(e) => { + log::error!("Cannot send new message: {e}"); + continue; + } + }; + last_message_id = Some(msg_id); + } + } + } + Err(e) => { + log::error!( + "Error while executing {} :: /{}: {}", + &plugin_name, + &cmd_name, + e + ); + break; + } + } + } + } + Err(e) => { + log::error!( + "Plugin command ({} :: /{}) execution failed: {}", + &plugin_name, + &cmd_name, + e + ); + } + } + }); + } } Ok(()) @@ -222,20 +368,33 @@ async fn main() { } }; - let requested_plugins: Vec = config + let requested_plugins: Vec = config .plugins - .iter() + .clone() + .into_iter() .filter(|p| p.enabled) - .map(|p| p.name.clone()) .collect(); let bot_context = Arc::new(Mutex::new(BotContext::new(config))); - if requested_plugins.len() > 0 { - log::info!("Loading plugins ({})", requested_plugins.join(", ")); - for plugin in &requested_plugins { - if let Err(e) = try_load_plugin(Arc::clone(&bot_context), plugin.clone()).await { - log::error!("Failed to load plugin \"{plugin}\": {e}"); + if !requested_plugins.is_empty() { + log::info!( + "Loading plugins ({})", + requested_plugins + .iter() + .map(|p| p.name.as_str()) + .collect::>() + .join(", ") + ); + for plugin in requested_plugins { + if let Err(e) = try_load_plugin( + Arc::clone(&bot_context), + plugin.name.clone(), + config::util::yaml_to_json(&plugin.config), + ) + .await + { + log::error!("Failed to load plugin \"{}\": {e}", &plugin.name); } } } diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs index 317bbde..594fea3 100644 --- a/src/plugin/mod.rs +++ b/src/plugin/mod.rs @@ -2,21 +2,15 @@ mod stdio; use anyhow::{Context as _, Result as AnyhowResult, bail}; use async_trait::async_trait; -use prost::{DecodeError, Message}; +use prost::DecodeError; use std::{ - collections::HashMap, error::Error, fmt::{Debug, Display}, - ops::DerefMut, process::Stdio, sync::Arc, }; -use tokio::{ - io::{self, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}, - process::{Child, ChildStdout}, - sync::{Mutex, oneshot}, -}; +use tokio::sync::{Mutex, mpsc}; use crate::{BotContext, paths::plugins_path, proto}; @@ -42,6 +36,7 @@ pub(crate) struct LoadedPlugin { pub(crate) enum PluginRequestType { Initialize, CommandList, + Execute, } impl From for i32 { @@ -49,6 +44,7 @@ impl From for i32 { match value { PluginRequestType::Initialize => 1, PluginRequestType::CommandList => 2, + PluginRequestType::Execute => 3, } } } @@ -69,6 +65,16 @@ pub(crate) trait PluginConnection: Send + Sync { async fn request_plugin_command_list( &self, ) -> Result; + + async fn execute_command( + self: Arc, + command_id: String, + issuer_id: String, + argv: Vec, + ) -> Result< + mpsc::Receiver>, + PluginConnectionError, + >; } #[derive(Debug)] @@ -111,6 +117,7 @@ impl Error for PluginConnectionError { pub(crate) async fn try_load_plugin( ctx: Arc>, unique_name: String, + config_json: String, ) -> AnyhowResult<()> { let plugin_dir = plugins_path().join(&unique_name); if ctx.lock().await.plugins.contains_key(&unique_name) { @@ -119,6 +126,7 @@ pub(crate) async fn try_load_plugin( if !std::fs::metadata(&plugin_dir)?.is_dir() { bail!("Plugin directory doesn't exist"); } + log::debug!("Loading plugin with JSON config: {}", &config_json); let plugin_executable_path = plugin_dir.join("plugin_run"); log::debug!("Starting plugin executable {:?}", &plugin_executable_path); let mut cmd = tokio::process::Command::new(plugin_executable_path); @@ -126,11 +134,11 @@ pub(crate) async fn try_load_plugin( .stdout(Stdio::piped()) .stderr(Stdio::inherit()); cmd.current_dir(&plugin_dir); - - // TODO добавить какие-нибудь перемнные среды + cmd.env("DCRCBOT_PLUGIN_TRANSPORT", "stdio"); let plugin_process = cmd.spawn().context("Failed to start the plugin")?; - let plugin = stdio::initialize_stdio_plugin(plugin_process, unique_name.clone()).await?; + let plugin = + stdio::initialize_stdio_plugin(plugin_process, unique_name.clone(), config_json).await?; let mut ctx_lock = ctx.lock().await; for cmd in plugin.lock().await.commands.iter().cloned() { diff --git a/src/plugin/stdio.rs b/src/plugin/stdio.rs index f7d94bb..7746fa4 100644 --- a/src/plugin/stdio.rs +++ b/src/plugin/stdio.rs @@ -1,13 +1,12 @@ use std::{collections::HashMap, error::Error, ops::DerefMut, sync::Arc, time::Duration}; -use anyhow::{Context as _, Result as AnyhowResult, bail}; +use anyhow::Result as AnyhowResult; use async_trait::async_trait; use prost::Message; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}, process::{Child, ChildStdout}, - sync::{Mutex, oneshot}, - time::error::Elapsed, + sync::{Mutex, mpsc, oneshot}, }; use crate::{ @@ -20,6 +19,7 @@ use crate::{ pub(super) async fn initialize_stdio_plugin( process: Child, unique_name: String, + config_json: String, ) -> AnyhowResult>> { let plugin = Arc::new(Mutex::new(LoadedPlugin::default())); log::info!("Connecting to plugin {} using standard I/O", &unique_name); @@ -27,7 +27,7 @@ pub(super) async fn initialize_stdio_plugin( let connection = Arc::new(StdioPluginConnection::new(Arc::clone(&plugin), process)); Arc::clone(&connection).run_stdio_loops(); - let plugin_info = connection.initialize_plugin(String::new()).await?; + let plugin_info = connection.initialize_plugin(config_json).await?; log::debug!("received plugin identification: {:?}", plugin_info); let mut plugin_lock = plugin.lock().await; plugin_lock.name = plugin_info.name; @@ -69,14 +69,14 @@ struct StdioPluginConnection { impl StdioPluginConnection { pub fn new(plugin: Arc>, mut process: Child) -> StdioPluginConnection { let stdout = process.stdout.take().unwrap(); - let conn = StdioPluginConnection { + + StdioPluginConnection { plugin, process: Mutex::new(process), buffered_stdout: Mutex::new(BufReader::new(stdout)), next_request_id: Mutex::new(0), pending_requests: Mutex::new(HashMap::new()), - }; - conn + } } fn run_stdio_loops(self: Arc) { @@ -297,4 +297,105 @@ impl PluginConnection for StdioPluginConnection { )), } } + + async fn execute_command( + self: Arc, + command_id: String, + issuer_id: String, + argv: Vec, + ) -> Result< + mpsc::Receiver>, + PluginConnectionError, + > { + let request_id = { + let mut r = self.next_request_id.lock().await; + let id = *r; + *r += 1; + id + }; + let request = proto::Request { + request_id, + req: Some(proto::request::Req::ExecuteReq( + proto::PluginExecuteRequest { + command_id, + issuer_id, + arg_vector: argv, + }, + )), + } + .encode_length_delimited_to_vec(); + + if let Err(e) = self + .process + .lock() + .await + .stdin + .as_mut() + .unwrap() + .write_all(&request) + .await + { + return Err(PluginConnectionError::SendRequest( + self.plugin.lock().await.plugin_id.clone(), + PluginRequestType::Execute, + Box::new(e), + )); + } + + let (tx, rx) = mpsc::channel(4); + + tokio::spawn(async move { + loop { + let response = match self + .await_response_to(request_id, Duration::from_secs(600)) + .await + { + Ok(response) => response, + Err(e) => { + if let Err(e) = tx + .send(Err(PluginConnectionError::ReadResponse( + self.plugin.lock().await.plugin_id.clone(), + PluginRequestType::Execute, + e, + ))) + .await + { + log::error!("Cannot send error notification to another task: {e}"); + } + break; + } + }; + + match response.res { + Some(proto::response::Res::CmdReply(reply)) => { + let end = match reply.reply { + Some(proto::command_reply::Reply::End(_)) => true, + _ => false, + }; + if let Err(e) = tx.send(Ok(reply)).await { + log::error!("Cannot send command reply to another task: {e}"); + } + if end { + break; + } + } + _ => { + if let Err(e) = tx + .send(Err(PluginConnectionError::DecodeResponse( + self.plugin.lock().await.plugin_id.clone(), + PluginRequestType::Execute, + None, + ))) + .await + { + log::error!("Cannot send error notification to another task: {e}"); + } + break; + } + } + } + }); + + Ok(rx) + } } diff --git a/src/ssh.rs b/src/ssh.rs index fc94fd5..459007e 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -14,7 +14,7 @@ impl Handler for ClientHandler { async fn check_server_key( &mut self, - server_public_key: &russh::keys::ssh_key::PublicKey, + _server_public_key: &russh::keys::ssh_key::PublicKey, ) -> Result { Ok(true) }