Skip to content

Commit 47f9584

Browse files
kkewweirayshrey
authored and committed
Fix OpenSearch JVM crash caused by onShardResult and onShardFailure both being executed for the same shard (opensearch-project#12158)
* onShardResult and onShardFailure are executed on one shard causes opensearch jvm crashed Signed-off-by: kkewwei <kkewwei@163.com> * unit test Signed-off-by: kkewwei <kkewwei@163.com> * spotlessJavaCheck Signed-off-by: kkewwei <kkewwei@163.com> * rename variable names Signed-off-by: kkewwei <kkewwei@163.com> * add changelog Signed-off-by: kkewwei <kkewwei@163.com> --------- Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 7502a9c commit 47f9584

File tree

3 files changed

+31
-3
lines changed

3 files changed

+31
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
142142
- Warn about deprecated and ignored index.mapper.dynamic index setting ([#11193](https://github.com/opensearch-project/OpenSearch/pull/11193))
143143
- Fix `terms` query on `float` field when `doc_values` are turned off by reverting back to `FloatPoint` from `FloatField` ([#12499](https://github.com/opensearch-project/OpenSearch/pull/12499))
144144
- Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531))
145+
- Fix OpenSearch JVM crash caused by onShardResult and onShardFailure both being executed for the same shard ([#12158](https://github.com/opensearch-project/OpenSearch/pull/12158))
145146

146147
### Security
147148

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
286286
Runnable r = () -> {
287287
final Thread thread = Thread.currentThread();
288288
try {
289+
final SearchPhase phase = this;
289290
executePhaseOnShard(shardIt, shard, new SearchActionListener<Result>(shard, shardIndex) {
290291
@Override
291292
public void innerOnResponse(Result result) {
@@ -299,7 +300,12 @@ public void innerOnResponse(Result result) {
299300
@Override
300301
public void onFailure(Exception t) {
301302
try {
302-
onShardFailure(shardIndex, shard, shardIt, t);
303+
// This only happens when onPhaseDone() has been called and executePhaseOnShard() fails hard with an exception.
304+
if (totalOps.get() == expectedTotalOps) {
305+
onPhaseFailure(phase, "The phase has failed", t);
306+
} else {
307+
onShardFailure(shardIndex, shard, shardIt, t);
308+
}
303309
} finally {
304310
executeNext(pendingExecutions, thread);
305311
}

server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
151151
listener,
152152
controlled,
153153
false,
154+
false,
154155
expected,
155156
new SearchShardIterator(null, null, Collections.emptyList(), null)
156157
);
@@ -162,6 +163,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
162163
ActionListener<SearchResponse> listener,
163164
final boolean controlled,
164165
final boolean failExecutePhaseOnShard,
166+
final boolean catchExceptionWhenExecutePhaseOnShard,
165167
final AtomicLong expected,
166168
final SearchShardIterator... shards
167169
) {
@@ -221,7 +223,15 @@ protected void executePhaseOnShard(
221223
if (failExecutePhaseOnShard) {
222224
listener.onFailure(new ShardNotFoundException(shardIt.shardId()));
223225
} else {
224-
listener.onResponse(new QuerySearchResult());
226+
if (catchExceptionWhenExecutePhaseOnShard) {
227+
try {
228+
listener.onResponse(new QuerySearchResult());
229+
} catch (Exception e) {
230+
listener.onFailure(e);
231+
}
232+
} else {
233+
listener.onResponse(new QuerySearchResult());
234+
}
225235
}
226236
}
227237

@@ -509,6 +519,7 @@ public void onFailure(Exception e) {
509519
},
510520
false,
511521
true,
522+
false,
512523
new AtomicLong(),
513524
shards
514525
);
@@ -555,6 +566,7 @@ public void onFailure(Exception e) {
555566
},
556567
false,
557568
false,
569+
false,
558570
new AtomicLong(),
559571
shards
560572
);
@@ -570,7 +582,7 @@ public void onFailure(Exception e) {
570582
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
571583
}
572584

573-
public void testExecutePhaseOnShardFailure() throws InterruptedException {
585+
private void innerTestExecutePhaseOnShardFailure(boolean catchExceptionWhenExecutePhaseOnShard) throws InterruptedException {
574586
final Index index = new Index("test", UUID.randomUUID().toString());
575587

576588
final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(3))
@@ -606,6 +618,7 @@ public void onFailure(Exception e) {
606618
},
607619
false,
608620
false,
621+
catchExceptionWhenExecutePhaseOnShard,
609622
new AtomicLong(),
610623
shards
611624
);
@@ -621,6 +634,14 @@ public void onFailure(Exception e) {
621634
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
622635
}
623636

637+
public void testExecutePhaseOnShardFailure() throws InterruptedException {
638+
innerTestExecutePhaseOnShardFailure(false);
639+
}
640+
641+
public void testExecutePhaseOnShardFailureAndThrowException() throws InterruptedException {
642+
innerTestExecutePhaseOnShardFailure(true);
643+
}
644+
624645
public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException {
625646
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
626647
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);

0 commit comments

Comments
 (0)