Skip to content

Commit ae49fd2

Browse files
authored
Handle OpenSearchRejectedExecutionException while retrying refresh (opensearch-project#13301)
Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 41a3055 commit ae49fd2

File tree

2 files changed: +46 -3 lines changed

2 files changed

+46
-3
lines changed

server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java

+7
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.lease.Releasable;
1414
import org.opensearch.common.lease.Releasables;
1515
import org.opensearch.common.unit.TimeValue;
16+
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
1617
import org.opensearch.threadpool.ThreadPool;
1718

1819
import java.io.IOException;
@@ -129,6 +130,12 @@ private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boole
129130
);
130131
scheduled = true;
131132
getLogger().info("Scheduled retry with didRefresh={}", didRefresh);
133+
} catch (OpenSearchRejectedExecutionException e) {
134+
if (e.isExecutorShutdown()) {
135+
getLogger().info("Scheduling retry with didRefresh={} failed due to executor shut down", didRefresh);
136+
} else {
137+
throw e;
138+
}
132139
} finally {
133140
if (scheduled == false) {
134141
retryScheduled.set(false);

server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java

+39 -3
Original file line number | Diff line number | Diff line change
@@ -316,7 +316,32 @@ protected Logger getLogger() {
316316
public void testScheduleRetryAfterClose() throws Exception {
317317
// This tests that once the listener has been closed, even the retries would not be scheduled.
318318
final AtomicLong runCount = new AtomicLong();
319-
ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) {
319+
ReleasableRetryableRefreshListener testRefreshListener = getRetryableRefreshListener(runCount);
320+
Thread thread1 = new Thread(() -> {
321+
try {
322+
testRefreshListener.afterRefresh(true);
323+
} catch (IOException e) {
324+
throw new AssertionError(e);
325+
}
326+
});
327+
Thread thread2 = new Thread(() -> {
328+
try {
329+
Thread.sleep(500);
330+
testRefreshListener.drainRefreshes();
331+
} catch (InterruptedException e) {
332+
throw new AssertionError(e);
333+
}
334+
});
335+
thread1.start();
336+
thread2.start();
337+
thread1.join();
338+
thread2.join();
339+
assertBusy(() -> assertEquals(1, runCount.get()));
340+
assertRefreshListenerClosed(testRefreshListener);
341+
}
342+
343+
private ReleasableRetryableRefreshListener getRetryableRefreshListener(AtomicLong runCount) {
344+
return new ReleasableRetryableRefreshListener(threadPool) {
320345
@Override
321346
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
322347
try {
@@ -341,6 +366,11 @@ protected String getRetryThreadPoolName() {
341366
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
342367
}
343368

369+
@Override
370+
protected boolean isRetryEnabled() {
371+
return true;
372+
}
373+
344374
@Override
345375
protected TimeValue getNextRetryInterval() {
346376
try {
@@ -351,6 +381,12 @@ protected TimeValue getNextRetryInterval() {
351381
return TimeValue.timeValueMillis(100);
352382
}
353383
};
384+
}
385+
386+
public void testScheduleRetryAfterThreadpoolShutdown() throws Exception {
387+
// This tests that once the thread-pool is shut down, the exception is handled.
388+
final AtomicLong runCount = new AtomicLong();
389+
ReleasableRetryableRefreshListener testRefreshListener = getRetryableRefreshListener(runCount);
354390
Thread thread1 = new Thread(() -> {
355391
try {
356392
testRefreshListener.afterRefresh(true);
@@ -361,7 +397,7 @@ protected TimeValue getNextRetryInterval() {
361397
Thread thread2 = new Thread(() -> {
362398
try {
363399
Thread.sleep(500);
364-
testRefreshListener.drainRefreshes();
400+
threadPool.shutdown();
365401
} catch (InterruptedException e) {
366402
throw new AssertionError(e);
367403
}
@@ -371,7 +407,7 @@ protected TimeValue getNextRetryInterval() {
371407
thread1.join();
372408
thread2.join();
373409
assertBusy(() -> assertEquals(1, runCount.get()));
374-
assertRefreshListenerClosed(testRefreshListener);
410+
assertFalse(testRefreshListener.getRetryScheduledStatus());
375411
}
376412

377413
public void testConcurrentScheduleRetry() throws Exception {

0 commit comments

Comments
 (0)