Skip to content

Commit 18bcbb1

Browse files
committed
Tracing for deep search path
Signed-off-by: David Zane <davizane@amazon.com>
1 parent 60b2ac4 commit 18bcbb1

21 files changed

+392
-90
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
1717
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
1818
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
19+
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))
1920

2021
### Dependencies
2122
- Bump `log4j-core` from 2.18.0 to 2.19.0

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* @opensearch.internal
2525
*/
2626
@InternalApi
27-
class DefaultTracer implements Tracer {
27+
public class DefaultTracer implements Tracer {
2828
/**
2929
* Current thread name.
3030
*/

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.search.internal.SearchContext;
5959
import org.opensearch.search.internal.ShardSearchRequest;
6060
import org.opensearch.search.pipeline.PipelinedRequest;
61+
import org.opensearch.telemetry.tracing.SpanScope;
6162
import org.opensearch.transport.Transport;
6263

6364
import java.util.ArrayDeque;
@@ -220,6 +221,7 @@ public final void start() {
220221
null
221222
)
222223
);
224+
searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext);
223225
return;
224226
}
225227
executePhase(this);
@@ -439,21 +441,23 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) {
439441
}
440442
}
441443

442-
void onPhaseStart(SearchPhase phase) {
444+
void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) {
443445
setCurrentPhase(phase);
444446
if (SearchPhaseName.isValidName(phase.getName())) {
445-
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
447+
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext);
446448
}
447449
}
448450

449451
private void onRequestEnd(SearchRequestContext searchRequestContext) {
450-
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
452+
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext);
451453
}
452454

453455
private void executePhase(SearchPhase phase) {
454456
try {
455-
onPhaseStart(phase);
456-
phase.recordAndRun();
457+
onPhaseStart(phase, searchRequestContext);
458+
try (SpanScope spanScope = searchRequestContext.getTracer().withSpanInScope(searchRequestContext.getPhaseSpan())) {
459+
phase.recordAndRun();
460+
}
457461
} catch (Exception e) {
458462
if (logger.isDebugEnabled()) {
459463
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
@@ -705,6 +709,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
705709
searchContextId = null;
706710
}
707711
}
712+
searchRequestContext.setSearchTask(getTask());
708713
searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits());
709714
searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length);
710715
onPhaseEnd(searchRequestContext);
@@ -717,7 +722,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
717722
@Override
718723
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
719724
if (SearchPhaseName.isValidName(phase.getName())) {
720-
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
725+
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext);
721726
}
722727
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
723728
}

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010

1111
import org.apache.lucene.search.TotalHits;
1212
import org.opensearch.common.annotation.InternalApi;
13+
import org.opensearch.telemetry.tracing.Span;
14+
import org.opensearch.telemetry.tracing.Tracer;
15+
import org.opensearch.telemetry.tracing.noop.NoopSpan;
16+
import org.opensearch.telemetry.tracing.noop.NoopTracer;
1317

1418
import java.util.EnumMap;
1519
import java.util.HashMap;
@@ -23,20 +27,45 @@
2327
*/
2428
@InternalApi
2529
class SearchRequestContext {
30+
private final SearchRequest searchRequest;
31+
private SearchTask searchTask;
2632
private final SearchRequestOperationsListener searchRequestOperationsListener;
2733
private long absoluteStartNanos;
2834
private final Map<String, Long> phaseTookMap;
2935
private TotalHits totalHits;
3036
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;
37+
private final Tracer tracer;
38+
private Span requestSpan;
39+
private Span phaseSpan;
3140

32-
private final SearchRequest searchRequest;
41+
/**
42+
* This constructor is for testing only
43+
*/
44+
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest) {
45+
this(searchRequestOperationsListener, searchRequest, NoopTracer.INSTANCE);
46+
}
3347

34-
SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
48+
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest, Tracer tracer) {
3549
this.searchRequestOperationsListener = searchRequestOperationsListener;
50+
this.searchRequest = searchRequest;
51+
this.tracer = tracer;
3652
this.absoluteStartNanos = System.nanoTime();
3753
this.phaseTookMap = new HashMap<>();
3854
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
39-
this.searchRequest = searchRequest;
55+
this.requestSpan = NoopSpan.INSTANCE;
56+
this.phaseSpan = NoopSpan.INSTANCE;
57+
}
58+
59+
SearchRequest getSearchRequest() {
60+
return searchRequest;
61+
}
62+
63+
void setSearchTask(SearchTask searchTask) {
64+
this.searchTask = searchTask;
65+
}
66+
67+
SearchTask getSearchTask() {
68+
return searchTask;
4069
}
4170

