Skip to content

Commit 2b72055

Browse files
authored Jun 13, 2019
Update NSQ code (#16)
* Working on a JSON query lang The idea being that you can make a call into toolkit with a query like select .foo from json where .id = "123" So you can easily filter JSON files. This is like JQ, but not intended to be any reformat, or complex filtering. This is supposed to be a simple filter tool. * Refactor all of NSQ * Update musl-builder.Dockerfile * API calls are async, and don't require nightly
1 parent 51d068b commit 2b72055

20 files changed

+2568
-1132
lines changed
 

‎Cargo.lock

+751-352
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+5-1
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,8 @@ kopy-common-lib = { git = "https://github.com/ethankhall/rust-shared.git", rev =
2727
prettytable-rs = "0.8"
2828
termion = "*"
2929
lazy_static = "*"
30-
flate2 = "1.0"
30+
flate2 = "1.0"
31+
atty = "0.2"
32+
pest = "2.1.0"
33+
pest_derive = "2.1.0"
34+
tokio = "0.1"

‎ci/musl-builder.Dockerfile

+1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ RUN chmod +x ~/bin/crom
66
ADD . ./
77
RUN sudo chown -R rust:rust .
88
RUN ~/bin/crom update-version --pre-release release
9+
RUN cargo --version
910

1011
CMD cargo test && cargo build --release

‎src/cli.yaml

+35-8
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,14 @@ subcommands:
8989
settings:
9090
- SubcommandRequiredElseHelp
9191
subcommands:
92-
- status:
93-
about: Shows the status of NSQ Topic
94-
settings:
95-
- ArgRequiredElseHelp
92+
- stats:
93+
about: Shows the stats on NSQ
94+
groups:
95+
- filter:
96+
required: true
97+
args:
98+
- producers
99+
- topics
96100
args:
97101
- count:
98102
help: How many times should I report back?
@@ -104,19 +108,26 @@ subcommands:
104108
long: lookupd-host
105109
takes_value: true
106110
required: true
111+
env: NSQ_LOOKUP_HOST
107112
- nsq_lookup_port:
108113
help: Port to NSQ Lookup
109114
long: lookupd-port
110115
takes_value: true
111116
default_value: "4161"
117+
env: NSQ_LOOKUP_PORT
112118
- delay:
113119
help: Ideally, how long between checks to NSQ, in seconds.
114120
long: delay
115121
takes_value: true
116122
default_value: "1"
117-
- TOPIC:
118-
help: Which topic should be check on
119-
required: true
123+
- producers:
124+
help: Limit to specific producers
125+
long: filter-producers
126+
takes_value: true
127+
multiple: true
128+
- topics:
129+
help: Limit to specific topics
130+
long: filter-topics
120131
takes_value: true
121132
multiple: true
122133
- send:
@@ -165,6 +176,22 @@ subcommands:
165176
settings:
166177
- SubcommandRequiredElseHelp
167178
subcommands:
179+
- sql:
180+
about: Filter new-line delemited JSON using a SQL like expression.
181+
settings:
182+
- ArgRequiredElseHelp
183+
- Hidden
184+
args:
185+
- EXP:
186+
help: SQL expression
187+
required: true
188+
takes_value: true
189+
last: true
190+
- json:
191+
help: Path to file to process
192+
takes_value: true
193+
required: true
194+
long: json
168195
- latest:
169196
about: Filter new-line delemited JSON stream to the newest message
170197
long_about: If a JSON blob has both an ID that's unique, and a timestamp/version field. Filter the stream for the latest ID/version field.
@@ -197,4 +224,4 @@ groups:
197224
args:
198225
- debug
199226
- warn
200-
- quite
227+
- quite

‎src/commands/har/output.rs

+8-18
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::fs::File;
22
use std::io::Write;
33

44
use comrak::{markdown_to_html, ComrakOptions};
5-
use mime::{Mime, APPLICATION_JSON, APPLICATION_OCTET_STREAM, TEXT_JAVASCRIPT};
65
use serde_json;
76

87
use super::model::*;
@@ -128,25 +127,16 @@ impl ToMarkdown for HarFile {
128127
entry.response.content.mime_type.clone()
129128
));
130129
if let Some(text) = entry.response.content.text {
131-
let body = text.replace("\\n", "\n");
132-
let mime = entry
133-
.response
134-
.content
135-
.mime_type
136-
.parse::<Mime>()
137-
.unwrap_or(APPLICATION_OCTET_STREAM);
138-
139130
lines.push("**Body:**".to_string());
140-
if mime == APPLICATION_JSON || mime == TEXT_JAVASCRIPT {
141-
match serde_json::from_str::<serde_json::Value>(&body) {
142-
Ok(json) => {
143-
let body = serde_json::to_string_pretty(&json).unwrap();
144-
lines.push(format!("```\n{}\n```\n", body))
145-
}
146-
Err(_) => lines.push(format!("```\n{}\n```\n", body)),
131+
match serde_json::from_str::<serde_json::Value>(&text) {
132+
Ok(json) => {
133+
let body = serde_json::to_string_pretty(&json).unwrap();
134+
lines.push(format!("```\n{}\n```\n", body))
135+
}
136+
Err(_) => {
137+
let body = text.replace("\\n", "\n");
138+
lines.push(format!("```\n{}\n```\n", body));
147139
}
148-
} else {
149-
lines.push(format!("```\n{}\n```\n", body))
150140
}
151141
}
152142
}

‎src/commands/json/latest.rs

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
use std::collections::BTreeMap;
2+
use std::fs::File;
3+
use std::io::prelude::*;
4+
use std::io::{BufRead, BufReader, LineWriter};
5+
use std::path::PathBuf;
6+
7+
use clap::ArgMatches;
8+
use json::JsonValue;
9+
10+
use super::{find_field, parse_path};
11+
use crate::commands::progress::*;
12+
use crate::commands::CliError;
13+
14+
#[derive(Debug)]
15+
struct Record {
16+
id: String,
17+
version: i32,
18+
data: String,
19+
}
20+
21+
#[test]
22+
fn leading_dot_will_be_ignored() {
23+
let split: Vec<String> = parse_path(".abc.123");
24+
25+
assert_eq!(2, split.len());
26+
}
27+
28+
fn build_key(keys: &Vec<Vec<String>>, json_input: &JsonValue) -> Option<String> {
29+
let mut key_list: Vec<String> = Vec::new();
30+
31+
for part in keys {
32+
if let Some(key) = find_field(part, json_input) {
33+
key_list.push(s!(key.as_str().unwrap_or("null")));
34+
}
35+
}
36+
37+
if key_list.is_empty() {
38+
return None;
39+
}
40+
41+
return Some(key_list.join(":"));
42+
}
43+
44+
pub fn do_json_latest_command(args: &ArgMatches) -> Result<(), CliError> {
45+
let output_path = args.value_of("OUTPUT").unwrap();
46+
let output_path = PathBuf::from(output_path);
47+
48+
let id_fields: Vec<Vec<String>> = args
49+
.values_of("id")
50+
.unwrap()
51+
.into_iter()
52+
.map(|x| parse_path(x))
53+
.collect();
54+
55+
let version_path: Vec<String> = parse_path(args.value_of("seq").unwrap());
56+
57+
let file = File::create(output_path).unwrap();
58+
let mut file = LineWriter::new(file);
59+
60+
let pb = ProgressBarHelper::new(ProgressBarType::UnsizedProgressBar(
61+
"{prefix:.bold.dim} {spinner:.green} {wide_msg}",
62+
));
63+
64+
let mut records: BTreeMap<String, Record> = BTreeMap::new();
65+
66+
let input_paths: Vec<String> = args.values_of("INPUT").unwrap().map(|x| s!(x)).collect();
67+
68+
let mut counter = 0;
69+
for input_path in input_paths.into_iter() {
70+
let reader = match crate::commands::file::open_file(&input_path) {
71+
Ok(reader) => BufReader::new(reader),
72+
Err(e) => {
73+
error!("Unable to open {} because {}", input_path, e.to_string());
74+
continue;
75+
}
76+
};
77+
78+
for line in reader.lines() {
79+
match line {
80+
Ok(line) => {
81+
counter += 1;
82+
if counter % 10 == 0 {
83+
pb.set_message(&format!(
84+
"Reading line {}\t Used: {}",
85+
counter,
86+
records.len()
87+
));
88+
}
89+
90+
let json_line = json::parse(&line).unwrap();
91+
92+
let id = match build_key(&id_fields, &json_line) {
93+
Some(value) => value,
94+
None => {
95+
warn!("Skipping `{}` because all id was missing.", &line);
96+
continue;
97+
}
98+
};
99+
100+
let version = match find_field(&version_path, &json_line) {
101+
Some(value) => value.as_i32().unwrap(),
102+
None => {
103+
warn!("Skipping `{}` because version was missing.", &line);
104+
continue;
105+
}
106+
};
107+
108+
let record = Record {
109+
id: id.to_string(),
110+
version: version,
111+
data: line,
112+
};
113+
114+
match records.get(&id) {
115+
Some(row) if row.version > version => continue,
116+
_ => records.insert(id.to_string(), record),
117+
};
118+
}
119+
Err(err) => error!("IO error: Line {}: {}", counter, err),
120+
}
121+
}
122+
}
123+
124+
let mut write_counter = 0;
125+
126+
for record in records.values() {
127+
write_counter += 1;
128+
pb.set_message(&format!(
129+
"Writing line {} of {}",
130+
write_counter,
131+
records.len()
132+
));
133+
let line = format!("{}\n", record.data);
134+
if file.write_all(line.as_bytes()).is_err() {
135+
error!("Trouble writing line {} to disk", write_counter);
136+
}
137+
}
138+
139+
return Ok(());
140+
}

‎src/commands/json/mod.rs

+15-147
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,24 @@
1-
use std::collections::BTreeMap;
2-
use std::fs::File;
3-
use std::io::prelude::*;
4-
use std::io::{BufRead, BufReader, LineWriter};
5-
use std::path::PathBuf;
1+
mod latest;
2+
mod sql;
3+
4+
pub use self::latest::do_json_latest_command;
5+
pub use self::sql::do_json_sql_command;
66

7-
use clap::ArgMatches;
8-
use indicatif::{ProgressBar, ProgressStyle};
97
use json::JsonValue;
108

11-
use crate::commands::CliError;
9+
// https://github.com/ms705/nom-sql
1210

13-
#[derive(Debug)]
14-
struct Record {
15-
id: String,
16-
version: i32,
17-
data: String,
18-
}
11+
fn find_field<'a>(field: &Vec<String>, json_input: &'a JsonValue) -> Option<&'a JsonValue> {
12+
let mut value = json_input;
1913

20-
#[test]
21-
fn leading_dot_will_be_ignored() {
22-
let split: Vec<String> = parse_path(".abc.123");
14+
for part in field {
15+
value = &value[part];
16+
if value.is_null() {
17+
return None;
18+
}
19+
}
2320

24-
assert_eq!(2, split.len());
21+
return Some(value);
2522
}
2623

2724
fn parse_path(path: &str) -> Vec<String> {
@@ -35,132 +32,3 @@ fn parse_path(path: &str) -> Vec<String> {
3532

3633
return split;
3734
}
38-
39-
fn build_key(keys: &Vec<Vec<String>>, json_input: &JsonValue) -> Option<String> {
40-
let mut key_list: Vec<String> = Vec::new();
41-
42-
for part in keys {
43-
if let Some(key) = find_field(part, json_input) {
44-
key_list.push(s!(key.as_str().unwrap_or("null")));
45-
}
46-
}
47-
48-
if key_list.is_empty() {
49-
return None;
50-
}
51-
52-
return Some(key_list.join(":"));
53-
}
54-
55-
pub fn do_json_latest_command(args: &ArgMatches) -> Result<(), CliError> {
56-
let output_path = args.value_of("OUTPUT").unwrap();
57-
let output_path = PathBuf::from(output_path);
58-
59-
let id_fields: Vec<Vec<String>> = args
60-
.values_of("id")
61-
.unwrap()
62-
.into_iter()
63-
.map(|x| parse_path(x))
64-
.collect();
65-
66-
let version_path: Vec<String> = parse_path(args.value_of("seq").unwrap());
67-
68-
let file = File::create(output_path).unwrap();
69-
let mut file = LineWriter::new(file);
70-
71-
let spinner_style =
72-
ProgressStyle::default_spinner().template("{prefix:.bold.dim} {spinner:.green} {wide_msg}");
73-
let progress_bar = ProgressBar::new_spinner();
74-
progress_bar.set_style(spinner_style.clone());
75-
progress_bar.enable_steady_tick(100);
76-
77-
let mut records: BTreeMap<String, Record> = BTreeMap::new();
78-
79-
let input_paths: Vec<String> = args.values_of("INPUT").unwrap().map(|x| s!(x)).collect();
80-
81-
let mut counter = 0;
82-
for input_path in input_paths.into_iter() {
83-
let reader = match crate::commands::file::open_file(&input_path) {
84-
Ok(reader) => BufReader::new(reader),
85-
Err(e) => {
86-
error!("Unable to open {} because {}", input_path, e.to_string());
87-
continue;
88-
}
89-
};
90-
91-
for line in reader.lines() {
92-
match line {
93-
Ok(line) => {
94-
counter += 1;
95-
if counter % 10 == 0 {
96-
progress_bar.set_message(&format!(
97-
"Reading line {}\t Used: {}",
98-
counter,
99-
records.len()
100-
));
101-
}
102-
103-
let json_line = json::parse(&line).unwrap();
104-
105-
let id = match build_key(&id_fields, &json_line) {
106-
Some(value) => value,
107-
None => {
108-
warn!("Skipping `{}` because all id was missing.", &line);
109-
continue;
110-
}
111-
};
112-
113-
let version = match find_field(&version_path, &json_line) {
114-
Some(value) => value.as_i32().unwrap(),
115-
None => {
116-
warn!("Skipping `{}` because version was missing.", &line);
117-
continue;
118-
}
119-
};
120-
121-
let record = Record {
122-
id: id.to_string(),
123-
version: version,
124-
data: line,
125-
};
126-
127-
match records.get(&id) {
128-
Some(row) if row.version > version => continue,
129-
_ => records.insert(id.to_string(), record),
130-
};
131-
}
132-
Err(err) => error!("IO error: Line {}: {}", counter, err),
133-
}
134-
}
135-
}
136-
137-
let mut write_counter = 0;
138-
139-
for record in records.values() {
140-
write_counter += 1;
141-
progress_bar.set_message(&format!(
142-
"Writing line {} of {}",
143-
write_counter,
144-
records.len()
145-
));
146-
let line = format!("{}\n", record.data);
147-
if file.write_all(line.as_bytes()).is_err() {
148-
error!("Trouble writing line {} to disk", write_counter);
149-
}
150-
}
151-
152-
return Ok(());
153-
}
154-
155-
fn find_field<'a>(field: &Vec<String>, json_input: &'a JsonValue) -> Option<&'a JsonValue> {
156-
let mut value = json_input;
157-
158-
for part in field {
159-
value = &value[part];
160-
if value.is_null() {
161-
return None;
162-
}
163-
}
164-
165-
return Some(value);
166-
}

