Skip to content

Commit 8e32c03

Browse files
committed
Avoid invalid retries on multiple replicas when querying
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 6b45972 commit 8e32c03

File tree

6 files changed

+109
-13
lines changed

6 files changed

+109
-13
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3232
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
3333
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
3434
- TieredSpilloverCache took-time threshold now guards heap tier as well as disk tier [#17190](https://github.com/opensearch-project/OpenSearch/pull/17190)
35+
- Avoid invalid retries in multiple replicas when querying [#17370](https://github.com/opensearch-project/OpenSearch/pull/17370)
3536

3637
### Deprecated
3738

libs/core/src/main/java/org/opensearch/OpenSearchException.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,12 @@ protected Map<String, List<String>> getHeaders() {
296296
* Returns the rest status code associated with this exception.
297297
*/
298298
public RestStatus status() {
299-
Throwable cause = unwrapCause();
300-
if (cause == this) {
299+
return status(this);
300+
}
301+
302+
public static RestStatus status(Throwable t) {
303+
Throwable cause = ExceptionsHelper.unwrapCause(t);
304+
if (cause == t) {
301305
return RestStatus.INTERNAL_SERVER_ERROR;
302306
} else {
303307
return ExceptionsHelper.status(cause);

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

+22-10
Original file line numberDiff line numberDiff line change
@@ -514,10 +514,19 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
514514
// we do make sure to clean it on a successful response from a shard
515515
setPhaseResourceUsages();
516516
onShardFailure(shardIndex, shard, e);
517-
SearchShardTarget nextShard = FailAwareWeightedRouting.getInstance()
518-
.findNext(shardIt, clusterState, e, () -> totalOps.incrementAndGet());
519517

520-
final boolean lastShard = nextShard == null;
518+
final SearchShardTarget nextShard;
519+
final boolean lastShard;
520+
final int advanceShardCount;
521+
if (TransportActions.isRetryableSearchException(e)) {
522+
nextShard = FailAwareWeightedRouting.getInstance().findNext(shardIt, clusterState, e, () -> totalOps.incrementAndGet());
523+
lastShard = nextShard == null;
524+
advanceShardCount = 1;
525+
} else {
526+
nextShard = null;
527+
lastShard = true;
528+
advanceShardCount = remainingOpsCount(shardIt);
529+
}
521530
if (logger.isTraceEnabled()) {
522531
logger.trace(
523532
() -> new ParameterizedMessage(
@@ -542,7 +551,7 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
542551
if (lastShard) {
543552
onShardGroupFailure(shardIndex, shard, e);
544553
}
545-
final int totalOps = this.totalOps.incrementAndGet();
554+
final int totalOps = this.totalOps.addAndGet(advanceShardCount);
546555
if (totalOps == expectedTotalOps) {
547556
try {
548557
onPhaseDone();
@@ -561,6 +570,14 @@ private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget sh
561570
}
562571
}
563572

573+
private int remainingOpsCount(SearchShardIterator shardsIt) {
574+
if (shardsIt.skip()) {
575+
return shardsIt.remaining();
576+
} else {
577+
return shardsIt.remaining() + 1;
578+
}
579+
}
580+
564581
/**
565582
* Executed once for every {@link ShardId} that failed on all available shard routing.
566583
*
@@ -651,12 +668,7 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
651668
}
652669

653670
private void successfulShardExecution(SearchShardIterator shardsIt) {
654-
final int remainingOpsOnIterator;
655-
if (shardsIt.skip()) {
656-
remainingOpsOnIterator = shardsIt.remaining();
657-
} else {
658-
remainingOpsOnIterator = shardsIt.remaining() + 1;
659-
}
671+
final int remainingOpsOnIterator = remainingOpsCount(shardsIt);
660672
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
661673
if (xTotalOps == expectedTotalOps) {
662674
try {

server/src/main/java/org/opensearch/action/support/TransportActions.java

+10
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@
3434

3535
import org.apache.lucene.store.AlreadyClosedException;
3636
import org.opensearch.ExceptionsHelper;
37+
import org.opensearch.OpenSearchException;
3738
import org.opensearch.action.NoShardAvailableActionException;
3839
import org.opensearch.action.UnavailableShardsException;
40+
import org.opensearch.core.tasks.TaskCancelledException;
3941
import org.opensearch.index.IndexNotFoundException;
4042
import org.opensearch.index.shard.IllegalIndexShardStateException;
4143
import org.opensearch.index.shard.ShardNotFoundException;
@@ -64,4 +66,12 @@ public static boolean isReadOverrideException(Exception e) {
6466
return !isShardNotAvailableException(e);
6567
}
6668

69+
public static boolean isRetryableSearchException(final Exception e) {
70+
71+
return (OpenSearchException.status(e).getStatus() / 100 != 4) && (e.getCause() instanceof TaskCancelledException == false)
72+
// There exists a scenario where a primary shard (0 replicas) relocates and is in POST_RECOVERY on the
73+
// target node but already deleted on the source node. Search request should still work.
74+
|| (e.getCause() instanceof IndexNotFoundException);
75+
}
76+
6777
}

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+2
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
661661
if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
662662
executedTask = true;
663663
return updateTask.execute(currentState);
664+
} else {
665+
System.out.println("dddddd");
664666
}
665667
return currentState;
666668
}

server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java

+68-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.core.common.breaker.NoopCircuitBreaker;
5050
import org.opensearch.core.index.Index;
5151
import org.opensearch.core.index.shard.ShardId;
52+
import org.opensearch.core.tasks.TaskCancelledException;
5253
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
5354
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
5455
import org.opensearch.index.query.MatchAllQueryBuilder;
@@ -66,6 +67,7 @@
6667
import org.opensearch.threadpool.TestThreadPool;
6768
import org.opensearch.threadpool.ThreadPool;
6869
import org.opensearch.transport.Transport;
70+
import org.opensearch.transport.TransportException;
6971
import org.junit.After;
7072
import org.junit.Before;
7173

@@ -136,6 +138,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
136138
controlled,
137139
false,
138140
false,
141+
false,
139142
expected,
140143
resourceUsage,
141144
new SearchShardIterator(null, null, Collections.emptyList(), null)
@@ -148,6 +151,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
148151
ActionListener<SearchResponse> listener,
149152
final boolean controlled,
150153
final boolean failExecutePhaseOnShard,
154+
final boolean throw4xxExceptionOnShard,
151155
final boolean catchExceptionWhenExecutePhaseOnShard,
152156
final AtomicLong expected,
153157
final TaskResourceUsage resourceUsage,
@@ -217,7 +221,11 @@ protected void executePhaseOnShard(
217221
final SearchActionListener<SearchPhaseResult> listener
218222
) {
219223
if (failExecutePhaseOnShard) {
220-
listener.onFailure(new ShardNotFoundException(shardIt.shardId()));
224+
if (throw4xxExceptionOnShard) {
225+
listener.onFailure(new TransportException(new TaskCancelledException(shardIt.shardId().toString())));
226+
} else {
227+
listener.onFailure(new ShardNotFoundException(shardIt.shardId()));
228+
}
221229
} else {
222230
if (catchExceptionWhenExecutePhaseOnShard) {
223231
try {
@@ -585,6 +593,7 @@ public void onFailure(Exception e) {
585593
false,
586594
true,
587595
false,
596+
false,
588597
new AtomicLong(),
589598
new TaskResourceUsage(randomLong(), randomLong()),
590599
shards
@@ -601,6 +610,62 @@ public void onFailure(Exception e) {
601610
assertThat(searchResponse.getSuccessfulShards(), equalTo(0));
602611
}
603612

613+
public void testSkipInValidRetryInMultiReplicas() throws InterruptedException {
614+
final Index index = new Index("test", UUID.randomUUID().toString());
615+
final CountDownLatch latch = new CountDownLatch(1);
616+
final AtomicBoolean fail = new AtomicBoolean(true);
617+
618+
List<String> targetNodeIds = List.of("n1", "n2", "n3");
619+
final SearchShardIterator[] shards = IntStream.range(2, 4)
620+
.mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), targetNodeIds, null, null, null))
621+
.toArray(SearchShardIterator[]::new);
622+
623+
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
624+
searchRequest.setMaxConcurrentShardRequests(1);
625+
626+
final ArraySearchPhaseResults<SearchPhaseResult> queryResult = new ArraySearchPhaseResults<>(shards.length);
627+
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(
628+
searchRequest,
629+
queryResult,
630+
new ActionListener<SearchResponse>() {
631+
@Override
632+
public void onResponse(SearchResponse response) {
633+
634+
}
635+
636+
@Override
637+
public void onFailure(Exception e) {
638+
if (fail.compareAndExchange(true, false)) {
639+
try {
640+
throw new RuntimeException("Simulated exception");
641+
} finally {
642+
executor.submit(() -> latch.countDown());
643+
}
644+
}
645+
}
646+
},
647+
false,
648+
true,
649+
true,
650+
false,
651+
new AtomicLong(),
652+
new TaskResourceUsage(randomLong(), randomLong()),
653+
shards
654+
);
655+
action.run();
656+
assertTrue(latch.await(1, TimeUnit.SECONDS));
657+
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
658+
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null);
659+
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
660+
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
661+
assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile());
662+
assertSame(searchResponse.getHits(), internalSearchResponse.hits());
663+
assertThat(searchResponse.getSuccessfulShards(), equalTo(0));
664+
for (int i = 0; i < shards.length; i++) {
665+
assertEquals(targetNodeIds.size() - 1, shards[i].remaining());
666+
}
667+
}
668+
604669
public void testOnShardSuccessPhaseDoneFailure() throws InterruptedException {
605670
final Index index = new Index("test", UUID.randomUUID().toString());
606671
final CountDownLatch latch = new CountDownLatch(1);
@@ -633,6 +698,7 @@ public void onFailure(Exception e) {
633698
false,
634699
false,
635700
false,
701+
false,
636702
new AtomicLong(),
637703
new TaskResourceUsage(randomLong(), randomLong()),
638704
shards
@@ -685,6 +751,7 @@ public void onFailure(Exception e) {
685751
},
686752
false,
687753
false,
754+
false,
688755
catchExceptionWhenExecutePhaseOnShard,
689756
new AtomicLong(),
690757
new TaskResourceUsage(randomLong(), randomLong()),

0 commit comments

Comments
 (0)