Skip to content

Commit 3208395

Browse files
committed
break change: change message format, add cmd finish message
1 parent 6fc0f70 commit 3208395

File tree

7 files changed

+64
-34
lines changed

7 files changed

+64
-34
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
.idea
2+
release

Makefile

+8-9
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33
name = mproxy
44
ctrl_name = mpublish
55
rust_path = agent
6+
67
release-mac-x86_64:
78
mkdir -p release
89
cd $(rust_path) && cargo build --release --target=x86_64-apple-darwin
910
strip $(rust_path)/target/x86_64-apple-darwin/release/$(name)
1011
otool -L $(rust_path)/target/x86_64-apple-darwin/release/$(name)
1112
cp $(rust_path)/target/x86_64-apple-darwin/release/$(name) ./release/
1213
strip $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name)
13-
otool -L $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name)
14-
cp $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name) ./release/
14+
otool -L $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name)
15+
cp $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name) ./release/
1516

1617
# brew install wget
1718
release-mac-aarch64:
@@ -21,8 +22,8 @@ release-mac-aarch64:
2122
otool -L $(rust_path)/target/aarch64-apple-darwin/release/$(name)
2223
cp $(rust_path)/target/aarch64-apple-darwin/release/$(name) ./release/
2324
strip $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name)
24-
otool -L $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name)
25-
cp $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name) ./release/
25+
otool -L $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name)
26+
cp $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name) ./release/
2627

2728
release-linux-aarch64:
2829
sudo apt-get install -y build-essential
@@ -31,10 +32,8 @@ release-linux-aarch64:
3132
strip $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(name)
3233
cp $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(name) ./release/
3334
cd $(rust_path) && cargo build --release --target=aarch64-unknown-linux-gnu
34-
strip $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name)
35-
cp $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name) ./release/
36-
37-
35+
strip $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name)
36+
cp $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name) ./release/
3837

3938
release-linux:
4039
sudo apt-get install -y build-essential
@@ -43,4 +42,4 @@ release-linux:
4342
strip $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(name)
4443
cp $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(name) ./release
4544
strip $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(ctrl_name)
46-
cp $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(ctrl_name) ./release
45+
cp $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(ctrl_name) ./release

README.md

+13-6
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,33 @@
3232
# Web websocket-mqtt send
3333
# publish topic: cmd/$clientId
3434
{
35-
cmd: "ls",
35+
command: "ls",
3636
requestId: "random_to_track",
37+
t: "Cmd"
3738
#stream: false, # can be empty, default is false. this project now only support false.
3839
}
3940