‎src/commands/json/sql/mod.rs

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
mod parser;
2+
3+
use std::io::{BufRead, BufReader, Read};
4+
5+
use clap::ArgMatches;
6+
use json::JsonValue;
7+
use parser::{Expression, SqlSource};
8+
9+
use super::{find_field, parse_path};
10+
11+
use crate::commands::CliError;
12+
13+
pub fn do_json_sql_command(args: &ArgMatches) -> Result<(), CliError> {
14+
let exp = Expression::from(args.value_of("EXP").unwrap())?;
15+
16+
let from_path = match exp.source {
17+
SqlSource::None => parse_path("."),
18+
SqlSource::SqlFrom { path } => parse_path(&path),
19+
};
20+
21+
let input_paths: Vec<String> = args.values_of("json").unwrap().map(|x| s!(x)).collect();
22+
23+
let line_processor = LineProcessor { path: from_path };
24+
25+
for input_path in input_paths.into_iter() {
26+
let reader = match crate::commands::file::open_file(&input_path) {
27+
Ok(reader) => BufReader::new(reader),
28+
Err(e) => {
29+
error!("Unable to open {} because {}", input_path, e.to_string());
30+
continue;
31+
}
32+
};
33+
34+
line_processor.process_stream(reader);
35+
}
36+
37+
Ok(())
38+
}
39+
40+
struct LineProcessor {
41+
path: Vec<String>,
42+
}
43+
44+
impl LineProcessor {
45+
fn process_stream<R: Read>(&self, buf_read: BufReader<R>) {
46+
let mut line_counter = 0;
47+
for line in buf_read.lines() {
48+
line_counter += 1;
49+
match line {
50+
Ok(line) => {
51+
self.parse_json_line(line, line_counter);
52+
}
53+
Err(err) => error!("IO error: Line {}: {}", line_counter, err),
54+
}
55+
}
56+
}
57+
58+
fn parse_json_line(&self, line: String, line_number: u32) {
59+
match json::parse(&line) {
60+
Ok(parsed) => {
61+
if let Some(sub_json) = find_field(&self.path, &parsed) {
62+
trace!("sub json: {:?}", sub_json);
63+
self.process_json_line(sub_json)
64+
}
65+
}
66+
Err(_) => error!("Line was not JSON: {}", line_number),
67+
}
68+
}
69+
70+
fn process_json_line(&self, parent: &JsonValue) {
71+
match parent {
72+
JsonValue::Array(values) => {
73+
for value in values {
74+
if self.filter_obj(&value) {
75+
self.print_fields(&value);
76+
}
77+
}
78+
}
79+
JsonValue::Object(_) => {
80+
if self.filter_obj(parent) {
81+
self.print_fields(&parent);
82+
}
83+
}
84+
_ => error!(
85+
"Tried to parse an object that wasn't an array or object, got {:?}",
86+
parent
87+
),
88+
}
89+
}
90+
91+
fn filter_obj(&self, parent: &JsonValue) -> bool {
92+
if let JsonValue::Object(_obj) = parent {
93+
true
94+
} else {
95+
error!("{:?} is not a json object", parent);
96+
false
97+
}
98+
}
99+
100+
fn print_fields(&self, parent: &JsonValue) {
101+
println!("{}", parent.dump());
102+
}
103+
}

