Skip to content

Commit e15f712

Browse files
junweid62Junwei Dai
and
Junwei Dai
authored
Add verbose pipeline parameter to output each processor's execution details (opensearch-project#16843)
* Add verbose pipeline parameter to output each processor's execution details Signed-off-by: Junwei Dai <junweid@amazon.com> * add change log Signed-off-by: Junwei Dai <junweid@amazon.com> # Conflicts: # CHANGELOG.md * Refactor ProcessorExecutionDetail to improve field handling Signed-off-by: Junwei Dai <junweid@amazon.com> * Fix ITtest Fail Signed-off-by: Junwei Dai <junweid@amazon.com> * Add more unit test Signed-off-by: Junwei Dai <junweid@amazon.com> * resolve comments Signed-off-by: Junwei Dai <junweid@amazon.com> * 1.add todo to change version.current 2.use exist xcontentUtil to read 3.move processor excution key to ProcessorExecutionDetail Signed-off-by: Junwei Dai <junweid@amazon.com> * refactor code Signed-off-by: Junwei Dai <junweid@amazon.com> * refactor code based on the comment Signed-off-by: Junwei Dai <junweid@amazon.com> * refactor code based on the comment Signed-off-by: Junwei Dai <junweid@amazon.com> * 1.add javadoc 2.refactor error message Signed-off-by: Junwei Dai <junweid@amazon.com> * change error message Signed-off-by: Junwei Dai <junweid@amazon.com> * 1.Added wrappers for tracking execution details of search processors. 2.Removed redundant logic for cleaner and simpler implementation. Signed-off-by: Junwei Dai <junweid@amazon.com> * change version to 3.0.0 Signed-off-by: Junwei Dai <junweid@amazon.com> * fix unit test Signed-off-by: Junwei Dai <junweid@amazon.com> * fix unit test Signed-off-by: Junwei Dai <junweid@amazon.com> * addressed comments 1. removed unnecessary log Signed-off-by: Junwei Dai <junweid@amazon.com> * addressed comments Signed-off-by: Junwei Dai <junweid@amazon.com> * revise comment to opensearch.api Signed-off-by: Junwei Dai <junweid@amazon.com> * removed unused logger and comment Signed-off-by: Junwei Dai <junweid@amazon.com> * removed unnecessary try catch block. add more comment Signed-off-by: Junwei Dai <junweid@amazon.com> * addressed comments Signed-off-by: Junwei Dai <junweid@amazon.com> * remove wrong unit test Signed-off-by: Junwei Dai <junweid@amazon.com> --------- Signed-off-by: Junwei Dai <junweid@amazon.com> Co-authored-by: Junwei Dai <junweid@amazon.com>
1 parent cedbb9e commit e15f712

21 files changed

+1251
-27
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
2525
- Update script supports java.lang.String.sha1() and java.lang.String.sha256() methods ([#16923](https://github.com/opensearch-project/OpenSearch/pull/16923))
2626
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
27+
- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)).
2728
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
2829
- Introduce a setting to disable download of full cluster state from remote on term mismatch([#16798](https://github.com/opensearch-project/OpenSearch/pull/16798/))
2930
- Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802))

server/src/main/java/org/opensearch/action/search/SearchResponse.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.opensearch.search.aggregations.Aggregations;
6060
import org.opensearch.search.aggregations.InternalAggregations;
6161
import org.opensearch.search.internal.InternalSearchResponse;
62+
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
6263
import org.opensearch.search.profile.ProfileShardResult;
6364
import org.opensearch.search.profile.SearchProfileShardResults;
6465
import org.opensearch.search.suggest.Suggest;
@@ -73,6 +74,7 @@
7374
import java.util.function.Supplier;
7475

7576
import static org.opensearch.action.search.SearchResponseSections.EXT_FIELD;
77+
import static org.opensearch.action.search.SearchResponseSections.PROCESSOR_RESULT_FIELD;
7678
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
7779

