Skip to content

Commit 94fad2c

Browse files
authored
Remote reindex: Add support for configurable retry mechanism (#12634)
Signed-off-by: Ankit Kala <ankikala@amazon.com>
1 parent 1b64fca commit 94fad2c

File tree

7 files changed

+171
-12
lines changed

7 files changed

+171
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
1818
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
1919
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
20+
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
2021
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))
2122

2223
### Dependencies

modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexPlugin.java

+2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ public List<Setting<?>> getSettings() {
133133
final List<Setting<?>> settings = new ArrayList<>();
134134
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
135135
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
136+
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
137+
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
136138
settings.addAll(ReindexSslConfig.getSettings());
137139
return settings;
138140
}

modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.opensearch.client.RestClient;
5353
import org.opensearch.client.RestClientBuilder;
5454
import org.opensearch.cluster.service.ClusterService;
55+
import org.opensearch.common.logging.Loggers;
5556
import org.opensearch.common.lucene.uid.Versions;
5657
import org.opensearch.core.action.ActionListener;
5758
import org.opensearch.core.common.Strings;
@@ -139,7 +140,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
139140
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
140141
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
141142
task,
142-
logger,
143+
// Use a logger prefixed with the destination index to distinguish multiple reindex jobs for easier debugging.
144+
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
143145
assigningClient,
144146
threadPool,
145147
scriptService,

modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java

+28
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.common.settings.Setting;
4343
import org.opensearch.common.settings.Setting.Property;
4444
import org.opensearch.common.settings.Settings;
45+
import org.opensearch.common.unit.TimeValue;
4546
import org.opensearch.core.action.ActionListener;
4647
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
4748
import org.opensearch.script.ScriptService;
@@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
7172
Function.identity(),
7273
Property.NodeScope
7374
);
75+
76+
public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
77+
"reindex.remote.retry.initial_backoff",
78+
TimeValue.timeValueMillis(500),
79+
TimeValue.timeValueMillis(50),
80+
TimeValue.timeValueMillis(5000),
81+
Property.Dynamic,
82+
Property.NodeScope
83+
);
84+
85+
public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
86+
"reindex.remote.retry.max_count",
87+
15,
88+
1,
89+
100,
90+
Property.Dynamic,
91+
Property.NodeScope
92+
);
93+
7494
public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();
7595

7696
private final ReindexValidator reindexValidator;
7797
private final Reindexer reindexer;
7898

99+
private final ClusterService clusterService;
100+
79101
@Inject
80102
public TransportReindexAction(
81103
Settings settings,
@@ -92,10 +114,16 @@ public TransportReindexAction(
92114
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
93115
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
94116
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
117+
this.clusterService = clusterService;
95118
}
96119

97120
@Override
98121
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
122+
if (request.getRemoteInfo() != null) {
123+
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
124+
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
125+
}
126+
99127
reindexValidator.initialValidation(request);
100128
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
101129
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java

+40-7
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,14 @@
6060
import org.opensearch.core.xcontent.XContentParseException;
6161
import org.opensearch.core.xcontent.XContentParser;
6262
import org.opensearch.index.reindex.RejectAwareActionListener;
63+
import org.opensearch.index.reindex.RetryListener;
6364
import org.opensearch.index.reindex.ScrollableHitSource;
6465
import org.opensearch.threadpool.ThreadPool;
6566

6667
import java.io.IOException;
6768
import java.io.InputStream;
69+
import java.net.ConnectException;
70+
import java.util.Arrays;
6871
import java.util.function.BiFunction;
6972
import java.util.function.Consumer;
7073

@@ -98,21 +101,29 @@ public RemoteScrollableHitSource(
98101

99102
@Override
100103
protected void doStart(RejectAwareActionListener<Response> searchListener) {
101-
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
104+
logger.info("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
105+
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
102106
remoteVersion = version;
103-
execute(
107+
logger.trace("Starting initial search");
108+
executeWithRetries(
104109
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
105110
RESPONSE_PARSER,
106111
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
107112
);
108-
}));
113+
// Skipping searchListener::onRejection (used for retries) for the remote source, as retries are configured at the request (scroll)
114+
// level.
115+
}, searchListener::onFailure, searchListener::onFailure));
109116
}
110117

111118
void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
119+
logger.trace("Checking version for remote domain");
120+
// We're skipping retries for the first call to the remote cluster so that we fail fast & respond back immediately
121+
// instead of retrying for a longer duration.
112122
execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
113123
}
114124

115125
private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
126+
logger.trace("On initial search response");
116127
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
117128
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
118129
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
@@ -123,12 +134,14 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,
123134

124135
@Override
125136
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
137+
logger.trace("Starting next scroll call");
126138
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
127-
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
139+
executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
128140
}
129141