‎src/commands/json/sql/parser.rs

+268
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
use pest::iterators::{Pair, Pairs};
2+
use pest::Parser;
3+
4+
use crate::commands::CliError;
5+
#[cfg(test)]
6+
use pest::{consumes_to, parses_to};
7+
8+
#[derive(Parser)]
9+
#[grammar = "commands/json/sql/sql.pest"] // relative to src
10+
struct SqlParser;
11+
12+
#[derive(PartialEq, Debug)]
13+
pub struct Expression {
14+
pub operation: SqlOperation,
15+
pub source: SqlSource,
16+
pub filter: Vec<SqlFilter>,
17+
pub limit: Option<u32>,
18+
pub offset: Option<u32>,
19+
}
20+
21+
impl Expression {
22+
fn new() -> Self {
23+
Expression {
24+
operation: SqlOperation::None,
25+
source: SqlSource::None,
26+
filter: Vec::new(),
27+
limit: None,
28+
offset: None,
29+
}
30+
}
31+
32+
pub fn from(sql: &str) -> Result<Self, CliError> {
33+
let mut expression = Expression::new();
34+
let mut pairs = match SqlParser::parse(Rule::bool_expr, sql) {
35+
Ok(pairs) => pairs,
36+
Err(e) => {
37+
return Err(CliError::new(format!("{}", e), 5));
38+
}
39+
};
40+
41+
extract(&mut expression, &mut pairs);
42+
43+
Ok(expression)
44+
}
45+
}
46+
47+
#[derive(PartialEq, Debug)]
48+
pub enum SqlOperation {
49+
None,
50+
Select { columns: Vec<String> },
51+
}
52+
53+
#[derive(PartialEq, Debug)]
54+
pub enum SqlSource {
55+
None,
56+
SqlFrom { path: String },
57+
}
58+
59+
#[derive(PartialEq, Debug)]
60+
pub enum SqlFilter {
61+
Condition(SqlComparison),
62+
Or,
63+
And,
64+
}
65+
66+
#[derive(PartialEq, Debug)]
67+
pub enum SqlComparison {
68+
Equal { path: String, value: String },
69+
NotEqual { path: String, value: String },
70+
}
71+
72+
fn extract(expression: &mut Expression, bool_expr: &mut Pairs<Rule>) {
73+
let mut exp = bool_expr.next().unwrap().into_inner();
74+
let pairs = exp.next().unwrap().into_inner();
75+
for pair in pairs {
76+
trace!("Element: {:?}", pair);
77+
match pair.as_rule() {
78+
Rule::select_operation => {
79+
handle_select(expression, pair);
80+
}
81+
Rule::source => {
82+
handle_from(expression, pair);
83+
}
84+
Rule::filter => {
85+
handle_where(expression, pair);
86+
}
87+
Rule::limit => {}
88+
Rule::offset => {}
89+
_ => panic!("SQL Parse error! {:?}", pair),
90+
}
91+
}
92+
}
93+
94+
fn handle_select(expression: &mut Expression, parent: Pair<Rule>) {
95+
let inner = parent.into_inner();
96+
97+
let columns: Vec<String> = inner.map(|x| s!(x.as_span().as_str())).collect();
98+
expression.operation = SqlOperation::Select { columns: columns };
99+
}
100+
101+
fn handle_from(expression: &mut Expression, parent: Pair<Rule>) {
102+
expression.source = SqlSource::SqlFrom {
103+
path: s!(parent.into_inner().as_str()),
104+
};
105+
}
106+
107+
fn handle_where(expression: &mut Expression, parent: Pair<Rule>) {
108+
let mut filters: Vec<SqlFilter> = Vec::new();
109+
110+
for pair in parent.into_inner() {
111+
match pair.as_rule() {
112+
Rule::condition => {
113+
filters.push(handle_condition(&mut pair.into_inner()));
114+
}
115+
Rule::logic => match pair.into_inner().next().unwrap().as_rule() {
116+
Rule::and => {
117+
filters.push(SqlFilter::And);
118+
}
119+
Rule::or => {
120+
filters.push(SqlFilter::Or);
121+
}
122+
_ => {
123+
panic!("Unknown operator");
124+
}
125+
},
126+
_ => panic!("Unable to parse where"),
127+
}
128+
}
129+
130+
expression.filter = filters;
131+
}
132+
133+
fn handle_condition(parent: &mut Pairs<Rule>) -> SqlFilter {
134+
let path = s!(parent.next().unwrap().as_str());
135+
let operator = parent.next().unwrap();
136+
let value = s!(parent.next().unwrap().as_str());
137+
138+
let comparison = match operator.as_rule() {
139+
Rule::eq => SqlComparison::Equal {
140+
path: path,
141+
value: value,
142+
},
143+
Rule::neq => SqlComparison::NotEqual {
144+
path: path,
145+
value: value,
146+
},
147+
_ => panic!(
148+
"Value must be `=` or `!=`, but was {:?} ({:?})",
149+
operator.as_str(),
150+
operator.as_rule()
151+
),
152+
};
153+
154+
SqlFilter::Condition(comparison)
155+
}
156+
157+
#[test]
158+
fn validate_ast_builder() {
159+
let mut pairs =
160+
SqlParser::parse(Rule::bool_expr, "select * from . where .a.b.c = 123").unwrap();
161+
let mut expression = Expression::new();
162+
163+
extract(&mut expression, &mut pairs);
164+
165+
assert_eq!(
166+
SqlOperation::Select {
167+
columns: vec![s!("*")]
168+
},
169+
expression.operation
170+
);
171+
assert_eq!(SqlSource::SqlFrom { path: s!(".") }, expression.source);
172+
assert_eq!(
173+
vec![SqlFilter::Condition(SqlComparison::Equal {
174+
path: s!(".a.b.c"),
175+
value: s!("123")
176+
})],
177+
expression.filter
178+
);
179+
}
180+
181+
#[test]
182+
fn validate_parsing() {
183+
parses_to! {
184+
parser: SqlParser,
185+
input: "select * from . where .a.b.c = 123",
186+
rule: Rule::expr,
187+
tokens: [
188+
expr(0, 34, [
189+
select_operation(0,9,[
190+
result_column(7,8, [])
191+
]),
192+
source(9, 15, [
193+
json_path(14, 15, []),
194+
]),
195+
filter(16, 34, [
196+
condition(22, 34, [
197+
json_path(22, 28, []),
198+
eq(29, 30, []),
199+
value(31, 34, [num_literal(31, 34, [])])
200+
])
201+
])
202+
])
203+
]
204+
}
205+
206+
parses_to! {
207+
parser: SqlParser,
208+
input: "select .abc from .a.b.c",
209+
rule: Rule::expr,
210+
tokens: [
211+
expr(0, 23, [
212+
select_operation(0, 12,[
213+
result_column(7, 11, [
214+
json_path(7, 11, [])
215+
])
216+
]),
217+
source(12, 23, [
218+
json_path(17, 23, []),
219+
])
220+
])
221+
]
222+
}
223+
224+
parses_to! {
225+
parser: SqlParser,
226+
input: "select .abc from .a.b.c limit 10 offset 10",
227+
rule: Rule::expr,
228+
tokens: [
229+
expr(0, 42, [
230+
select_operation(0, 12,[
231+
result_column(7, 11, [
232+
json_path(7, 11, [])
233+
])
234+
]),
235+
source(12, 23, [
236+
json_path(17, 23, []),
237+
]),
238+
limit(24, 32, [
239+
pos_num_literal(30, 32, []),
240+
]),
241+
offset(33, 42, [
242+
pos_num_literal(40, 42, []),
243+
])
244+
])
245+
]
246+
}
247+
248+
parses_to! {
249+
parser: SqlParser,
250+
input: "select .abc from .a.b.c limit 10",
251+
rule: Rule::expr,
252+
tokens: [
253+
expr(0, 32, [
254+
select_operation(0, 12,[
255+
result_column(7, 11, [
256+
json_path(7, 11, [])
257+
])
258+
]),
259+
source(12, 23, [
260+
json_path(17, 23, []),
261+
]),
262+
limit(24, 32, [
263+
pos_num_literal(30, 32, []),
264+
])
265+
])
266+
]
267+
}
268+
}