4041
# mproxy response
4142
# publish topic: cmd/$client/resp
4243
# success response
4344
{
44-
type: "Ok"
45+
t: "D"
4546
data: "abc.txt/nccn.txt",
46-
requestId: "random_to_track",
47+
reqId: "random_to_track",
4748
pid: 39512, #process id
4849
seq: 1 #some may resp more than one time, so set seq to keep order.
4950
}
5051
# failure response
5152
{
52-
type: "Err",
53+
t: "Err",
5354
message: "response data"
54-
requestId: "random_to_track"
55+
reqId: "random_to_track"
56+
}
57+
# finish response
58+
{
59+
t: "Ok",
60+
reqId: "random_to_track",
61+
pid: 39512, #process id
5562
}
5663
```
5764
### 配置文件
@@ -67,7 +74,7 @@ cd shell && ./run_mqtt.sh && cd ../
6774
mprocs
6875

6976
# Check MQTT agent if is OK
70-
cd agent && cargo run --bin mpublish -- --config=mpublish.yml ls -ls
77+
cd agent && cargo run --bin mpublish -- --config=mpublish.yml -- ls -ls
7178
```
7279
Web [Figma UI](https://www.figma.com/design/iyL4dms3B8AWGZS14FCRuf/RMQTT-EXEC?node-id=0%3A1&t=rnIL1LSWwQIXfZdf-1)
7380
## 限制

agent/src/bin/mpublish.rs

+15-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::env;
22
use std::io::IsTerminal;
33
use std::path::PathBuf;
4+
use std::process::exit;
45
use anyhow::bail;
56
use clap::Parser;
67
use rumqttc::v5::{AsyncClient, MqttOptions, Incoming};
@@ -43,6 +44,7 @@ async fn main() -> anyhow::Result<()> {
4344
let topic = config.get_subscribe_command_topic();
4445
client.subscribe(topic, QoS::ExactlyOnce).await?;
4546

47+
let client_ = client.clone();
4648
tokio::spawn(async move {
4749
loop {
4850
let event = eventloop.poll().await;
@@ -51,10 +53,17 @@ async fn main() -> anyhow::Result<()> {
5153
match event {
5254
rumqttc::v5::Event::Incoming(Incoming::Publish(data)) => {
5355
let resp: ResponseMessage = serde_json::from_slice(data.payload.as_ref()).unwrap();
54-
5556
match resp {
56-
ResponseMessage::Ok {data, seq, pid, ..} => println!("[{pid}][{seq}] {data}"),
57-
ResponseMessage::Err {message,..} => eprintln!("{message}"),
57+
ResponseMessage::D {data, seq, pid, ..} => println!("[{pid}][{seq}] {data}"),
58+
ResponseMessage::Err {message,..} => {
59+
eprintln!("{message}");
60+
let _ = client_.disconnect().await;
61+
exit(1);
62+
},
63+
ResponseMessage::Ok {..} => {
64+
let _ = client_.disconnect().await;
65+
exit(0);
66+
}
5867
}
5968
}
6069
_ => ()
@@ -64,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
6473
};
6574
}
6675
});
67-
let command = RequestMessage::Cmd{command: command.to_string(), request_id: "test_request_id".to_string()};
76+
let command = RequestMessage::Cmd{command: command.to_string(), req_id: "test_request_id".to_string()};
6877
let command = serde_json::to_vec(&command).unwrap();
6978
client.publish(config.get_publish_command_topic(), QoS::ExactlyOnce, false, command).await?;
7079
tokio::signal::ctrl_c().await?;
@@ -85,10 +94,10 @@ mod test{
8594
let config = Cli::parse_from([APP_NAME,"--config=config", "ls", "pwd"]);
8695
println!("{config:?}");
8796

88-
let config = Cli::parse_from([APP_NAME, "-c","config.yaml", "ls", "pwd"]);
97+
let config = Cli::parse_from([APP_NAME, "-c","config.yaml", "ls -ls"]);
8998
println!("{config:?}");
9099

91-
let config = Cli::parse_from([APP_NAME, "-c","config.yaml"]);
100+
let config = Cli::parse_from([APP_NAME, "-c","config.yaml", "--", "ls", "-ls"]);
92101
println!("{config:?}");
93102

94103
}

agent/src/handler.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ impl Handler {
2323

2424
async fn run_command(connection:Arc<Connection>, cmd:RequestMessage, publish_topic:Arc<String>) {
2525
match cmd {
26-
RequestMessage::Cmd { command,request_id } => {
26+
RequestMessage::Cmd { command, req_id: request_id } => {
2727
let command_parsed = shellish_parse::parse(&command,false);
2828
let command_parsed = match command_parsed {
2929
Ok(result) => result,
3030
Err(e) => {
31-
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
31+
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {req_id: request_id.clone(), message:format!("{}", e)}).await;
3232
return;
3333
}
3434
};
@@ -46,13 +46,14 @@ impl Handler {
4646
for line in reader.lines().filter_map(|line| line.ok()) {
4747
//TODO: handle publish error
4848
let _ = connection.publish_response(
49-
&publish_topic, ResponseMessage::Ok {request_id: request_id.clone(),data:line, seq: seq, pid: pid}).await;
49+
&publish_topic, ResponseMessage::D { req_id: request_id.clone(),data:line, seq, pid }).await;
5050
seq +=1;
5151
}
5252
}
53+
let _ = connection.publish_response(&publish_topic, ResponseMessage::Ok { req_id: request_id, pid}).await;
5354
}
5455
Err(e) => {
55-
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
56+
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {req_id: request_id.clone(), message:format!("{}", e)}).await;
5657
}
5758
}
5859
}

agent/src/message.rs

+19-6
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,30 @@
11
use serde::{Deserialize, Serialize};
22

33
#[derive(Serialize, Deserialize, Debug, Clone)]
4-
#[serde(tag = "type")]
4+
#[serde(tag = "t", rename_all_fields="camelCase")]
55
pub enum RequestMessage {
66
Cmd {
77
command: String,
8-
request_id: String,
8+
req_id: String,
99
}
1010
}
1111

1212
#[derive(Serialize, Deserialize, Debug, Clone)]
13-
#[serde(tag = "type")]
13+
#[serde(tag = "t", rename_all_fields="camelCase")]
1414
pub enum ResponseMessage {
15-
Ok{ request_id:String, seq:u32, data:String, pid:u32},
16-
Err{ request_id:String, message:String}
17-
}
15+
D { req_id:String, seq:u32, data:String, pid:u32},
16+
Err { req_id:String, message:String },
17+
Ok { req_id: String, pid: u32},
18+
}
19+
20+
#[cfg(test)]
21+
mod test {
22+
use crate::message::ResponseMessage;
23+
24+
#[test]
25+
fn test_one() {
26+
let r = ResponseMessage::Ok {req_id: "1".to_string(), pid:1};
27+
let parse_result = serde_json::to_string(&r);
28+
assert_eq!(parse_result.unwrap(), r#"{"t":"Ok","reqId":"1","pid":1}"#.to_string());
29+
}
30+
}

mprocs.yaml

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ procs:
55
env:
66
RUST_LOG: debug
77
shell: cd agent && cargo run --bin mproxy mproxy.yml
8-
web:
9-
shell: cd web && npm run dev
10-
stop: SIGKILL
8+
# web:
9+
# shell: cd web && npm run dev
10+
# stop: SIGKILL

0 commit comments

Comments
 (0)