Skip to content

Commit b07c8fb

Browse files
authored
Remote reindex: Add support for configurable retry mechanism (opensearch-project#12561)
* Remote reindex: Add support for configurable retry mechanism Signed-off-by: Ankit Kala <ankikala@amazon.com>
1 parent 07fab0f commit b07c8fb

File tree

7 files changed

+187
-12
lines changed

7 files changed

+187
-12
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
1717
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
1818
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
19+
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
1920
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
2021

2122
### Dependencies

modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java

+2
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@ public List<Setting<?>> getSettings() {
132132
final List<Setting<?>> settings = new ArrayList<>();
133133
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
134134
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
135+
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
136+
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
135137
settings.addAll(ReindexSslConfig.getSettings());
136138
return settings;
137139
}

modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.opensearch.client.RestClient;
5555
import org.opensearch.client.RestClientBuilder;
5656
import org.opensearch.cluster.service.ClusterService;
57+
import org.opensearch.common.logging.Loggers;
5758
import org.opensearch.common.lucene.uid.Versions;
5859
import org.opensearch.core.action.ActionListener;
5960
import org.opensearch.core.common.Strings;
@@ -141,7 +142,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
141142
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
142143
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
143144
task,
144-
logger,
145+
// Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging.
146+
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
145147
assigningClient,
146148
threadPool,
147149
scriptService,

modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java

+28
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.common.settings.Setting;
4343
import org.opensearch.common.settings.Setting.Property;
4444
import org.opensearch.common.settings.Settings;
45+
import org.opensearch.common.unit.TimeValue;
4546
import org.opensearch.core.action.ActionListener;
4647
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
4748
import org.opensearch.script.ScriptService;
@@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
7172
Function.identity(),
7273
Property.NodeScope
7374
);
75+
76+
public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
77+
"reindex.remote.retry.initial_backoff",
78+
TimeValue.timeValueMillis(500),
79+
TimeValue.timeValueMillis(50),
80+
TimeValue.timeValueMillis(5000),
81+
Property.Dynamic,
82+
Property.NodeScope
83+
);
84+
85+
public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
86+
"reindex.remote.retry.max_count",
87+
15,
88+
1,
89+
100,
90+
Property.Dynamic,
91+
Property.NodeScope
92+
);
93+
7494
public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();
7595

7696
private final ReindexValidator reindexValidator;
7797
private final Reindexer reindexer;
7898

99+
private final ClusterService clusterService;
100+
79101
@Inject
80102
public TransportReindexAction(
81103
Settings settings,
@@ -92,10 +114,16 @@ public TransportReindexAction(
92114
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
93115
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
94116
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
117+
this.clusterService = clusterService;
95118
}
96119

97120
@Override
98121
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
122+
if (request.getRemoteInfo() != null) {
123+
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
124+
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
125+
}
126+
99127
reindexValidator.initialValidation(request);
100128
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
101129
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java

+40-7
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,14 @@
6161
import org.opensearch.core.xcontent.XContentParseException;
6262
import org.opensearch.core.xcontent.XContentParser;
6363
import org.opensearch.index.reindex.RejectAwareActionListener;
64+
import org.opensearch.index.reindex.RetryListener;
6465
import org.opensearch.index.reindex.ScrollableHitSource;
6566
import org.opensearch.threadpool.ThreadPool;
6667

6768
import java.io.IOException;
6869
import java.io.InputStream;
70+
import java.net.ConnectException;
71+
import java.util.Arrays;
6972
import java.util.function.BiFunction;
7073
import java.util.function.Consumer;
7174

@@ -99,21 +102,29 @@ public RemoteScrollableHitSource(
99102

100103
@Override
101104
protected void doStart(RejectAwareActionListener<Response> searchListener) {
102-
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
105+
logger.info("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
106+
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
103107
remoteVersion = version;
104-
execute(
108+
logger.trace("Starting initial search");
109+
executeWithRetries(
105110
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
106111
RESPONSE_PARSER,
107112
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
108113
);
109-
}));
114+
// Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll)
115+
// level.
116+
}, searchListener::onFailure, searchListener::onFailure));
110117
}
111118

112119
void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
120+
logger.trace("Checking version for remote domain");
121+
// We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately
122+
// instead of retrying for longer duration.
113123
execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
114124
}
115125

116126
private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
127+
logger.trace("On initial search response");
117128
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
118129
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
119130
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
@@ -124,12 +135,14 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,
124135

125136
@Override
126137
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
138+
logger.trace("Starting next scroll call");
127139
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
128-
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
140+
executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
129141
}
130142

