Skip to content

Commit 77a1193

Browse files
dzane17 and reta authored
Tracing for deep search path (opensearch-project#12103)
* Tracing for deep search path
  Signed-off-by: David Zane <davizane@amazon.com>
* Refactor search phase tracing instrumentation to detach span creation from the SearchRequestOperationsListener
  Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
* Close WrappedPhase span
  Signed-off-by: David Zane <davizane@amazon.com>
* Add asserting listener class
  Signed-off-by: David Zane <davizane@amazon.com>
* Cleanup AbstractSearchAsyncActionTests tests
  Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
* Cleanup SearchAsyncActionTests tests
  Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
* Cleanup CanMatchPreFilterSearchPhaseTests tests
  Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
---------
Signed-off-by: David Zane <davizane@amazon.com>
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Co-authored-by: Andriy Redko <andriy.redko@aiven.io>
1 parent ef9314e commit 77a1193

26 files changed (+506 / -201 lines changed)

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
1919
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
2020
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
21+
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))
2122

2223
### Dependencies
2324
- Bump `log4j-core` from 2.18.0 to 2.19.0

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanContext.java

+15
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,19 @@ public SpanContext(Span span) {
3131
Span getSpan() {
3232
return span;
3333
}
34+
35+
/**
36+
* Sets the error for the current span behind this context
37+
* @param cause error
38+
*/
39+
public void setError(final Exception cause) {
40+
span.setError(cause);
41+
}
42+
43+
/**
44+
* Ends current span
45+
*/
46+
public void endSpan() {
47+
span.endSpan();
48+
}
3449
}

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/SpanCreationContext.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ public SpanCreationContext attributes(Attributes attributes) {
7979
}
8080

8181
/**
82-
* Sets the parent for spann
83-
* @param parent parent
82+
* Sets the parent for span
83+
* @param parent parent span context
8484
* @return spanCreationContext
8585
*/
8686
public SpanCreationContext parent(SpanContext parent) {

plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void onPhaseStart(SearchPhaseContext context) {}
116116
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
117117

118118
@Override
119-
public void onPhaseFailure(SearchPhaseContext context) {}
119+
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}
120120

121121
@Override
122122
public void onRequestStart(SearchRequestContext searchRequestContext) {}

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@
5858
import org.opensearch.search.internal.SearchContext;
5959
import org.opensearch.search.internal.ShardSearchRequest;
6060
import org.opensearch.search.pipeline.PipelinedRequest;
61+
import org.opensearch.telemetry.tracing.Span;
62+
import org.opensearch.telemetry.tracing.SpanCreationContext;
63+
import org.opensearch.telemetry.tracing.SpanScope;
64+
import org.opensearch.telemetry.tracing.Tracer;
6165
import org.opensearch.transport.Transport;
6266

6367
import java.util.ArrayDeque;
@@ -116,6 +120,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
116120
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
117121
private final boolean throttleConcurrentRequests;
118122
private final SearchRequestContext searchRequestContext;
123+
private final Tracer tracer;
119124

120125
private SearchPhase currentPhase;
121126
private boolean currentPhaseHasLifecycle;
@@ -140,7 +145,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
140145
SearchPhaseResults<Result> resultConsumer,
141146
int maxConcurrentRequestsPerNode,
142147
SearchResponse.Clusters clusters,
143-
SearchRequestContext searchRequestContext
148+
SearchRequestContext searchRequestContext,
149+
Tracer tracer
144150
) {
145151
super(name);
146152
final List<SearchShardIterator> toSkipIterators = new ArrayList<>();
@@ -177,6 +183,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
177183
this.results = resultConsumer;
178184
this.clusters = clusters;
179185
this.searchRequestContext = searchRequestContext;
186+
this.tracer = tracer;
180187
}
181188

182189
@Override
@@ -221,6 +228,7 @@ public final void start() {
221228
null
222229
)
223230
);
231+
onRequestEnd(searchRequestContext);
224232
return;
225233
}
226234
executePhase(this);
@@ -460,15 +468,24 @@ private void onRequestEnd(SearchRequestContext searchRequestContext) {
460468
}
461469

462470
private void executePhase(SearchPhase phase) {
463-
try {
471+
Span phaseSpan = tracer.startSpan(SpanCreationContext.server().name("[phase/" + phase.getName() + "]"));
472+
try (final SpanScope scope = tracer.withSpanInScope(phaseSpan)) {
464473
onPhaseStart(phase);
465474
phase.recordAndRun();
466475
} catch (Exception e) {
467476
if (logger.isDebugEnabled()) {
468477
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
469478
}
470479

480+
if (currentPhaseHasLifecycle == false) {
481+
phaseSpan.setError(e);
482+
}
483+
471484
onPhaseFailure(phase, "", e);
485+
} finally {
486+
if (currentPhaseHasLifecycle == false) {
487+
phaseSpan.endSpan();
488+
}
472489
}
473490
}
474491

@@ -733,7 +750,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
733750
@Override
734751
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
735752
if (currentPhaseHasLifecycle) {
736-
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
753+
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
737754
}
738755
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
739756
}