‎src/commands/json/sql/sql.pest

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
json_path = ${ ("." ~ (ASCII_ALPHANUMERIC){1,}){1,} | "." }
2+
3+
bool_expr = { SOI ~ expr ~ EOI }
4+
expr = { operation ~ source ~ filter? ~ (limit ~ offset?)? }
5+
6+
operation = _{ select_operation }
7+
source = { ^"FROM" ~ json_path }
8+
filter = { ^"WHERE" ~ condition ~ ( logic ~ condition )* }
9+
10+
logic = { and | or }
11+
and = { ^"and" }
12+
or = { ^"or" }
13+
14+
condition = { json_path ~ comparison ~ value }
15+
16+
limit = { ^"LIMIT" ~ pos_num_literal }
17+
offset = { ^"OFFSET" ~ pos_num_literal }
18+
19+
result_column = { "*" | json_path }
20+
select_operation = { ^"SELECT" ~ result_column ~ ("," ~ result_column )* }
21+
22+
comparison = _{ eq | neq }
23+
eq = { "=" }
24+
neq = { "!=" | "<>"}
25+
26+
value = {
27+
string_literal
28+
| num_literal
29+
| "(" ~ string_literal ~("," ~ string_literal)* ~ ")"
30+
| "(" ~ num_literal ~("," ~ num_literal)* ~ ")"
31+
}
32+
33+
pos_num_literal = @{ ("0" | ASCII_NONZERO_DIGIT ~ ASCII_DIGIT*) }
34+
35+
num_literal = @{
36+
"-"?
37+
~ ("0" | ASCII_NONZERO_DIGIT ~ ASCII_DIGIT*)
38+
~ ("." ~ ASCII_DIGIT*)?
39+
}
40+
41+
string_literal = ${ "\"" ~ string ~ "\"" }
42+
string = @{ char* }
43+
char = {
44+
!("\"" | "\\") ~ ANY
45+
}
46+
47+
WHITESPACE = _{ " " }

