Skip to content

Commit 60d1d67

Browse files
committed
CCR should auto-retry rejected execution exceptions (#49213)
If CCR encounters a rejected execution exception, today we treat it as fatal. It is not actually fatal, though, because the full queue that caused the rejection could drain. Requiring an administrator to manually restart the follow tasks that hit such an exception is a burden. This commit addresses this by making CCR automatically retry on rejected execution exceptions.
1 parent 09a9ec4 commit 60d1d67

File tree

3 files changed

+27
-9
lines changed

3 files changed

+27
-9
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.collect.Tuple;
2222
import org.elasticsearch.common.transport.NetworkExceptionHelper;
2323
import org.elasticsearch.common.unit.TimeValue;
24+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2425
import org.elasticsearch.index.seqno.SequenceNumbers;
2526
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
2627
import org.elasticsearch.index.shard.ShardId;
@@ -499,7 +500,7 @@ private void updateAliases(final LongConsumer handler, final AtomicInteger retry
499500

500501
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
501502
assert e != null;
502-
if (shouldRetry(params.getRemoteCluster(), e)) {
503+
if (shouldRetry(e)) {
503504
if (isStopped() == false) {
504505
// Only retry if the shard follow task is not stopped.
505506
int currentRetry = retryCounter.incrementAndGet();
@@ -528,7 +529,7 @@ static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
528529
return Math.min(backOffDelay, maxRetryDelayInMillis);
529530
}
530531

531-
static boolean shouldRetry(String remoteCluster, Exception e) {
532+
static boolean shouldRetry(final Exception e) {
532533
if (NetworkExceptionHelper.isConnectException(e)) {
533534
return true;
534535
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
@@ -546,7 +547,8 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
546547
actual instanceof IndexClosedException || // If follow index is closed
547548
actual instanceof ConnectTransportException ||
548549
actual instanceof NodeClosedException ||
549-
actual instanceof NoSuchRemoteClusterException;
550+
actual instanceof NoSuchRemoteClusterException ||
551+
actual instanceof EsRejectedExecutionException;
550552
}
551553

552554
// These methods are protected for testing purposes:

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ protected void nodeOperation(final AllocatedPersistentTask task, final ShardFoll
512512
return;
513513
}
514514

515-
if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) {
515+
if (ShardFollowNodeTask.shouldRetry(e)) {
516516
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
517517
shardFollowNodeTask), e);
518518
threadPool.schedule(() -> nodeOperation(task, params, state), params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.unit.ByteSizeUnit;
1313
import org.elasticsearch.common.unit.ByteSizeValue;
1414
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1516
import org.elasticsearch.index.seqno.SequenceNumbers;
1617
import org.elasticsearch.index.shard.ShardId;
1718
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -257,8 +258,16 @@ public void testReceiveRetryableError() {
257258
startTask(task, 63, -1);
258259

259260
int max = randomIntBetween(1, 30);
261+
final Exception[] exceptions = new Exception[max];
260262
for (int i = 0; i < max; i++) {
261-
readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0)));
263+
final Exception exception;
264+
if (randomBoolean()) {
265+
exception = new ShardNotFoundException(new ShardId("leader_index", "", 0));
266+
} else {
267+
exception = new EsRejectedExecutionException("leader_index rejected");
268+
}
269+
exceptions[i] = exception;
270+
readFailures.add(exception);
262271
}
263272
mappingVersions.add(1L);
264273
leaderGlobalCheckpoints.add(63L);
@@ -274,10 +283,17 @@ public void testReceiveRetryableError() {
274283
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.readExceptions().entrySet().iterator().next();
275284
assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get())));
276285
assertThat(entry.getKey(), equalTo(0L));
277-
assertThat(entry.getValue().v2(), instanceOf(ShardNotFoundException.class));
278-
final ShardNotFoundException shardNotFoundException = (ShardNotFoundException) entry.getValue().v2();
279-
assertThat(shardNotFoundException.getShardId().getIndexName(), equalTo("leader_index"));
280-
assertThat(shardNotFoundException.getShardId().getId(), equalTo(0));
286+
if (exceptions[Math.toIntExact(retryCounter.get()) - 1] instanceof ShardNotFoundException) {
287+
assertThat(entry.getValue().v2(), instanceOf(ShardNotFoundException.class));
288+
final ShardNotFoundException shardNotFoundException = (ShardNotFoundException) entry.getValue().v2();
289+
assertThat(shardNotFoundException.getShardId().getIndexName(), equalTo("leader_index"));
290+
assertThat(shardNotFoundException.getShardId().getId(), equalTo(0));
291+
} else {
292+
assertThat(entry.getValue().v2().getCause(), instanceOf(EsRejectedExecutionException.class));
293+
final EsRejectedExecutionException rejectedExecutionException =
294+
(EsRejectedExecutionException) entry.getValue().v2().getCause();
295+
assertThat(rejectedExecutionException.getMessage(), equalTo("leader_index rejected"));
296+
}
281297
}
282298
retryCounter.incrementAndGet();
283299
};

0 commit comments

Comments
 (0)