server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.opensearch.search.sort.FieldSortBuilder;
4545
import org.opensearch.search.sort.MinAndMax;
4646
import org.opensearch.search.sort.SortOrder;
47+
import org.opensearch.telemetry.tracing.Tracer;
4748
import org.opensearch.transport.Transport;
4849

4950
import java.util.Comparator;
@@ -91,7 +92,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
9192
SearchTask task,
9293
Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory,
9394
SearchResponse.Clusters clusters,
94-
SearchRequestContext searchRequestContext
95+
SearchRequestContext searchRequestContext,
96+
Tracer tracer
9597
) {
9698
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
9799
super(
@@ -112,7 +114,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
112114
new CanMatchSearchPhaseResults(shardsIts.size()),
113115
shardsIts.size(),
114116
clusters,
115-
searchRequestContext
117+
searchRequestContext,
118+
tracer
116119
);
117120
this.phaseFactory = phaseFactory;
118121
this.shardsIts = shardsIts;

server/src/main/java/org/opensearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.search.dfs.AggregatedDfs;
4242
import org.opensearch.search.dfs.DfsSearchResult;
4343
import org.opensearch.search.internal.AliasFilter;
44+
import org.opensearch.telemetry.tracing.Tracer;
4445
import org.opensearch.transport.Transport;
4546

4647
import java.util.List;
@@ -77,7 +78,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
7778
final ClusterState clusterState,
7879
final SearchTask task,
7980
SearchResponse.Clusters clusters,
80-
SearchRequestContext searchRequestContext
81+
SearchRequestContext searchRequestContext,
82+
final Tracer tracer
8183
) {
8284
super(
8385
SearchPhaseName.DFS_PRE_QUERY.getName(),
@@ -97,7 +99,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
9799
new ArraySearchPhaseResults<>(shardsIts.size()),
98100
request.getMaxConcurrentShardRequests(),
99101
clusters,
100-
searchRequestContext
102+
searchRequestContext,
103+
tracer
101104
);
102105
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
103106
this.searchPhaseController = searchPhaseController;

server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.search.internal.SearchContext;
4444
import org.opensearch.search.internal.ShardSearchRequest;
4545
import org.opensearch.search.query.QuerySearchResult;
46+
import org.opensearch.telemetry.tracing.Tracer;
4647
import org.opensearch.transport.Transport;
4748

4849
import java.util.Map;
@@ -82,7 +83,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
8283
ClusterState clusterState,
8384
SearchTask task,
8485
SearchResponse.Clusters clusters,
85-
SearchRequestContext searchRequestContext
86+
SearchRequestContext searchRequestContext,
87+
final Tracer tracer
8688
) {
8789
super(
8890
SearchPhaseName.QUERY.getName(),
@@ -102,7 +104,8 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
102104
resultConsumer,
103105
request.getMaxConcurrentShardRequests(),
104106
clusters,
105-
searchRequestContext
107+
searchRequestContext,
108+
tracer
106109
);
107110
this.topDocsSize = SearchPhaseController.getTopDocsSize(request);
108111
this.trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ void setTotalHits(TotalHits totalHits) {
7878
this.totalHits = totalHits;
7979
}
8080

81-
TotalHits totalHits() {
81+
public TotalHits totalHits() {
8282
return totalHits;
8383
}
8484

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@
2222
@InternalApi
2323
public abstract class SearchRequestOperationsListener {
2424
private volatile boolean enabled;
25+
public static final SearchRequestOperationsListener NOOP = new SearchRequestOperationsListener(false) {
26+
@Override
27+
protected void onPhaseStart(SearchPhaseContext context) {}
28+
29+
@Override
30+
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
31+
32+
@Override
33+
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}
34+
};
2535

2636
protected SearchRequestOperationsListener() {
2737
this.enabled = true;
@@ -35,7 +45,7 @@ protected SearchRequestOperationsListener(final boolean enabled) {
3545

3646
protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
3747

38-
protected abstract void onPhaseFailure(SearchPhaseContext context);
48+
protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);
3949

4050
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
4151

@@ -91,10 +101,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
91101
}
92102

93103
@Override
94-
protected void onPhaseFailure(SearchPhaseContext context) {
104+
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
95105
for (SearchRequestOperationsListener listener : listeners) {
96106
try {
97-
listener.onPhaseFailure(context);
107+
listener.onPhaseFailure(context, cause);
98108
} catch (Exception e) {
99109
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
100110
}

server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ protected void onPhaseStart(SearchPhaseContext context) {}
140140
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
141141

142142
@Override
143-
protected void onPhaseFailure(SearchPhaseContext context) {}
143+
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}
144144

145145
@Override
146146
protected void onRequestStart(SearchRequestContext searchRequestContext) {}

server/src/main/java/org/opensearch/action/search/SearchRequestStats.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
7171
}
7272

7373
@Override
74-
protected void onPhaseFailure(SearchPhaseContext context) {
74+
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
7575
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
7676
}
7777

0 commit comments

Comments
 (0)