‎src/commands/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ mod file;
3131
pub mod har;
3232
pub mod json;
3333
pub mod nsq;
34+
pub mod progress;
3435
pub mod time;

‎src/commands/nsq/api.rs

+421-253
Large diffs are not rendered by default.

‎src/commands/nsq/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod api;
2+
pub mod model;
23
pub mod post;
34
pub mod stats;

‎src/commands/nsq/model.rs

+208
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#[derive(Serialize, Deserialize)]
2+
pub struct StatusTopicsResponse {
3+
pub data: StatusTopicsDetails,
4+
}
5+
6+
#[derive(Serialize, Deserialize)]
7+
pub struct StatusTopicsDetails {
8+
pub topics: Vec<TopicDetails>,
9+
}
10+
11+
#[derive(Serialize, Deserialize)]
12+
pub struct TopicDetails {
13+
pub topic_name: String,
14+
pub depth: u64,
15+
pub message_count: u64,
16+
pub channels: Vec<TopicChannel>,
17+
}
18+
19+
#[derive(Serialize, Deserialize)]
20+
pub struct TopicChannel {
21+
pub depth: u64,
22+
pub in_flight_count: u64,
23+
pub channel_name: String,
24+
pub message_count: u64,
25+
pub clients: Vec<ClientDetails>,
26+
}
27+
28+
#[derive(Serialize, Deserialize)]
29+
pub struct ClientDetails {
30+
pub hostname: String,
31+
}
32+
33+
#[derive(Serialize, Deserialize)]
34+
pub struct LookupProducer {
35+
pub remote_address: String,
36+
pub hostname: String,
37+
pub broadcast_address: String,
38+
pub tcp_port: i32,
39+
pub http_port: i32,
40+
pub version: String,
41+
}
42+
43+
#[derive(Serialize, Deserialize)]
44+
pub struct LookupResponse {
45+
pub status_code: i32,
46+
pub data: LookupData,
47+
}
48+
49+
#[derive(Serialize, Deserialize)]
50+
pub struct LookupData {
51+
pub producers: Vec<LookupProducer>,
52+
}
53+
54+
#[test]
55+
fn test_extract_size() {
56+
let body = "{
57+
\"version\": \"1.1.0\",
58+
\"health\": \"OK\",
59+
\"start_time\": 1548185315,
60+
\"topics\": [
61+
{
62+
\"topic_name\": \"foo\",
63+
\"channels\": [
64+
{
65+
\"channel_name\": \"tail180292#ephemeral\",
66+
\"depth\": 3,
67+
\"backend_depth\": 0,
68+
\"in_flight_count\": 1,
69+
\"deferred_count\": 0,
70+
\"message_count\": 1399,
71+
\"requeue_count\": 0,
72+
\"timeout_count\": 0,
73+
\"clients\": [
74+
{
75+
\"client_id\": \"ethan\",
76+
\"hostname\": \"ethan.local\",
77+
\"version\": \"V2\",
78+
\"remote_address\": \"1.2.3.4:33576\",
79+
\"state\": 3,
80+
\"ready_count\": 1,
81+
\"in_flight_count\": 1,
82+
\"message_count\": 1396,
83+
\"finish_count\": 1395,
84+
\"requeue_count\": 0,
85+
\"connect_ts\": 1549065745,
86+
\"sample_rate\": 0,
87+
\"deflate\": false,
88+
\"snappy\": false,
89+
\"user_agent\": \"nsq_tail/1.1.0 go-nsq/1.0.6\",
90+
\"tls\": false,
91+
\"tls_cipher_suite\": \"\",
92+
\"tls_version\": \"\",
93+
\"tls_negotiated_protocol\": \"\",
94+
\"tls_negotiated_protocol_is_mutual\": false
95+
}
96+
],
97+
\"paused\": false,
98+
\"e2e_processing_latency\": {
99+
\"count\": 0,
100+
\"percentiles\": null
101+
}
102+
}
103+
],
104+
\"depth\": 0,
105+
\"backend_depth\": 0,
106+
\"message_count\": 29259,
107+
\"paused\": false,
108+
\"e2e_processing_latency\": {
109+
\"count\": 0,
110+
\"percentiles\": null
111+
}
112+
}
113+
],
114+
\"memory\": {
115+
\"heap_objects\": 21625,
116+
\"heap_idle_bytes\": 11886592,
117+
\"heap_in_use_bytes\": 3743744,
118+
\"heap_released_bytes\": 10280960,
119+
\"gc_pause_usec_100\": 5612,
120+
\"gc_pause_usec_99\": 3742,
121+
\"gc_pause_usec_95\": 878,
122+
\"next_gc_bytes\": 4194304,
123+
\"gc_total_runs\": 219
124+
}
125+
}";
126+
127+
serde_json::from_str::<StatusTopicsDetails>(body).unwrap();
128+
}
129+
130+
#[test]
131+
fn older_api_test() {
132+
let body = "{
133+
\"status_code\": 200,
134+
\"status_txt\": \"OK\",
135+
\"data\": {
136+
\"version\": \"0.3.8\",
137+
\"health\": \"OK\",
138+
\"start_time\": 1543350728,
139+
\"topics\": [
140+
{
141+
\"topic_name\": \"foo\",
142+
\"channels\": [
143+
{
144+
\"channel_name\": \"tail180292#ephemeral\",
145+
\"depth\": 3,
146+
\"backend_depth\": 0,
147+
\"in_flight_count\": 1,
148+
\"deferred_count\": 0,
149+
\"message_count\": 1399,
150+
\"requeue_count\": 0,
151+
\"timeout_count\": 0,
152+
\"clients\": [
153+
{
154+
\"client_id\": \"ethan\",
155+
\"hostname\": \"ethan.local\",
156+
\"version\": \"V2\",
157+
\"remote_address\": \"1.2.3.4:33576\",
158+
\"state\": 3,
159+
\"ready_count\": 1,
160+
\"in_flight_count\": 1,
161+
\"message_count\": 1396,
162+
\"finish_count\": 1395,
163+
\"requeue_count\": 0,
164+
\"connect_ts\": 1549065745,
165+
\"sample_rate\": 0,
166+
\"deflate\": false,
167+
\"snappy\": false,
168+
\"user_agent\": \"nsq_tail/1.1.0 go-nsq/1.0.6\",
169+
\"tls\": false,
170+
\"tls_cipher_suite\": \"\",
171+
\"tls_version\": \"\",
172+
\"tls_negotiated_protocol\": \"\",
173+
\"tls_negotiated_protocol_is_mutual\": false
174+
}
175+
],
176+
\"paused\": false,
177+
\"e2e_processing_latency\": {
178+
\"count\": 0,
179+
\"percentiles\": null
180+
}
181+
}
182+
],
183+
\"depth\": 0,
184+
\"backend_depth\": 0,
185+
\"message_count\": 29259,
186+
\"paused\": false,
187+
\"e2e_processing_latency\": {
188+
\"count\": 0,
189+
\"percentiles\": null
190+
}
191+
}
192+
],
193+
\"memory\": {
194+
\"heap_objects\": 21625,
195+
\"heap_idle_bytes\": 11886592,
196+
\"heap_in_use_bytes\": 3743744,
197+
\"heap_released_bytes\": 10280960,
198+
\"gc_pause_usec_100\": 5612,
199+
\"gc_pause_usec_99\": 3742,
200+
\"gc_pause_usec_95\": 878,
201+
\"next_gc_bytes\": 4194304,
202+
\"gc_total_runs\": 219
203+
}
204+
}
205+
}";
206+
207+
serde_json::from_str::<StatusTopicsResponse>(body).unwrap();
208+
}

