Skip to content

Commit b869de9

Browse files
authored
Refactor parsing logic for Measurement (#112)
* Refactor parsing logic for Measurement Signed-off-by: Chenyang Ji <cyji@amazon.com> * fix failed tests Signed-off-by: Chenyang Ji <cyji@amazon.com> --------- Signed-off-by: Chenyang Ji <cyji@amazon.com>
1 parent 839e6f7 commit b869de9

File tree

6 files changed

+94
-17
lines changed

6 files changed

+94
-17
lines changed

src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import org.opensearch.action.search.SearchRequest;
1919
import org.opensearch.action.search.SearchResponse;
2020
import org.opensearch.client.Client;
21+
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
22+
import org.opensearch.common.xcontent.XContentType;
2123
import org.opensearch.core.xcontent.NamedXContentRegistry;
24+
import org.opensearch.core.xcontent.XContentParser;
2225
import org.opensearch.index.IndexNotFoundException;
2326
import org.opensearch.index.query.MatchQueryBuilder;
2427
import org.opensearch.index.query.QueryBuilder;
@@ -104,7 +107,9 @@ public List<SearchQueryRecord> read(final String from, final String to) {
104107
try {
105108
SearchResponse searchResponse = client.search(searchRequest).actionGet();
106109
for (SearchHit hit : searchResponse.getHits()) {
107-
SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry);
110+
XContentParser parser = XContentType.JSON.xContent()
111+
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());
112+
SearchQueryRecord record = SearchQueryRecord.fromXContent(parser);
108113
records.add(record);
109114
}
110115
} catch (IndexNotFoundException ignored) {} catch (Exception e) {

src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java

+57-3
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,25 @@
1010

1111
import java.io.IOException;
1212
import java.util.Objects;
13+
import org.opensearch.core.common.ParsingException;
1314
import org.opensearch.core.common.io.stream.StreamInput;
1415
import org.opensearch.core.common.io.stream.StreamOutput;
1516
import org.opensearch.core.common.io.stream.Writeable;
1617
import org.opensearch.core.xcontent.ToXContent;
1718
import org.opensearch.core.xcontent.ToXContentObject;
1819
import org.opensearch.core.xcontent.XContentBuilder;
20+
import org.opensearch.core.xcontent.XContentParser;
1921

2022
/**
2123
* Measurement that is stored in the SearchQueryRecord. Measurement can be of a specific AggregationType
2224
*/
2325
public class Measurement implements ToXContentObject, Writeable {
2426
private static int DEFAULT_COUNT = 1;
27+
28+
private static final String NUMBER = "number";
29+
private static final String COUNT = "count";
30+
private static final String AGGREGATION_TYPE = "aggregationType";
31+
2532
private AggregationType aggregationType;
2633
private Number number;
2734
private int count;
@@ -55,6 +62,21 @@ public Measurement(Number number) {
5562
this(number, DEFAULT_COUNT, AggregationType.DEFAULT_AGGREGATION_TYPE);
5663
}
5764

65+
private Measurement() {}
66+
67+
/**
68+
* Construct a measurement from {@link XContentParser}
69+
*
70+
* @param parser {@link XContentParser}
71+
* @return {@link Measurement}
72+
* @throws IOException IOException
73+
*/
74+
public static Measurement fromXContent(XContentParser parser) throws IOException {
75+
Measurement builder = new Measurement();
76+
builder.parseXContent(parser);
77+
return builder;
78+
}
79+
5880
/**
5981
* Add measurement number to the current number based on the aggregationType.
6082
* If aggregateType is NONE, replace the number since we are not aggregating in this case.
@@ -150,13 +172,45 @@ public void setAggregationType(AggregationType aggregationType) {
150172
@Override
151173
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
152174
builder.startObject();
153-
builder.field("number", number);
154-
builder.field("count", count);
155-
builder.field("aggregationType", aggregationType.toString());
175+
builder.field(NUMBER, number);
176+
builder.field(COUNT, count);
177+
builder.field(AGGREGATION_TYPE, aggregationType.toString());
156178
builder.endObject();
157179
return builder;
158180
}
159181

182+
/**
183+
* Parse a measurement from {@link XContentParser}
184+
*
185+
* @param parser {@link XContentParser}
186+
* @throws IOException IOException
187+
*/
188+
private void parseXContent(XContentParser parser) throws IOException {
189+
XContentParser.Token token = parser.currentToken();
190+
if (token != XContentParser.Token.START_OBJECT) {
191+
throw new ParsingException(
192+
parser.getTokenLocation(),
193+
"Expected [" + XContentParser.Token.START_OBJECT + "] but found [" + token + "]",
194+
parser.getTokenLocation()
195+
);
196+
} else {
197+
String currentFieldName = null;
198+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
199+
if (token == XContentParser.Token.FIELD_NAME) {
200+
currentFieldName = parser.currentName();
201+
} else if (token.isValue()) {
202+
if (NUMBER.equals(currentFieldName)) {
203+
this.number = parser.numberValue();
204+
} else if (COUNT.equals(currentFieldName)) {
205+
this.count = parser.intValue();
206+
} else if (AGGREGATION_TYPE.equals(currentFieldName)) {
207+
this.aggregationType = AggregationType.valueOf(parser.text());
208+
}
209+
}
210+
}
211+
}
212+
}
213+
160214
@Override
161215
public void writeTo(StreamOutput out) throws IOException {
162216
writeNumber(out, number);

src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java

+7-12
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,17 @@
1818
import org.apache.logging.log4j.LogManager;
1919
import org.apache.logging.log4j.Logger;
2020
import org.opensearch.Version;
21-
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
22-
import org.opensearch.common.xcontent.XContentType;
2321
import org.opensearch.core.common.Strings;
2422
import org.opensearch.core.common.io.stream.StreamInput;
2523
import org.opensearch.core.common.io.stream.StreamOutput;
2624
import org.opensearch.core.common.io.stream.Writeable;
2725
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
2826
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
2927
import org.opensearch.core.xcontent.MediaTypeRegistry;
30-
import org.opensearch.core.xcontent.NamedXContentRegistry;
3128
import org.opensearch.core.xcontent.ToXContentObject;
3229
import org.opensearch.core.xcontent.XContentBuilder;
3330
import org.opensearch.core.xcontent.XContentParser;
3431
import org.opensearch.core.xcontent.XContentParserUtils;
35-
import org.opensearch.search.SearchHit;
3632
import org.opensearch.search.builder.SearchSourceBuilder;
3733
import org.opensearch.tasks.Task;
3834

@@ -141,18 +137,17 @@ public SearchQueryRecord(final long timestamp, Map<MetricType, Measurement> meas
141137
}
142138

143139
/**
144-
* Returns a SearchQueryRecord from a SearchHit
140+
* Construct a SearchQueryRecord from {@link XContentParser}
145141
*
146-
* @param hit SearchHit to parse into SearchQueryRecord
147-
* @param namedXContentRegistry NamedXContentRegistry for parsing purposes
148-
* @return SearchQueryRecord
142+
* @param parser {@link XContentParser}
143+
* @return {@link SearchQueryRecord}
144+
* @throws IOException IOException
149145
*/
150-
public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry namedXContentRegistry) throws IOException {
146+
public static SearchQueryRecord fromXContent(XContentParser parser) throws IOException {
151147
long timestamp = 0L;
152148
Map<MetricType, Measurement> measurements = new HashMap<>();
153149
Map<Attribute, Object> attributes = new HashMap<>();
154-
XContentParser parser = XContentType.JSON.xContent()
155-
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString());
150+
156151
parser.nextToken();
157152
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
158153
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -167,7 +162,7 @@ public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry n
167162
case CPU:
168163
case MEMORY:
169164
MetricType metric = MetricType.fromString(fieldName);
170-
measurements.put(metric, new Measurement(metric.parseValue(parser.longValue())));
165+
measurements.put(metric, Measurement.fromXContent(parser));
171166
break;
172167
case SEARCH_TYPE:
173168
attributes.put(Attribute.SEARCH_TYPE, parser.text());

src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java

+11
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,19 @@ public static SearchQueryRecord createFixedSearchQueryRecord() {
240240
Map<MetricType, Measurement> measurements = Map.of(MetricType.LATENCY, new Measurement(1L));
241241

242242
Map<String, Long> phaseLatencyMap = new HashMap<>();
243+
phaseLatencyMap.put("expand", 1L);
244+
phaseLatencyMap.put("query", 10L);
245+
phaseLatencyMap.put("fetch", 1L);
243246
Map<Attribute, Object> attributes = new HashMap<>();
244247
attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT));
248+
attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap);
249+
attributes.put(
250+
Attribute.TASK_RESOURCE_USAGES,
251+
List.of(
252+
new TaskResourceInfo("action", 2L, 1L, "id", new TaskResourceUsage(1000L, 2000L)),
253+
new TaskResourceInfo("action2", 3L, 1L, "id2", new TaskResourceUsage(2000L, 1000L))
254+
)
255+
);
245256

