Skip to content

Commit 1197adb

Browse files
astute-decipherwangdongyu.danny
authored and
wangdongyu.danny
committed
Add recovery chunk size setting (opensearch-project#13997)
Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
1 parent 7aad560 commit 1197adb

File tree

10 files changed

+33
-93
lines changed

10 files changed

+33
-93
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
2020
- Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022))
2121
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
22+
- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997))
2223
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
2324
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
2425
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))

server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@
104104
import org.opensearch.indices.recovery.RecoveryState.Stage;
105105
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
106106
import org.opensearch.node.NodeClosedException;
107-
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
108107
import org.opensearch.plugins.AnalysisPlugin;
109108
import org.opensearch.plugins.Plugin;
110109
import org.opensearch.plugins.PluginsService;
@@ -156,7 +155,7 @@
156155
import static java.util.stream.Collectors.toList;
157156
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
158157
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
159-
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
158+
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
160159
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
161160
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
162161
import static org.hamcrest.Matchers.empty;
@@ -187,7 +186,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
187186
return Arrays.asList(
188187
MockTransportService.TestPlugin.class,
189188
MockFSIndexStore.TestPlugin.class,
190-
RecoverySettingsChunkSizePlugin.class,
191189
TestAnalysisPlugin.class,
192190
InternalSettingsPlugin.class,
193191
MockEngineFactoryPlugin.class
@@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) {
263261
// one chunk per sec..
264262
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES)
265263
// small chunks
266-
.put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
264+
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES))
267265
)
268266
.get()
269267
.isAcknowledged()
@@ -278,7 +276,10 @@ private void restoreRecoverySpeed() {
278276
.setTransientSettings(
279277
Settings.builder()
280278
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb")
281-
.put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE)
279+
.put(
280+
INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(),
281+
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY)
282+
)
282283
)
283284
.get()
284285
.isAcknowledged()

server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.opensearch.index.query.QueryBuilders;
4747
import org.opensearch.indices.recovery.FileChunkRequest;
4848
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
49-
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
5049
import org.opensearch.plugins.Plugin;
5150
import org.opensearch.test.OpenSearchIntegTestCase;
5251
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
@@ -61,7 +60,7 @@
6160
import java.util.concurrent.CountDownLatch;
6261
import java.util.concurrent.atomic.AtomicBoolean;
6362

64-
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
63+
import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING;
6564
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6665
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6766
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -81,7 +80,7 @@ public static Collection<Object[]> parameters() {
8180

8281
@Override
8382
protected Collection<Class<? extends Plugin>> nodePlugins() {
84-
return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class);
83+
return Arrays.asList(MockTransportService.TestPlugin.class);
8584
}
8685

8786
/**
@@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception {
9695
.cluster()
9796
.prepareUpdateSettings()
9897
.setTransientSettings(
99-
Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
98+
Settings.builder()
99+
.put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES))
100100
)
101101
.get()
102102
.isAcknowledged()

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ public void apply(Settings value, Settings current, Settings previous) {
307307
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
308308
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
309309
RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT,
310+
RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING,
310311
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
311312
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
312313
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,14 @@ public class RecoverySettings {
177177
);
178178

179179
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
180-
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
180+
public static final Setting<ByteSizeValue> INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting(
181+
"indices.recovery.chunk_size",
182+
new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES),
183+
new ByteSizeValue(1, ByteSizeUnit.BYTES),
184+
new ByteSizeValue(100, ByteSizeUnit.MB),
185+
Property.Dynamic,
186+
Property.NodeScope
187+
);
181188

182189
private volatile ByteSizeValue recoveryMaxBytesPerSec;
183190
private volatile ByteSizeValue replicationMaxBytesPerSec;
@@ -193,7 +200,7 @@ public class RecoverySettings {
193200
private volatile TimeValue internalActionRetryTimeout;
194201
private volatile TimeValue internalActionLongTimeout;
195202

196-
private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
203+
private volatile ByteSizeValue chunkSize;
197204
private volatile TimeValue internalRemoteUploadTimeout;
198205

199206
public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
@@ -221,6 +228,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
221228

222229
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
223230
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
231+
this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings);
224232

225233
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
226234
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
@@ -239,11 +247,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
239247
);
240248
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
241249
clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout);
250+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize);
242251
clusterSettings.addSettingsUpdateConsumer(
243252
INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING,
244253
this::setInternalActionRetryTimeout
245254
);
246-
247255
}
248256

249257
public RateLimiter recoveryRateLimiter() {
@@ -286,10 +294,7 @@ public ByteSizeValue getChunkSize() {
286294
return chunkSize;
287295
}
288296

289-
public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
290-
if (chunkSize.bytesAsInt() <= 0) {
291-
throw new IllegalArgumentException("chunkSize must be > 0");
292-
}
297+
public void setChunkSize(ByteSizeValue chunkSize) {
293298
this.chunkSize = chunkSize;
294299
}
295300

server/src/main/java/org/opensearch/node/Node.java

-5
Original file line numberDiff line numberDiff line change
@@ -1339,7 +1339,6 @@ protected Node(
13391339
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
13401340
b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
13411341
{
1342-
processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
13431342
b.bind(PeerRecoverySourceService.class)
13441343
.toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings));
13451344
b.bind(PeerRecoveryTargetService.class)
@@ -1447,10 +1446,6 @@ protected TransportService newTransportService(
14471446
return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer);
14481447
}
14491448

1450-
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
1451-
// Noop in production, overridden by tests
1452-
}
1453-
14541449
/**
14551450
* The settings that are used by this node. Contains original settings as well as additional settings provided by plugins.
14561451
*/

server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java

+8
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() {
119119
assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout());
120120
}
121121

122+
public void testChunkSize() {
123+
ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES);
124+
clusterSettings.applySettings(
125+
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build()
126+
);
127+
assertEquals(chunkSize, recoverySettings.getChunkSize());
128+
}
129+
122130
public void testInternalActionRetryTimeout() {
123131
long duration = between(1, 1000);
124132
TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS);

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica(
11481148
startingSeqNo
11491149
);
11501150
long fileChunkSizeInBytes = randomBoolean()
1151-
? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes()
1151+
? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes()
11521152
: randomIntBetween(1, 10 * 1024 * 1024);
11531153
final Settings settings = Settings.builder()
11541154
.put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4)))

test/framework/src/main/java/org/opensearch/node/MockNode.java

-8
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.opensearch.env.Environment;
5151
import org.opensearch.http.HttpServerTransport;
5252
import org.opensearch.indices.IndicesService;
53-
import org.opensearch.indices.recovery.RecoverySettings;
5453
import org.opensearch.plugins.Plugin;
5554
import org.opensearch.script.MockScriptService;
5655
import org.opensearch.script.ScriptContext;
@@ -236,13 +235,6 @@ protected TransportService newTransportService(
236235
}
237236
}
238237

239-
@Override
240-
protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) {
241-
if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) {
242-
clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize);
243-
}
244-
}
245-
246238
@Override
247239
protected ClusterInfoService newClusterInfoService(
248240
Settings settings,

test/framework/src/main/java/org/opensearch/node/RecoverySettingsChunkSizePlugin.java

-63
This file was deleted.

0 commit comments

Comments
 (0)