131143
@Override
132144
protected void clearScroll(String scrollId, Runnable onCompletion) {
145+
logger.debug("Clearing the scrollID {}", scrollId);
133146
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
134147
@Override
135148
public void onSuccess(org.opensearch.client.Response response) {
@@ -180,17 +193,31 @@ protected void cleanup(Runnable onCompletion) {
180193
});
181194
}
182195

196+
private void executeWithRetries(
197+
Request request,
198+
BiFunction<XContentParser, MediaType, Response> parser,
199+
RejectAwareActionListener<Response> childListener
200+
) {
201+
execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> {
202+
logger.debug("Retrying execute request {}", request.getEndpoint());
203+
countSearchRetry.run();
204+
execute(request, parser, r);
205+
}, childListener));
206+
}
207+
183208
private <T> void execute(
184209
Request request,
185210
BiFunction<XContentParser, MediaType, T> parser,
186211
RejectAwareActionListener<? super T> listener
187212
) {
213+
logger.trace("Executing http request to remote cluster {}", request.getEndpoint());
188214
// Preserve the thread context so headers survive after the call
189215
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
190216
try {
191217
client.performRequestAsync(request, new ResponseListener() {
192218
@Override
193219
public void onSuccess(org.opensearch.client.Response response) {
220+
logger.trace("Successfully got response from the remote");
194221
// Restore the thread context to get the precious headers
195222
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
196223
assert ctx != null; // eliminates compiler warning
@@ -205,7 +232,7 @@ public void onSuccess(org.opensearch.client.Response response) {
205232
}
206233
if (mediaType == null) {
207234
try {
208-
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
235+
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
209236
throw new OpenSearchException(
210237
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
211238
);
@@ -237,22 +264,28 @@ public void onSuccess(org.opensearch.client.Response response) {
237264
public void onFailure(Exception e) {
238265
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
239266
assert ctx != null; // eliminates compiler warning
267+
logger.debug("Received response failure {}", e.getMessage());
240268
if (e instanceof ResponseException) {
241269
ResponseException re = (ResponseException) e;
242270
int statusCode = re.getResponse().getStatusLine().getStatusCode();
243271
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
244-
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
272+
// retry all 5xx & 429s.
273+
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode
274+
|| statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
245275
listener.onRejection(e);
246276
return;
247277
}
278+
} else if (e instanceof ConnectException) {
279+
listener.onRejection(e);
280+
return;
248281
} else if (e instanceof ContentTooLongException) {
249282
e = new IllegalArgumentException(
250283
"Remote responded with a chunk that was too large. Use a smaller batch size.",
251284
e
252285
);
253286
}
254-
listener.onFailure(e);
255287
}
288+
listener.onFailure(e);
256289
}
257290
});
258291
} catch (Exception e) {

modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java

+111-2
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@
4242
import org.apache.hc.core5.http.ContentType;
4343
import org.apache.hc.core5.http.HttpEntity;
4444
import org.apache.hc.core5.http.HttpHost;
45+
import org.apache.hc.core5.http.ProtocolVersion;
4546
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
4647
import org.apache.hc.core5.http.io.entity.StringEntity;
4748
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
49+
import org.apache.hc.core5.http.message.RequestLine;
50+
import org.apache.hc.core5.http.message.StatusLine;
4851
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
4952
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
5053
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
@@ -57,6 +60,7 @@
5760
import org.opensearch.Version;
5861
import org.opensearch.action.bulk.BackoffPolicy;
5962
import org.opensearch.action.search.SearchRequest;
63+
import org.opensearch.client.ResponseException;
6064
import org.opensearch.client.RestClient;
6165
import org.opensearch.client.http.HttpUriRequestProducer;
6266
import org.opensearch.client.nio.HeapBufferedAsyncResponseConsumer;
@@ -83,17 +87,21 @@
8387
import java.io.IOException;
8488
import java.io.InputStreamReader;
8589
import java.io.UncheckedIOException;
90+
import java.net.ConnectException;
8691
import java.net.URL;
8792
import java.nio.charset.StandardCharsets;
8893
import java.util.Queue;
8994
import java.util.concurrent.ExecutorService;
9095
import java.util.concurrent.Future;
9196
import java.util.concurrent.LinkedBlockingQueue;
9297
import java.util.concurrent.atomic.AtomicBoolean;
98+
import java.util.concurrent.atomic.AtomicInteger;
9399
import java.util.concurrent.atomic.AtomicReference;
94100
import java.util.function.Consumer;
95101
import java.util.stream.Stream;
96102

103+
import org.mockito.Mockito;
104+
97105
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
98106
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
99107
import static org.hamcrest.Matchers.empty;
@@ -515,7 +523,7 @@ public void testInvalidJsonThinksRemoteIsNotES() throws IOException {
515523
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("some_text.txt").start());
516524
assertEquals(
517525
"Error parsing the response, remote is likely not an OpenSearch instance",
518-
e.getCause().getCause().getCause().getMessage()
526+
e.getCause().getCause().getCause().getCause().getMessage()
519527
);
520528
}
521529

@@ -524,7 +532,7 @@ public void testUnexpectedJsonThinksRemoteIsNotES() throws IOException {
524532
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("main/2_3_3.json").start());
525533
assertEquals(
526534
"Error parsing the response, remote is likely not an OpenSearch instance",
527-
e.getCause().getCause().getCause().getMessage()
535+
e.getCause().getCause().getCause().getCause().getMessage()
528536
);
529537
}
530538

@@ -702,4 +710,105 @@ private static ClassicHttpRequest getRequest(AsyncRequestProducer requestProduce
702710
assertThat(requestProducer, instanceOf(HttpUriRequestProducer.class));
703711
return ((HttpUriRequestProducer) requestProducer).getRequest();
704712
}
713+
714+
RemoteScrollableHitSource createRemoteSourceWithFailure(
715+
boolean shouldMockRemoteVersion,
716+
Exception failure,
717+
AtomicInteger invocationCount
718+
) {
719+
CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() {
720+
721+
@Override
722+
public void close() throws IOException {}
723+
724+
@Override
725+
public void close(CloseMode closeMode) {}
726+
727+
@Override
728+
public void start() {}
729+
730+
@Override
731+
public void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier) {}
732+
733+
@Override
734+
public void initiateShutdown() {}
735+
736+
@Override
737+
public IOReactorStatus getStatus() {
738+
return null;
739+
}
740+
741+
@Override
742+
protected <T> Future<T> doExecute(
743+
HttpHost target,
744+
AsyncRequestProducer requestProducer,
745+
AsyncResponseConsumer<T> responseConsumer,
746+
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
747+
HttpContext context,
748+
FutureCallback<T> callback
749+
) {
750+
invocationCount.getAndIncrement();
751+
callback.failed(failure);
752+
return null;
753+
}
754+
755+
@Override
756+
public void awaitShutdown(org.apache.hc.core5.util.TimeValue waitTime) throws InterruptedException {}
757+
};
758+
return sourceWithMockedClient(shouldMockRemoteVersion, httpClient);
759+
}
760+
761+
void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) {
762+
retriesAllowed = 5;
763+
AtomicInteger invocations = new AtomicInteger();
764+
invocations.set(0);
765+
RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations);
766+
767+
Throwable e = expectThrows(RuntimeException.class, source::start);
768+
int expectedInvocations = 0;
769+
if (shouldMockRemoteVersion) {
770+
expectedInvocations += 1; // first search
771+
if (expectedToRetry) expectedInvocations += retriesAllowed;
772+
} else {
773+
expectedInvocations = 1; // the first should fail and not trigger any retry.
774+
}
775+
776+
assertEquals(expectedInvocations, invocations.get());
777+
778+
// Unwrap the some artifacts from the test
779+
while (e.getMessage().equals("failed")) {
780+
e = e.getCause();
781+
}
782+
// There is an additional wrapper for ResponseException.
783+
if (failureResponse instanceof ResponseException) {
784+
e = e.getCause();
785+
}
786+
787+
assertSame(failureResponse, e);
788+
}
789+
790+
ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException {
791+
org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class);
792+
Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN));
793+
Mockito.when(mockResponse.getStatusLine()).thenReturn(new StatusLine(new BasicClassicHttpResponse(statusCode, errorMsg)));
794+
Mockito.when(mockResponse.getRequestLine()).thenReturn(new RequestLine("GET", "/", new ProtocolVersion("https", 1, 1)));
795+
return new ResponseException(mockResponse);
796+
}
797+
798+
public void testRetryOnCallFailure() throws Exception {
799+
// First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s.
800+
verifyRetries(true, withResponseCode(500, "Internal Server Error"), true);
801+
verifyRetries(true, withResponseCode(429, "Too many requests"), true);
802+
verifyRetries(true, withResponseCode(400, "Client Error"), false);
803+
804+
// First call succeeds. Search call failed with exceptions other than ResponseException
805+
verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions.
806+
verifyRetries(true, new RuntimeException("foobar"), false);
807+
808+
// First call(remote version lookup) failed and no retries expected
809+
verifyRetries(false, withResponseCode(500, "Internal Server Error"), false);
810+
verifyRetries(false, withResponseCode(429, "Too many requests"), false);
811+
verifyRetries(false, withResponseCode(400, "Client Error"), false);
812+
verifyRetries(false, new ConnectException("blah"), false);
813+
}
705814
}

0 commit comments

Comments
 (0)