implement plugin command execution
- change .proto file for plugins - add global usage guide for all commands - handle commands not included in the built-in set
This commit is contained in:
@@ -2,21 +2,15 @@ mod stdio;
|
||||
|
||||
use anyhow::{Context as _, Result as AnyhowResult, bail};
|
||||
use async_trait::async_trait;
|
||||
use prost::{DecodeError, Message};
|
||||
use prost::DecodeError;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
error::Error,
|
||||
fmt::{Debug, Display},
|
||||
ops::DerefMut,
|
||||
process::Stdio,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use tokio::{
|
||||
io::{self, AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader},
|
||||
process::{Child, ChildStdout},
|
||||
sync::{Mutex, oneshot},
|
||||
};
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
|
||||
use crate::{BotContext, paths::plugins_path, proto};
|
||||
|
||||
@@ -42,6 +36,7 @@ pub(crate) struct LoadedPlugin {
|
||||
pub(crate) enum PluginRequestType {
|
||||
Initialize,
|
||||
CommandList,
|
||||
Execute,
|
||||
}
|
||||
|
||||
impl From<PluginRequestType> for i32 {
|
||||
@@ -49,6 +44,7 @@ impl From<PluginRequestType> for i32 {
|
||||
match value {
|
||||
PluginRequestType::Initialize => 1,
|
||||
PluginRequestType::CommandList => 2,
|
||||
PluginRequestType::Execute => 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -69,6 +65,16 @@ pub(crate) trait PluginConnection: Send + Sync {
|
||||
async fn request_plugin_command_list(
|
||||
&self,
|
||||
) -> Result<proto::PluginCommandListResponse, PluginConnectionError>;
|
||||
|
||||
async fn execute_command(
|
||||
self: Arc<Self>,
|
||||
command_id: String,
|
||||
issuer_id: String,
|
||||
argv: Vec<String>,
|
||||
) -> Result<
|
||||
mpsc::Receiver<Result<proto::CommandReply, PluginConnectionError>>,
|
||||
PluginConnectionError,
|
||||
>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -111,6 +117,7 @@ impl Error for PluginConnectionError {
|
||||
pub(crate) async fn try_load_plugin(
|
||||
ctx: Arc<Mutex<BotContext>>,
|
||||
unique_name: String,
|
||||
config_json: String,
|
||||
) -> AnyhowResult<()> {
|
||||
let plugin_dir = plugins_path().join(&unique_name);
|
||||
if ctx.lock().await.plugins.contains_key(&unique_name) {
|
||||
@@ -119,6 +126,7 @@ pub(crate) async fn try_load_plugin(
|
||||
if !std::fs::metadata(&plugin_dir)?.is_dir() {
|
||||
bail!("Plugin directory doesn't exist");
|
||||
}
|
||||
log::debug!("Loading plugin with JSON config: {}", &config_json);
|
||||
let plugin_executable_path = plugin_dir.join("plugin_run");
|
||||
log::debug!("Starting plugin executable {:?}", &plugin_executable_path);
|
||||
let mut cmd = tokio::process::Command::new(plugin_executable_path);
|
||||
@@ -126,11 +134,11 @@ pub(crate) async fn try_load_plugin(
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::inherit());
|
||||
cmd.current_dir(&plugin_dir);
|
||||
|
||||
// TODO добавить какие-нибудь перемнные среды
|
||||
cmd.env("DCRCBOT_PLUGIN_TRANSPORT", "stdio");
|
||||
|
||||
let plugin_process = cmd.spawn().context("Failed to start the plugin")?;
|
||||
let plugin = stdio::initialize_stdio_plugin(plugin_process, unique_name.clone()).await?;
|
||||
let plugin =
|
||||
stdio::initialize_stdio_plugin(plugin_process, unique_name.clone(), config_json).await?;
|
||||
|
||||
let mut ctx_lock = ctx.lock().await;
|
||||
for cmd in plugin.lock().await.commands.iter().cloned() {
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
use std::{collections::HashMap, error::Error, ops::DerefMut, sync::Arc, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result as AnyhowResult, bail};
|
||||
use anyhow::Result as AnyhowResult;
|
||||
use async_trait::async_trait;
|
||||
use prost::Message;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader},
|
||||
process::{Child, ChildStdout},
|
||||
sync::{Mutex, oneshot},
|
||||
time::error::Elapsed,
|
||||
sync::{Mutex, mpsc, oneshot},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
@@ -20,6 +19,7 @@ use crate::{
|
||||
pub(super) async fn initialize_stdio_plugin(
|
||||
process: Child,
|
||||
unique_name: String,
|
||||
config_json: String,
|
||||
) -> AnyhowResult<Arc<Mutex<LoadedPlugin>>> {
|
||||
let plugin = Arc::new(Mutex::new(LoadedPlugin::default()));
|
||||
log::info!("Connecting to plugin {} using standard I/O", &unique_name);
|
||||
@@ -27,7 +27,7 @@ pub(super) async fn initialize_stdio_plugin(
|
||||
let connection = Arc::new(StdioPluginConnection::new(Arc::clone(&plugin), process));
|
||||
Arc::clone(&connection).run_stdio_loops();
|
||||
|
||||
let plugin_info = connection.initialize_plugin(String::new()).await?;
|
||||
let plugin_info = connection.initialize_plugin(config_json).await?;
|
||||
log::debug!("received plugin identification: {:?}", plugin_info);
|
||||
let mut plugin_lock = plugin.lock().await;
|
||||
plugin_lock.name = plugin_info.name;
|
||||
@@ -69,14 +69,14 @@ struct StdioPluginConnection {
|
||||
impl StdioPluginConnection {
|
||||
pub fn new(plugin: Arc<Mutex<LoadedPlugin>>, mut process: Child) -> StdioPluginConnection {
|
||||
let stdout = process.stdout.take().unwrap();
|
||||
let conn = StdioPluginConnection {
|
||||
|
||||
StdioPluginConnection {
|
||||
plugin,
|
||||
process: Mutex::new(process),
|
||||
buffered_stdout: Mutex::new(BufReader::new(stdout)),
|
||||
next_request_id: Mutex::new(0),
|
||||
pending_requests: Mutex::new(HashMap::new()),
|
||||
};
|
||||
conn
|
||||
}
|
||||
}
|
||||
|
||||
fn run_stdio_loops(self: Arc<Self>) {
|
||||
@@ -297,4 +297,105 @@ impl PluginConnection for StdioPluginConnection {
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_command(
|
||||
self: Arc<Self>,
|
||||
command_id: String,
|
||||
issuer_id: String,
|
||||
argv: Vec<String>,
|
||||
) -> Result<
|
||||
mpsc::Receiver<Result<proto::CommandReply, PluginConnectionError>>,
|
||||
PluginConnectionError,
|
||||
> {
|
||||
let request_id = {
|
||||
let mut r = self.next_request_id.lock().await;
|
||||
let id = *r;
|
||||
*r += 1;
|
||||
id
|
||||
};
|
||||
let request = proto::Request {
|
||||
request_id,
|
||||
req: Some(proto::request::Req::ExecuteReq(
|
||||
proto::PluginExecuteRequest {
|
||||
command_id,
|
||||
issuer_id,
|
||||
arg_vector: argv,
|
||||
},
|
||||
)),
|
||||
}
|
||||
.encode_length_delimited_to_vec();
|
||||
|
||||
if let Err(e) = self
|
||||
.process
|
||||
.lock()
|
||||
.await
|
||||
.stdin
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.write_all(&request)
|
||||
.await
|
||||
{
|
||||
return Err(PluginConnectionError::SendRequest(
|
||||
self.plugin.lock().await.plugin_id.clone(),
|
||||
PluginRequestType::Execute,
|
||||
Box::new(e),
|
||||
));
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let response = match self
|
||||
.await_response_to(request_id, Duration::from_secs(600))
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
if let Err(e) = tx
|
||||
.send(Err(PluginConnectionError::ReadResponse(
|
||||
self.plugin.lock().await.plugin_id.clone(),
|
||||
PluginRequestType::Execute,
|
||||
e,
|
||||
)))
|
||||
.await
|
||||
{
|
||||
log::error!("Cannot send error notification to another task: {e}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match response.res {
|
||||
Some(proto::response::Res::CmdReply(reply)) => {
|
||||
let end = match reply.reply {
|
||||
Some(proto::command_reply::Reply::End(_)) => true,
|
||||
_ => false,
|
||||
};
|
||||
if let Err(e) = tx.send(Ok(reply)).await {
|
||||
log::error!("Cannot send command reply to another task: {e}");
|
||||
}
|
||||
if end {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if let Err(e) = tx
|
||||
.send(Err(PluginConnectionError::DecodeResponse(
|
||||
self.plugin.lock().await.plugin_id.clone(),
|
||||
PluginRequestType::Execute,
|
||||
None,
|
||||
)))
|
||||
.await
|
||||
{
|
||||
log::error!("Cannot send error notification to another task: {e}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(rx)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user