7880
/**
@@ -394,6 +396,7 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
394396
List<ShardSearchFailure> failures = new ArrayList<>();
395397
Clusters clusters = Clusters.EMPTY;
396398
List<SearchExtBuilder> extBuilders = new ArrayList<>();
399+
List<ProcessorExecutionDetail> processorResult = new ArrayList<>();
397400
for (Token token = parser.nextToken(); token != Token.END_OBJECT; token = parser.nextToken()) {
398401
if (token == Token.FIELD_NAME) {
399402
currentFieldName = parser.currentName();
@@ -517,6 +520,11 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
517520
extBuilders.add(searchExtBuilder);
518521
}
519522
}
523+
} else if (PROCESSOR_RESULT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
524+
while ((token = parser.nextToken()) != Token.END_ARRAY) {
525+
ProcessorExecutionDetail detail = ProcessorExecutionDetail.fromXContent(parser);
526+
processorResult.add(detail);
527+
}
520528
} else {
521529
parser.skipChildren();
522530
}
@@ -530,7 +538,8 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
530538
terminatedEarly,
531539
profile,
532540
numReducePhases,
533-
extBuilders
541+
extBuilders,
542+
processorResult
534543
);
535544
return new SearchResponse(
536545
searchResponseSections,

server/src/main/java/org/opensearch/action/search/SearchResponseSections.java

+25-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.search.SearchExtBuilder;
4141
import org.opensearch.search.SearchHits;
4242
import org.opensearch.search.aggregations.Aggregations;
43+
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
4344
import org.opensearch.search.profile.ProfileShardResult;
4445
import org.opensearch.search.profile.SearchProfileShardResults;
4546
import org.opensearch.search.suggest.Suggest;
@@ -65,7 +66,7 @@
6566
public class SearchResponseSections implements ToXContentFragment {
6667

6768
public static final ParseField EXT_FIELD = new ParseField("ext");
68-
69+
public static final ParseField PROCESSOR_RESULT_FIELD = new ParseField("processor_results");
6970
protected final SearchHits hits;
7071
protected final Aggregations aggregations;
7172
protected final Suggest suggest;
@@ -74,6 +75,7 @@ public class SearchResponseSections implements ToXContentFragment {
7475
protected final Boolean terminatedEarly;
7576
protected final int numReducePhases;
7677
protected final List<SearchExtBuilder> searchExtBuilders = new ArrayList<>();
78+
protected final List<ProcessorExecutionDetail> processorResult = new ArrayList<>();
7779

7880
public SearchResponseSections(
7981
SearchHits hits,
@@ -84,7 +86,17 @@ public SearchResponseSections(
8486
SearchProfileShardResults profileResults,
8587
int numReducePhases
8688
) {
87-
this(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, Collections.emptyList());
89+
this(
90+
hits,
91+
aggregations,
92+
suggest,
93+
timedOut,
94+
terminatedEarly,
95+
profileResults,
96+
numReducePhases,
97+
Collections.emptyList(),
98+
Collections.emptyList()
99+
);
88100
}
89101

90102
public SearchResponseSections(
@@ -95,7 +107,8 @@ public SearchResponseSections(
95107
Boolean terminatedEarly,
96108
SearchProfileShardResults profileResults,
97109
int numReducePhases,
98-
List<SearchExtBuilder> searchExtBuilders
110+
List<SearchExtBuilder> searchExtBuilders,
111+
List<ProcessorExecutionDetail> processorResult
99112
) {
100113
this.hits = hits;
101114
this.aggregations = aggregations;
@@ -104,6 +117,7 @@ public SearchResponseSections(
104117
this.timedOut = timedOut;
105118
this.terminatedEarly = terminatedEarly;
106119
this.numReducePhases = numReducePhases;
120+
this.processorResult.addAll(processorResult);
107121
this.searchExtBuilders.addAll(Objects.requireNonNull(searchExtBuilders, "searchExtBuilders must not be null"));
108122
}
109123

@@ -166,13 +180,21 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params)
166180
}
167181
builder.endObject();
168182
}
183+
184+
if (!processorResult.isEmpty()) {
185+
builder.field(PROCESSOR_RESULT_FIELD.getPreferredName(), processorResult);
186+
}
169187
return builder;
170188
}
171189

172190
public List<SearchExtBuilder> getSearchExtBuilders() {
173191
return Collections.unmodifiableList(this.searchExtBuilders);
174192
}
175193

194+
public List<ProcessorExecutionDetail> getProcessorResult() {
195+
return processorResult;
196+
}
197+
176198
protected void writeTo(StreamOutput out) throws IOException {
177199
throw new UnsupportedOperationException();
178200
}

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

+3
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,9 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
256256
if (request.hasParam("timeout")) {
257257
searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
258258
}
259+
if (request.hasParam("verbose_pipeline")) {
260+
searchSourceBuilder.verbosePipeline(request.paramAsBoolean("verbose_pipeline", false));
261+
}
259262
if (request.hasParam("terminate_after")) {
260263
int terminateAfter = request.paramAsInt("terminate_after", SearchContext.DEFAULT_TERMINATE_AFTER);
261264
if (terminateAfter < 0) {

server/src/main/java/org/opensearch/search/SearchHits.java

+17
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.lucene.search.TotalHits.Relation;
3838
import org.opensearch.common.Nullable;
3939
import org.opensearch.common.annotation.PublicApi;
40+
import org.opensearch.common.io.stream.BytesStreamOutput;
4041
import org.opensearch.common.lucene.Lucene;
4142
import org.opensearch.core.common.io.stream.StreamInput;
4243
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -166,6 +167,22 @@ public SearchHit[] getHits() {
166167
return this.hits;
167168
}
168169

170+
/**
171+
* Creates a deep copy of this SearchHits instance.
172+
*
173+
* @return a deep copy of the current SearchHits object
174+
* @throws IOException if an I/O exception occurs during serialization or deserialization
175+
*/
176+
public SearchHits deepCopy() throws IOException {
177+
try (BytesStreamOutput out = new BytesStreamOutput()) {
178+
this.writeTo(out);
179+
180+
try (StreamInput in = out.bytes().streamInput()) {
181+
return new SearchHits(in);
182+
}
183+
}
184+
}
185+
169186
/**
170187
* Return the hit as the provided position.
171188
*/