4271
SearchRequestOperationsListener getSearchRequestOperationsListener() {
@@ -107,6 +136,26 @@ String formattedShardStats() {
107136
);
108137
}
109138
}
139+
140+
Tracer getTracer() {
141+
return tracer;
142+
}
143+
144+
void setRequestSpan(Span requestSpan) {
145+
this.requestSpan = requestSpan;
146+
}
147+
148+
Span getRequestSpan() {
149+
return requestSpan;
150+
}
151+
152+
void setPhaseSpan(Span phaseSpan) {
153+
this.phaseSpan = phaseSpan;
154+
}
155+
156+
Span getPhaseSpan() {
157+
return phaseSpan;
158+
}
110159
}
111160

112161
enum ShardStatsFieldNames {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.search;
10+
11+
import org.opensearch.telemetry.tracing.AttributeNames;
12+
import org.opensearch.telemetry.tracing.Span;
13+
import org.opensearch.telemetry.tracing.SpanBuilder;
14+
import org.opensearch.telemetry.tracing.SpanContext;
15+
import org.opensearch.telemetry.tracing.Tracer;
16+
17+
import static org.opensearch.core.common.Strings.capitalize;
18+
19+
/**
20+
* Listener for search request tracing on the coordinator node
21+
*
22+
* @opensearch.internal
23+
*/
24+
public final class SearchRequestCoordinatorTrace extends SearchRequestOperationsListener {
25+
private final Tracer tracer;
26+
27+
public SearchRequestCoordinatorTrace(Tracer tracer) {
28+
this.tracer = tracer;
29+
}
30+
31+
@Override
32+
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
33+
searchRequestContext.setPhaseSpan(
34+
tracer.startSpan(
35+
SpanBuilder.from(
36+
"coord" + capitalize(context.getCurrentPhase().getName()),
37+
new SpanContext(searchRequestContext.getRequestSpan())
38+
)
39+
)
40+
);
41+
}
42+
43+
@Override
44+
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
45+
searchRequestContext.getPhaseSpan().endSpan();
46+
}
47+
48+
@Override
49+
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
50+
searchRequestContext.getPhaseSpan().endSpan();
51+
}
52+
53+
@Override
54+
void onRequestEnd(SearchRequestContext searchRequestContext) {
55+
Span requestSpan = searchRequestContext.getRequestSpan();
56+
57+
// add response-related attributes on request end
58+
requestSpan.addAttribute(
59+
AttributeNames.TOTAL_HITS,
60+
searchRequestContext.totalHits() == null ? "0" : searchRequestContext.totalHits().toString()
61+
);
62+
requestSpan.addAttribute(
63+
AttributeNames.SHARDS,
64+
searchRequestContext.formattedShardStats().isEmpty() ? "null" : searchRequestContext.formattedShardStats()
65+
);
66+
requestSpan.addAttribute(
67+
AttributeNames.SOURCE,
68+
searchRequestContext.getSearchRequest().source() == null ? "null" : searchRequestContext.getSearchRequest().source().toString()
69+
);
70+
}
71+
}

server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ protected SearchRequestOperationsListener(final boolean enabled) {
3131
this.enabled = enabled;
3232
}
3333

34-
abstract void onPhaseStart(SearchPhaseContext context);
34+
abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext);
3535

3636
abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);
3737

38-
abstract void onPhaseFailure(SearchPhaseContext context);
38+
abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext);
3939

4040
void onRequestStart(SearchRequestContext searchRequestContext) {}
4141

42-
void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
42+
void onRequestEnd(SearchRequestContext searchRequestContext) {}
4343

4444
boolean isEnabled(SearchRequest searchRequest) {
4545
return isEnabled();
@@ -69,10 +69,10 @@ static final class CompositeListener extends SearchRequestOperationsListener {
6969
}
7070

7171
@Override
72-
void onPhaseStart(SearchPhaseContext context) {
72+
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
7373
for (SearchRequestOperationsListener listener : listeners) {
7474
try {
75-
listener.onPhaseStart(context);
75+
listener.onPhaseStart(context, searchRequestContext);
7676
} catch (Exception e) {
7777
logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e);
7878
}
@@ -91,10 +91,10 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo
9191
}
9292

9393
@Override
94-
void onPhaseFailure(SearchPhaseContext context) {
94+
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
9595
for (SearchRequestOperationsListener listener : listeners) {
9696
try {
97-
listener.onPhaseFailure(context);
97+
listener.onPhaseFailure(context, searchRequestContext);
9898
} catch (Exception e) {
9999
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
100100
}
@@ -113,10 +113,10 @@ void onRequestStart(SearchRequestContext searchRequestContext) {
113113
}
114114

115115
@Override
116-
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
116+
public void onRequestEnd(SearchRequestContext searchRequestContext) {
117117
for (SearchRequestOperationsListener listener : listeners) {
118118
try {
119-
listener.onRequestEnd(context, searchRequestContext);
119+
listener.onRequestEnd(searchRequestContext);
120120
} catch (Exception e) {
121121
logger.warn(() -> new ParameterizedMessage("onRequestEnd listener [{}] failed", listener), e);
122122
}

0 commit comments

Comments
 (0)