‎src/commands/nsq/post.rs

+33-29
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use clap::ArgMatches;
88
use crossbeam_channel::bounded;
99
use crossbeam_channel::Receiver;
1010

11-
use indicatif::{ProgressBar, ProgressStyle};
12-
11+
use crate::commands::nsq::api::*;
12+
use crate::commands::progress::*;
1313
use crate::commands::CliError;
1414

1515
const RATE_LIMIT: &str = "200";
@@ -107,6 +107,13 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
107107
)
108108
};
109109

110+
let status = NsqState::new(
111+
&options.nsq_lookup,
112+
NsqFilter::Topic {
113+
topics: vec![options.topic.clone()].into_iter().collect(),
114+
},
115+
);
116+
110117
debug!("Capacity of in messages: {}", capacity);
111118
debug!("Interval of new tokens: {:?}", interval);
112119

@@ -117,21 +124,20 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
117124

118125
THREADS_RUNNING.store(true, Ordering::SeqCst);
119126

120-
let urls = super::api::get_base_url_for_topic(&options.nsq_lookup, &options.topic);
121-
let base_url = if urls.is_empty() {
122-
return Err(CliError::new("Unable to get NSQ Host", 2));
123-
} else {
124-
format!("{}", urls.first().unwrap())
127+
let base_addresss = match status.get_topic_url(&options.topic.clone()) {
128+
Some(address) => address,
129+
None => {
130+
error!("NSQ does now know about topic {}", options.topic);
131+
return Err(CliError::new("Unable to get NSQ Host", 2));
132+
}
125133
};
126134

127-
let submit_url = format!("{}/pub?topic={}", base_url, &options.topic);
135+
let submit_url = format!("{}/pub?topic={}", base_addresss, &options.topic);
128136

129-
let style = ProgressStyle::default_bar()
130-
.template("[{elapsed_precise}] {bar:80.cyan/blue} {pos:>7}/{len:7} {msg}")
131-
.progress_chars("##-");
132-
let progress_bar = ProgressBar::new(options.limit as u64);
133-
progress_bar.set_style(style.clone());
134-
progress_bar.enable_steady_tick(1000);
137+
let pb = ProgressBarHelper::new(ProgressBarType::SizedProgressBar(
138+
options.limit,
139+
"[{elapsed_precise}] {bar:80.cyan/blue} {pos:>7}/{len:7} {msg}",
140+
));
135141

136142
let (s1, r1) = bounded(20);
137143

@@ -146,8 +152,8 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
146152
}
147153

148154
let topic = format!("{}", options.topic);
149-
do_api_check(&base_url, &topic);
150-
threads.push(thread::spawn(move || check_api_status(&base_url, &topic)));
155+
do_api_check(&topic, &status);
156+
threads.push(thread::spawn(move || check_api_status(&topic, &status)));
151157