246257
return new SearchQueryRecord(timestamp, measurements, attributes);
247258
}

src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void testSerialize() throws Exception {
4141

4242
public void testToXContent() throws IOException {
4343
char[] expectedXcontent =
44-
"{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}"
44+
"{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"phase_latency_map\":{\"expand\":1,\"query\":10,\"fetch\":1},\"task_resource_usages\":[{\"action\":\"action\",\"taskId\":2,\"parentTaskId\":1,\"nodeId\":\"id\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":1000,\"memory_in_bytes\":2000}},{\"action\":\"action2\",\"taskId\":3,\"parentTaskId\":1,\"nodeId\":\"id2\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":2000,\"memory_in_bytes\":1000}}],\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}"
4545
.toCharArray();
4646
TopQueries topQueries = QueryInsightsTestUtils.createFixedTopQueries();
4747
ClusterName clusterName = new ClusterName("test-cluster");

src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java

+12
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import java.util.List;
1515
import java.util.Set;
1616
import org.opensearch.common.io.stream.BytesStreamOutput;
17+
import org.opensearch.common.xcontent.json.JsonXContent;
1718
import org.opensearch.core.common.io.stream.StreamInput;
19+
import org.opensearch.core.xcontent.XContentParser;
1820
import org.opensearch.plugin.insights.QueryInsightsTestUtils;
1921
import org.opensearch.test.OpenSearchTestCase;
2022

@@ -54,6 +56,16 @@ public void testEqual() {
5456
assertEquals(record1, record2);
5557
}
5658

59+
public void testFromXContent() {
60+
SearchQueryRecord record = QueryInsightsTestUtils.createFixedSearchQueryRecord();
61+
try (XContentParser recordParser = createParser(JsonXContent.jsonXContent, record.toString())) {
62+
SearchQueryRecord parsedRecord = SearchQueryRecord.fromXContent(recordParser);
63+
QueryInsightsTestUtils.checkRecordsEquals(List.of(record), List.of(parsedRecord));
64+
} catch (Exception e) {
65+
fail("Test should not throw exceptions when parsing search query record");
66+
}
67+
}
68+
5769
/**
5870
* Serialize and deserialize a SearchQueryRecord.
5971
* @param record A SearchQueryRecord to serialize.

0 commit comments

Comments
 (0)