server/src/main/java/org/opensearch/search/builder/SearchSourceBuilder.java

+40-2
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
136136
public static final ParseField SLICE = new ParseField("slice");
137137
public static final ParseField POINT_IN_TIME = new ParseField("pit");
138138
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");
139+
public static final ParseField VERBOSE_SEARCH_PIPELINE = new ParseField("verbose_pipeline");
139140

140141
public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
141142
return fromXContent(parser, true);
@@ -226,6 +227,8 @@ public static HighlightBuilder highlight() {
226227

227228
private String searchPipeline;
228229

230+
private boolean verbosePipeline = false;
231+
229232
/**
230233
* Constructs a new search source builder.
231234
*/
@@ -302,6 +305,9 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
302305
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
303306
searchPipeline = in.readOptionalString();
304307
}
308+
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
309+
verbosePipeline = in.readBoolean();
310+
}
305311
}
306312

307313
@Override
@@ -385,6 +391,9 @@ public void writeTo(StreamOutput out) throws IOException {
385391
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
386392
out.writeOptionalString(searchPipeline);
387393
}
394+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
395+
out.writeBoolean(verbosePipeline);
396+
}
388397
}
389398

390399
/**
@@ -1142,6 +1151,26 @@ public SearchSourceBuilder pipeline(String searchPipeline) {
11421151
return this;
11431152
}
11441153

1154+
/**
1155+
* Enables or disables verbose mode for the search pipeline.
1156+
*
1157+
* When verbose mode is enabled, detailed information about each processor
1158+
* in the search pipeline is included in the search response. This includes
1159+
* the processor name, execution status, input, output, and time taken for processing.
1160+
*
1161+
* This parameter is primarily intended for debugging purposes, allowing users
1162+
* to track how data flows and transforms through the search pipeline.
1163+
*
1164+
*/
1165+
public SearchSourceBuilder verbosePipeline(Boolean verbosePipeline) {
1166+
this.verbosePipeline = verbosePipeline;
1167+
return this;
1168+
}
1169+
1170+
public Boolean verbosePipeline() {
1171+
return verbosePipeline;
1172+
}
1173+
11451174
/**
11461175
* Rewrites this search source builder into its primitive form. e.g. by
11471176
* rewriting the QueryBuilder. If the builder did not change the identity
@@ -1240,6 +1269,7 @@ private SearchSourceBuilder shallowCopy(
12401269
rewrittenBuilder.derivedFieldsObject = derivedFieldsObject;
12411270
rewrittenBuilder.derivedFields = derivedFields;
12421271
rewrittenBuilder.searchPipeline = searchPipeline;
1272+
rewrittenBuilder.verbosePipeline = verbosePipeline;
12431273
return rewrittenBuilder;
12441274
}
12451275

@@ -1309,6 +1339,8 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
13091339
profile = parser.booleanValue();
13101340
} else if (SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
13111341
searchPipeline = parser.text();
1342+
} else if (VERBOSE_SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
1343+
verbosePipeline = parser.booleanValue();
13121344
} else {
13131345
throw new ParsingException(
13141346
parser.getTokenLocation(),
@@ -1642,6 +1674,10 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
16421674
builder.field(SEARCH_PIPELINE.getPreferredName(), searchPipeline);
16431675
}
16441676

1677+
if (verbosePipeline) {
1678+
builder.field(VERBOSE_SEARCH_PIPELINE.getPreferredName(), verbosePipeline);
1679+
}
1680+
16451681
return builder;
16461682
}
16471683

@@ -1920,7 +1956,8 @@ public int hashCode() {
19201956
pointInTimeBuilder,
19211957
derivedFieldsObject,
19221958
derivedFields,
1923-
searchPipeline
1959+
searchPipeline,
1960+
verbosePipeline
19241961
);
19251962
}
19261963

@@ -1966,7 +2003,8 @@ public boolean equals(Object obj) {
19662003
&& Objects.equals(pointInTimeBuilder, other.pointInTimeBuilder)
19672004
&& Objects.equals(derivedFieldsObject, other.derivedFieldsObject)
19682005
&& Objects.equals(derivedFields, other.derivedFields)
1969-
&& Objects.equals(searchPipeline, other.searchPipeline);
2006+
&& Objects.equals(searchPipeline, other.searchPipeline)
2007+
&& Objects.equals(verbosePipeline, other.verbosePipeline);
19702008
}
19712009

19722010
@Override

server/src/main/java/org/opensearch/search/internal/InternalSearchResponse.java

+39-4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.search.SearchExtBuilder;
4343
import org.opensearch.search.SearchHits;
4444
import org.opensearch.search.aggregations.InternalAggregations;
45+
import org.opensearch.search.pipeline.ProcessorExecutionDetail;
4546
import org.opensearch.search.profile.SearchProfileShardResults;
4647
import org.opensearch.search.suggest.Suggest;
4748

@@ -73,7 +74,17 @@ public InternalSearchResponse(
7374
Boolean terminatedEarly,
7475
int numReducePhases
7576
) {
76-
this(hits, aggregations, suggest, profileResults, timedOut, terminatedEarly, numReducePhases, Collections.emptyList());
77+
this(
78+
hits,
79+
aggregations,
80+
suggest,
81+
profileResults,
82+
timedOut,
83+
terminatedEarly,
84+
numReducePhases,
85+
Collections.emptyList(),
86+
Collections.emptyList()
87+
);
7788
}
7889

7990
public InternalSearchResponse(
@@ -84,9 +95,20 @@ public InternalSearchResponse(
8495
boolean timedOut,
8596
Boolean terminatedEarly,
8697
int numReducePhases,
87-
List<SearchExtBuilder> searchExtBuilderList
98+
List<SearchExtBuilder> searchExtBuilderList,
99+
List<ProcessorExecutionDetail> processorResult
88100
) {
89-
super(hits, aggregations, suggest, timedOut, terminatedEarly, profileResults, numReducePhases, searchExtBuilderList);
101+
super(
102+
hits,
103+
aggregations,
104+
suggest,
105+
timedOut,
106+
terminatedEarly,
107+
profileResults,
108+
numReducePhases,
109+
searchExtBuilderList,
110+
processorResult
111+
);
90112
}
91113

92114
public InternalSearchResponse(StreamInput in) throws IOException {
@@ -98,7 +120,8 @@ public InternalSearchResponse(StreamInput in) throws IOException {
98120
in.readOptionalBoolean(),
99121
in.readOptionalWriteable(SearchProfileShardResults::new),
100122
in.readVInt(),
101-
readSearchExtBuildersOnOrAfter(in)
123+
readSearchExtBuildersOnOrAfter(in),
124+
readProcessorResultOnOrAfter(in)
102125
);
103126
}
104127

@@ -112,6 +135,7 @@ public void writeTo(StreamOutput out) throws IOException {
112135
out.writeOptionalWriteable(profileResults);
113136
out.writeVInt(numReducePhases);
114137
writeSearchExtBuildersOnOrAfter(out, searchExtBuilders);
138+
writeProcessorResultOnOrAfter(out, processorResult);
115139
}
116140

117141
private static List<SearchExtBuilder> readSearchExtBuildersOnOrAfter(StreamInput in) throws IOException {
@@ -123,4 +147,15 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
123147
out.writeNamedWriteableList(searchExtBuilders);
124148
}
125149
}
150+
151+
private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
152+
return (in.getVersion().onOrAfter(Version.V_3_0_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
153+
}
154+
155+
private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
156+
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
157+
out.writeList(processorResult);
158+
}
159+
}
160+
126161
}

0 commit comments

Comments
 (0)