152158
let reader = crate::commands::file::open_file(options.file.to_str().unwrap())?;
153159
let reader = BufReader::new(reader);
@@ -157,7 +163,7 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
157163
loop {
158164
let max_depth = API_DEPTH.load(Ordering::SeqCst);
159165
let in_flight = API_IN_FLIGHT.load(Ordering::SeqCst);
160-
progress_bar.set_message(&format!(
166+
pb.set_message(&format!(
161167
"In Progress: {:4}\tBacklog Size: {:4}\tOffset: {}",
162168
in_flight,
163169
max_depth,
@@ -178,7 +184,7 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
178184
}
179185

180186
ratelimit.wait();
181-
progress_bar.inc(1);
187+
pb.inc();
182188

183189
if options.offset > counter {
184190
OFFSET.fetch_add(1, Ordering::SeqCst);
@@ -189,7 +195,7 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
189195
ERRORS.fetch_add(1, Ordering::SeqCst);
190196
}
191197
}
192-
progress_bar.finish();
198+
pb.done();
193199

194200
THREADS_RUNNING.store(false, Ordering::SeqCst);
195201

@@ -211,24 +217,22 @@ pub fn do_send_command(args: &ArgMatches) -> Result<(), CliError> {
211217
return Ok(());
212218
}
213219

214-
fn check_api_status(base_url: &str, topic: &str) {
220+
fn check_api_status(topic: &str, state: &NsqState) {
215221
loop {
216222
if !THREADS_RUNNING.load(Ordering::SeqCst) {
217223
return;
218224
}
219225

220-
do_api_check(base_url, topic);
226+
do_api_check(topic, state);
227+
std::thread::sleep(Duration::from_millis(200));
221228
}
222229
}
223230

224-
fn do_api_check(base_url: &str, topic: &str) {
225-
if let Some((max_depth, in_flight)) = super::api::get_queue_size(base_url, topic) {
226-
let max_depth = std::cmp::max(0, max_depth) as usize;
227-
let in_flight = std::cmp::max(0, in_flight) as usize;
228-
API_IN_FLIGHT.store(in_flight, Ordering::SeqCst);
229-
API_DEPTH.store(max_depth, Ordering::SeqCst);
230-
std::thread::sleep(Duration::from_millis(200));
231-
}
231+
fn do_api_check(topic: &str, state: &NsqState) {
232+
let snapshot = state.update_status();
233+
let agg = snapshot.topics.get(topic).unwrap().producer_aggregate();
234+
let max_depth = std::cmp::max(0, agg.depth) as usize;
235+
API_DEPTH.store(max_depth, Ordering::SeqCst);
232236
}
233237

234238
fn process_messages(reciever: Receiver<String>, path: String) {

‎src/commands/nsq/stats.rs

+124-198
Large diffs are not rendered by default.

‎src/commands/progress.rs

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use indicatif::{ProgressBar, ProgressStyle};
2+
3+
pub struct ProgressBarHelper {
4+
pb: ProgressBar,
5+
}
6+
7+
pub enum ProgressBarType<'a> {
8+
SizedProgressBar(usize, &'a str),
9+
UnsizedProgressBar(&'a str),
10+
}
11+
12+
impl ProgressBarHelper {
13+
pub fn new(p_type: ProgressBarType) -> Self {
14+
if atty::isnt(atty::Stream::Stdout) {
15+
ProgressBarHelper {
16+
pb: ProgressBar::hidden(),
17+
}
18+
} else {
19+
let template = match &p_type {
20+
ProgressBarType::SizedProgressBar(_, template) => template,
21+
ProgressBarType::UnsizedProgressBar(template) => template,
22+
};
23+
24+
let pb = match p_type {
25+
ProgressBarType::SizedProgressBar(size, _) => ProgressBar::new(size as u64),
26+
ProgressBarType::UnsizedProgressBar(_) => ProgressBar::new_spinner(),
27+
};
28+
29+
let spinner_style = ProgressStyle::default_spinner()
30+
.tick_chars("⠁⠂⠄⡀⢀⠠⠐⠈ ")
31+
.progress_chars("#>-")
32+
.template(&template);
33+
pb.set_style(spinner_style.clone());
34+
pb.enable_steady_tick(100);
35+
ProgressBarHelper { pb }
36+
}
37+
}
38+
39+
#[allow(dead_code)]
40+
pub fn inc_with_message(&self, message: &str) {
41+
self.pb.inc(1);
42+
self.pb.set_message(message);
43+
}
44+
45+
pub fn inc(&self) {
46+
self.pb.inc(1);
47+
}
48+
49+
pub fn set_message(&self, message: &str) {
50+
self.pb.set_message(message);
51+
}
52+
53+
pub fn done(&self) {
54+
self.pb.finish_and_clear();
55+
}
56+
}

‎src/commands/time/mod.rs

+91-41
Original file line numberDiff line numberDiff line change
@@ -4,40 +4,29 @@ mod parse;
44

55
use clap::ArgMatches;
66

7-
use colored::*;
87
use chrono::prelude::*;
8+
use colored::*;
99

10-
use parse::TimeResult;
1110
use crate::commands::CliError;
11+
use parse::TimeResult;
1212

1313
pub fn do_time_command(args: &ArgMatches) -> Result<(), CliError> {
1414
let input_array: Vec<&str> = args.values_of("INPUT").unwrap().collect();
1515
let input_string = input_array.join(" ");
16-
return match (
17-
parse::parse(&input_string),
18-
args.is_present("utc_only"),
19-
) {
16+
return match (parse::parse(&input_string), args.is_present("utc_only")) {
2017
(Ok(date), true) => render_utc(date),
2118
(Ok(date), false) => render_full_output(date),
2219
(Err(_), _) => {
23-
error!(
24-
"Unable to understand `{}`",
25-
input_string
26-
);
27-
return Err(CliError::new("Unknown format", 1));
28-
}
20+
error!("Unable to understand `{}`", input_string);
21+
return Err(CliError::new("Unknown format", 1));
22+
}
2923
};
3024
}
3125

32-
3326
fn render_full_output(input: TimeResult) -> Result<(), CliError> {
3427
let datetimes = match input {
35-
TimeResult::Epoch(epoch) => {
36-
epoch.make_permutations()
37-
},
38-
TimeResult::String(string_format) => {
39-
string_format.make_permutations()
40-
}
28+
TimeResult::Epoch(epoch) => epoch.make_permutations(),
29+
TimeResult::String(string_format) => string_format.make_permutations(),
4130
};
4231

4332
let mut first = true;
@@ -48,35 +37,96 @@ fn render_full_output(input: TimeResult) -> Result<(), CliError> {
4837
first = false;
4938

5039
println!("Understood the date was {}", format!("{}", datetime).bold());
51-
println!(" ├── Date 'human': {}", format!("{}", datetime.format("%b %e %T %Y")).bold());
52-
println!(" ├── Date in M/D/Y: {}", format!("{}/{}/{}", datetime.month(), datetime.day(), datetime.year()).bold());
53-
println!(" ├── Date in YMD: {}", format!("{}{:02}{:02}", datetime.year(), datetime.month(), datetime.day()).bold());
54-
println!(" ├── Day in year: {}", format!("{}", datetime.ordinal()).bold());
55-
println!(" ├── ISO week {}", format!("{}-{}", datetime.iso_week().year(), datetime.iso_week().week()).bold());
56-
println!(" ├── Day of week: {}", format!("{:?}", datetime.weekday()).bold());
57-
println!(" └── Time: {}", format!("{} {}", datetime.time(), datetime.timezone()).bold());
58-
println!(" ├── Unix epoch(s): {}", format!("{}", datetime.timestamp()).bold());
59-
println!(" ├── Unix epoch(ms): {}", format!("{}", datetime.timestamp_millis()).bold());
60-
println!(" ├── Unix epoch(ns): {}", format!("{}", datetime.timestamp_nanos()).bold());
61-
println!(" ├── In UTC: {}", format!("{}", datetime.with_timezone(&chrono::Utc)).bold());
62-
println!(" ├── In Eastern: {}", format!("{}", datetime.with_timezone(&chrono_tz::US::Eastern)).bold());
63-
println!(" ├── In Central: {}", format!("{}", datetime.with_timezone(&chrono_tz::US::Central)).bold());
64-
println!(" ├── In Mountain: {}", format!("{}", datetime.with_timezone(&chrono_tz::US::Mountain)).bold());
65-
println!(" ├── In Arazona: {}", format!("{}", datetime.with_timezone(&chrono_tz::US::Arizona)).bold());
66-
println!(" └── In Pacific: {}", format!("{}", datetime.with_timezone(&chrono_tz::US::Pacific)).bold());
40+
println!(
41+
" ├── Date 'human': {}",
42+
format!("{}", datetime.format("%b %e %T %Y")).bold()
43+
);
44+
println!(
45+
" ├── Date in M/D/Y: {}",
46+
format!(
47+
"{}/{}/{}",
48+
datetime.month(),
49+
datetime.day(),
50+
datetime.year()
51+
)
52+
.bold()
53+
);
54+
println!(
55+
" ├── Date in YMD: {}",
56+
format!(
57+
"{}{:02}{:02}",
58+
datetime.year(),
59+
datetime.month(),
60+
datetime.day()
61+
)
62+
.bold()
63+
);
64+
println!(
65+
" ├── Day in year: {}",
66+
format!("{}", datetime.ordinal()).bold()
67+
);
68+
println!(
69+
" ├── ISO week {}",
70+
format!(
71+
"{}-{}",
72+
datetime.iso_week().year(),
73+
datetime.iso_week().week()
74+
)
75+
.bold()
76+
);
77+
println!(
78+
" ├── Day of week: {}",
79+
format!("{:?}", datetime.weekday()).bold()
80+
);
81+
println!(
82+
" └── Time: {}",
83+
format!("{} {}", datetime.time(), datetime.timezone()).bold()
84+
);
85+
println!(
86+
" ├── Unix epoch(s): {}",
87+
format!("{}", datetime.timestamp()).bold()
88+
);
89+
println!(
90+
" ├── Unix epoch(ms): {}",
91+
format!("{}", datetime.timestamp_millis()).bold()
92+
);
93+
println!(
94+
" ├── Unix epoch(ns): {}",
95+
format!("{}", datetime.timestamp_nanos()).bold()
96+
);
97+
println!(
98+
" ├── In UTC: {}",
99+
format!("{}", datetime.with_timezone(&chrono::Utc)).bold()
100+
);
101+
println!(
102+
" ├── In Eastern: {}",
103+
format!("{}", datetime.with_timezone(&chrono_tz::US::Eastern)).bold()
104+
);
105+
println!(
106+
" ├── In Central: {}",
107+
format!("{}", datetime.with_timezone(&chrono_tz::US::Central)).bold()
108+
);
109+
println!(
110+
" ├── In Mountain: {}",
111+
format!("{}", datetime.with_timezone(&chrono_tz::US::Mountain)).bold()
112+
);
113+
println!(
114+
" ├── In Arizona: {}",
115+
format!("{}", datetime.with_timezone(&chrono_tz::US::Arizona)).bold()
116+
);
117+
println!(
118+
" └── In Pacific: {}",
119+
format!("{}", datetime.with_timezone(&chrono_tz::US::Pacific)).bold()
120+
);
67121
}
68122

69123
Ok(())
70124
}
71125

72126
fn render_utc(input: TimeResult) -> Result<(), CliError> {
73127
let datetime = match input {
74-
TimeResult::Epoch(epoch) => {
75-
epoch.to_utc_date_time()
76-
},
77-
TimeResult::String(string_format) => {
78-
string_format.to_utc_date_time()
79-
}
128+
TimeResult::Epoch(epoch) => epoch.to_utc_date_time(),
129+
TimeResult::String(string_format) => string_format.to_utc_date_time(),
80130
};
81131

82132
println!("{}", datetime);

‎src/commands/time/parse.rs

+251-82
Large diffs are not rendered by default.

‎src/main.rs

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#![recursion_limit = "1024"]
12
#![feature(slice_concat_ext)]
23
#![feature(vec_remove_item)]
34

@@ -25,15 +26,19 @@ extern crate prettytable;
2526
extern crate termion;
2627
#[macro_use]
2728
extern crate lazy_static;
29+
extern crate atty;
30+
extern crate pest;
31+
#[macro_use]
32+
extern crate pest_derive;
2833

2934
mod commands;
3035

3136
use clap::App;
3237

3338
use commands::har::exec::do_har_command;
34-
use commands::json::do_json_latest_command;
39+
use commands::json::*;
3540
use commands::nsq::post::do_send_command;
36-
use commands::nsq::stats::do_status_command;
41+
use commands::nsq::stats::do_stats_command;
3742
use commands::time::do_time_command;
3843
use kopy_common_lib::configure_logging;
3944

@@ -54,11 +59,12 @@ fn main() {
5459
("har", Some(har_matches)) => do_har_command(har_matches),
5560
("json", Some(json_matches)) => match json_matches.subcommand() {
5661
("latest", Some(filter_matches)) => do_json_latest_command(filter_matches),
62+
("sql", Some(filter_matches)) => do_json_sql_command(filter_matches),
5763
_ => unreachable!(),
5864
},
5965
("nsq", Some(nsq_matches)) => match nsq_matches.subcommand() {
6066
("send", Some(send_matches)) => do_send_command(send_matches),
61-
("status", Some(send_matches)) => do_status_command(send_matches),
67+
("stats", Some(send_matches)) => do_stats_command(send_matches),
6268
_ => unreachable!(),
6369
},
6470
_ => unreachable!(),

0 commit comments

Comments
 (0)
Please sign in to comment.