Skip to content

Commit fb2c5f2

Browse files
authored
[Bug] Check phase name before SearchRequestOperationsListener onPhaseStart (opensearch-project#12035)
Signed-off-by: David Zane <davizane@amazon.com>
1 parent f9ab801 commit fb2c5f2

File tree

5 files changed

+60
-11
lines changed

5 files changed

+60
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8585
- Update supported version for max_shard_size parameter in Shrink API ([#11439](https://github.com/opensearch-project/OpenSearch/pull/11439))
8686
- Fix typo in API annotation check message ([11836](https://github.com/opensearch-project/OpenSearch/pull/11836))
8787
- Update supported version for must_exist parameter in update aliases API ([#11872](https://github.com/opensearch-project/OpenSearch/pull/11872))
88+
- [Bug] Check phase name before SearchRequestOperationsListener onPhaseStart ([#12035](https://github.com/opensearch-project/OpenSearch/pull/12035))
8889

8990
### Security
9091

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -432,16 +432,18 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
432432
}
433433

434434
private void onPhaseEnd(SearchRequestContext searchRequestContext) {
435-
if (getCurrentPhase() != null) {
435+
if (getCurrentPhase() != null && SearchPhaseName.isValidName(getName())) {
436436
long tookInNanos = System.nanoTime() - getCurrentPhase().getStartTimeInNanos();
437437
searchRequestContext.updatePhaseTookMap(getCurrentPhase().getName(), TimeUnit.NANOSECONDS.toMillis(tookInNanos));
438+
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
438439
}
439-
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext);
440440
}
441441

442-
private void onPhaseStart(SearchPhase phase) {
442+
void onPhaseStart(SearchPhase phase) {
443443
setCurrentPhase(phase);
444-
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
444+
if (SearchPhaseName.isValidName(phase.getName())) {
445+
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
446+
}
445447
}
446448

447449
private void onRequestEnd(SearchRequestContext searchRequestContext) {
@@ -714,7 +716,9 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
714716

715717
@Override
716718
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
717-
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
719+
if (SearchPhaseName.isValidName(phase.getName())) {
720+
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
721+
}
718722
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
719723
}
720724

server/src/main/java/org/opensearch/action/search/SearchPhaseName.java

+13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010

1111
import org.opensearch.common.annotation.PublicApi;
1212

13+
import java.util.HashSet;
14+
import java.util.Set;
15+
1316
/**
1417
* Enum for different Search Phases in OpenSearch
1518
*
@@ -25,6 +28,12 @@ public enum SearchPhaseName {
2528
CAN_MATCH("can_match");
2629

2730
private final String name;
31+
private static final Set<String> PHASE_NAMES = new HashSet<>();
32+
static {
33+
for (SearchPhaseName phaseName : SearchPhaseName.values()) {
34+
PHASE_NAMES.add(phaseName.name);
35+
}
36+
}
2837

2938
SearchPhaseName(final String name) {
3039
this.name = name;
@@ -33,4 +42,8 @@ public enum SearchPhaseName {
3342
public String getName() {
3443
return name;
3544
}
45+
46+
public static boolean isValidName(String phaseName) {
47+
return PHASE_NAMES.contains(phaseName);
48+
}
3649
}

server/src/main/java/org/opensearch/action/search/TransportSearchAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1238,7 +1238,7 @@ private AbstractSearchAsyncAction<? extends SearchPhaseResult> searchAsyncAction
12381238
clusters,
12391239
searchRequestContext
12401240
);
1241-
return new SearchPhase(action.getName()) {
1241+
return new SearchPhase("none") {
12421242
@Override
12431243
public void run() {
12441244
action.start();

server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java

+36-5
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,14 @@ public void testOnPhaseFailureAndVerifyListeners() {
338338
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
339339
action.start();
340340
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
341-
action.onPhaseFailure(new SearchPhase("test") {
341+
action.onPhaseFailure(new SearchPhase("none") {
342+
@Override
343+
public void run() {
344+
345+
}
346+
}, "message", null);
347+
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
348+
action.onPhaseFailure(new SearchPhase(action.getName()) {
342349
@Override
343350
public void run() {
344351

@@ -352,14 +359,14 @@ public void run() {
352359
);
353360
searchDfsQueryThenFetchAsyncAction.start();
354361
assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
355-
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") {
362+
searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase(searchDfsQueryThenFetchAsyncAction.getName()) {
356363
@Override
357364
public void run() {
358365

359366
}
360367
}, "message", null);
361-
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
362-
assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName()));
368+
assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
369+
assertEquals(0, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()));
363370

364371
FetchSearchPhase fetchPhase = createFetchSearchPhase();
365372
ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt());
@@ -368,7 +375,7 @@ public void run() {
368375
action.skipShard(searchShardIterator);
369376
action.executeNextPhase(action, fetchPhase);
370377
assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName()));
371-
action.onPhaseFailure(new SearchPhase("test") {
378+
action.onPhaseFailure(new SearchPhase(fetchPhase.getName()) {
372379
@Override
373380
public void run() {
374381

@@ -403,6 +410,30 @@ public void run() {
403410
assertEquals(requestIds, releasedContexts);
404411
}
405412

413+
public void testOnPhaseStart() {
414+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
415+
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
416+
417+
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
418+
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
419+
420+
action.onPhaseStart(new SearchPhase("test") {
421+
@Override
422+
public void run() {}
423+
});
424+
action.onPhaseStart(new SearchPhase("none") {
425+
@Override
426+
public void run() {}
427+
});
428+
assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName()));
429+
430+
action.onPhaseStart(new SearchPhase(action.getName()) {
431+
@Override
432+
public void run() {}
433+
});
434+
assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName()));
435+
}
436+
406437
public void testShardNotAvailableWithDisallowPartialFailures() {
407438
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
408439
AtomicReference<Exception> exception = new AtomicReference<>();

0 commit comments

Comments
 (0)