130142
@Override
131143
protected void clearScroll(String scrollId, Runnable onCompletion) {
144+
logger.debug("Clearing the scrollID {}", scrollId);
132145
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
133146
@Override
134147
public void onSuccess(org.opensearch.client.Response response) {
@@ -179,17 +192,31 @@ protected void cleanup(Runnable onCompletion) {
179192
});
180193
}
181194

195+
private void executeWithRetries(
196+
Request request,
197+
BiFunction<XContentParser, MediaType, Response> parser,
198+
RejectAwareActionListener<Response> childListener
199+
) {
200+
execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> {
201+
logger.debug("Retrying execute request {}", request.getEndpoint());
202+
countSearchRetry.run();
203+
execute(request, parser, r);
204+
}, childListener));
205+
}
206+
182207
private <T> void execute(
183208
Request request,
184209
BiFunction<XContentParser, MediaType, T> parser,
185210
RejectAwareActionListener<? super T> listener
186211
) {
212+
logger.trace("Executing http request to remote cluster {}", request.getEndpoint());
187213
// Preserve the thread context so headers survive after the call
188214
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
189215
try {
190216
client.performRequestAsync(request, new ResponseListener() {
191217
@Override
192218
public void onSuccess(org.opensearch.client.Response response) {
219+
logger.trace("Successfully got response from the remote");
193220
// Restore the thread context to get the precious headers
194221
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
195222
assert ctx != null; // eliminates compiler warning
@@ -204,7 +231,7 @@ public void onSuccess(org.opensearch.client.Response response) {
204231
}
205232
if (mediaType == null) {
206233
try {
207-
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
234+
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
208235
throw new OpenSearchException(
209236
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
210237
);
@@ -236,22 +263,28 @@ public void onSuccess(org.opensearch.client.Response response) {
236263
public void onFailure(Exception e) {
237264
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
238265
assert ctx != null; // eliminates compiler warning
266+
logger.debug("Received response failure {}", e.getMessage());
239267
if (e instanceof ResponseException) {
240268
ResponseException re = (ResponseException) e;
241269
int statusCode = re.getResponse().getStatusLine().getStatusCode();
242270
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
243-
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
271+
// retry all 5xx & 429s.
272+
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode
273+
|| statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
244274
listener.onRejection(e);
245275
return;
246276
}
277+
} else if (e instanceof ConnectException) {
278+
listener.onRejection(e);
279+
return;
247280
} else if (e instanceof ContentTooLongException) {
248281
e = new IllegalArgumentException(
249282
"Remote responded with a chunk that was too large. Use a smaller batch size.",
250283
e
251284
);
252285
}
253-
listener.onFailure(e);
254286
}
287+
listener.onFailure(e);
255288
}
256289
});
257290
} catch (Exception e) {

modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

+95-2
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,18 @@
4747
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
4848
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
4949
import org.apache.http.message.BasicHttpResponse;
50+
import org.apache.http.message.BasicRequestLine;
5051
import org.apache.http.message.BasicStatusLine;
5152
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
5253
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
54+
import org.apache.http.protocol.HttpContext;
5355
import org.opensearch.LegacyESVersion;
5456
import org.opensearch.OpenSearchStatusException;
5557
import org.opensearch.Version;
5658
import org.opensearch.action.bulk.BackoffPolicy;
5759
import org.opensearch.action.search.SearchRequest;
5860
import org.opensearch.client.HeapBufferedAsyncResponseConsumer;
61+
import org.opensearch.client.ResponseException;
5962
import org.opensearch.client.RestClient;
6063
import org.opensearch.common.io.Streams;
6164
import org.opensearch.common.unit.TimeValue;
@@ -79,17 +82,20 @@
7982

8083
import java.io.IOException;
8184
import java.io.InputStreamReader;
85+
import java.net.ConnectException;
8286
import java.net.URL;
8387
import java.nio.charset.StandardCharsets;
8488
import java.util.Queue;
8589
import java.util.concurrent.ExecutorService;
8690
import java.util.concurrent.Future;
8791
import java.util.concurrent.LinkedBlockingQueue;
8892
import java.util.concurrent.atomic.AtomicBoolean;
93+
import java.util.concurrent.atomic.AtomicInteger;
8994
import java.util.concurrent.atomic.AtomicReference;
9095
import java.util.function.Consumer;
9196
import java.util.stream.Stream;
9297

98+
import org.mockito.Mockito;
9399
import org.mockito.invocation.InvocationOnMock;
94100
import org.mockito.stubbing.Answer;
95101

@@ -490,7 +496,7 @@ public void testInvalidJsonThinksRemoteIsNotES() throws IOException {
490496
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("some_text.txt").start());
491497
assertEquals(
492498
"Error parsing the response, remote is likely not an OpenSearch instance",
493-
e.getCause().getCause().getCause().getMessage()
499+
e.getCause().getCause().getCause().getCause().getMessage()
494500
);
495501
}
496502

@@ -499,7 +505,7 @@ public void testUnexpectedJsonThinksRemoteIsNotES() throws IOException {
499505
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("main/2_3_3.json").start());
500506
assertEquals(
501507
"Error parsing the response, remote is likely not an OpenSearch instance",
502-
e.getCause().getCause().getCause().getMessage()
508+
e.getCause().getCause().getCause().getCause().getMessage()
503509
);
504510
}
505511

@@ -650,4 +656,91 @@ private <T extends Exception, V> T expectListenerFailure(Class<T> expectedExcept
650656
assertNotNull(exception.get());
651657
return exception.get();
652658
}
659+
660+
RemoteScrollableHitSource createRemoteSourceWithFailure(
661+
boolean shouldMockRemoteVersion,
662+
Exception failure,
663+
AtomicInteger invocationCount
664+
) {
665+
CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() {
666+
@Override
667+
public <T> Future<T> execute(
668+
HttpAsyncRequestProducer requestProducer,
669+
HttpAsyncResponseConsumer<T> responseConsumer,
670+
HttpContext context,
671+
FutureCallback<T> callback
672+
) {
673+
invocationCount.getAndIncrement();
674+
callback.failed(failure);
675+
return null;
676+
}
677+
678+
@Override
679+
public void close() throws IOException {}
680+
681+
@Override
682+
public boolean isRunning() {
683+
return false;
684+
}
685+
686+
@Override
687+
public void start() {}
688+
};
689+
return sourceWithMockedClient(shouldMockRemoteVersion, httpClient);
690+
}
691+
692+
void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) {
693+
retriesAllowed = 5;
694+
AtomicInteger invocations = new AtomicInteger();
695+
invocations.set(0);
696+
RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations);
697+
698+
Throwable e = expectThrows(RuntimeException.class, source::start);
699+
int expectedInvocations = 0;
700+
if (shouldMockRemoteVersion) {
701+
expectedInvocations += 1; // first search
702+
if (expectedToRetry) expectedInvocations += retriesAllowed;
703+
} else {
704+
expectedInvocations = 1; // the first should fail and not trigger any retry.
705+
}
706+
707+
assertEquals(expectedInvocations, invocations.get());
708+
709+
// Unwrap some artifacts from the test
710+
while (e.getMessage().equals("failed")) {
711+
e = e.getCause();
712+
}
713+
// There is an additional wrapper for ResponseException.
714+
if (failureResponse instanceof ResponseException) {
715+
e = e.getCause();
716+
}
717+
718+
assertSame(failureResponse, e);
719+
}
720+
721+
ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException {
722+
org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class);
723+
ProtocolVersion protocolVersion = new ProtocolVersion("https", 1, 1);
724+
Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN));
725+
Mockito.when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(protocolVersion, statusCode, errorMsg));
726+
Mockito.when(mockResponse.getRequestLine()).thenReturn(new BasicRequestLine("GET", "/", protocolVersion));
727+
return new ResponseException(mockResponse);
728+
}
729+
730+
public void testRetryOnCallFailure() throws Exception {
731+
// First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s.
732+
verifyRetries(true, withResponseCode(500, "Internal Server Error"), true);
733+
verifyRetries(true, withResponseCode(429, "Too many requests"), true);
734+
verifyRetries(true, withResponseCode(400, "Client Error"), false);
735+
736+
// First call succeeds. Search call failed with exceptions other than ResponseException
737+
verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions.
738+
verifyRetries(true, new RuntimeException("foobar"), false);
739+
740+
// First call (remote version lookup) failed; no retries are expected
741+
verifyRetries(false, withResponseCode(500, "Internal Server Error"), false);
742+
verifyRetries(false, withResponseCode(429, "Too many requests"), false);
743+
verifyRetries(false, withResponseCode(400, "Client Error"), false);
744+
verifyRetries(false, new ConnectException("blah"), false);
745+
}
653746
}

server/src/main/java/org/opensearch/index/reindex/RetryListener.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,15 @@
4747
*
4848
* @opensearch.internal
4949
*/
50-
class RetryListener implements RejectAwareActionListener<ScrollableHitSource.Response> {
50+
public class RetryListener implements RejectAwareActionListener<ScrollableHitSource.Response> {
5151
private final Logger logger;
5252
private final Iterator<TimeValue> retries;
5353
private final ThreadPool threadPool;
5454
private final Consumer<RejectAwareActionListener<ScrollableHitSource.Response>> retryScrollHandler;
5555
private final ActionListener<ScrollableHitSource.Response> delegate;
5656
private int retryCount = 0;
5757

58-
RetryListener(
58+
public RetryListener(
5959
Logger logger,
6060
ThreadPool threadPool,
6161
BackoffPolicy backoffPolicy,

0 commit comments

Comments
 (0)