Skip to content

Commit c295d97

Browse files
opensearch-trigger-bot[bot]github-actions[bot]mch2
authoredApr 15, 2024
[Backport 2.x] [segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#13181)
* [segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#12959) * [segment replication] decouple the rateLimiter of segrep and recovery (12939) add setting "segrep.max_bytes_per_sec" Signed-off-by: maxliu <ly_chinese@163.com> * [segment replication] decouple the rateLimiter of segrep and recovery (12939) use setting "indices.replication.max_bytes_per_sec" if enable "indices.replication.use_individual_rate_limiter" Signed-off-by: maxliu <ly_chinese@163.com> * [segment replication] decouple the rateLimiter of segrep and recovery (12939) setting "indices.replication.max_bytes_per_sec" takes effect when not negative Signed-off-by: maxliu <ly_chinese@163.com> * [segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#12939) add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative Signed-off-by: maxliu <ly_chinese@163.com> Adds change log Signed-off-by: maxliu <ly_chinese@163.com> --------- Signed-off-by: maxliu <ly_chinese@163.com> (cherry picked from commit 6bc04b4) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> * Add back public API in RecoverySettings to 2.x Signed-off-by: Marc Handalian <marc.handalian@gmail.com> --------- Signed-off-by: maxliu <ly_chinese@163.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Signed-off-by: Marc Handalian <marc.handalian@gmail.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Marc Handalian <marc.handalian@gmail.com>
1 parent b3c3255 commit c295d97

10 files changed

+116
-27
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1717
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
1818
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
1919
- Reject Resize index requests (i.e., split, shrink, and clone) while DocRep to SegRep migration is in progress. ([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
20+
- Add a dedicated rate limiter setting for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
2021

2122
### Dependencies
2223
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

‎server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
292292
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
293293
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
294294
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
295+
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
295296
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
296297
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
297298
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,

‎server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
576576
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
577577
final RecoveryTarget recoveryTarget = recoveryRef.get();
578578
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
579-
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
579+
recoveryTarget.handleFileChunk(
580+
request,
581+
recoveryTarget,
582+
bytesSinceLastPause,
583+
recoverySettings.recoveryRateLimiter(),
584+
listener
585+
);
580586
}
581587
}
582588
}

‎server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

+71-16
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ public class RecoverySettings {
6565
Property.NodeScope
6666
);
6767

68+
/**
69+
* Dedicated rate limit for segment replication. Defaults to -1B; a negative value reuses the recovery rate limit ({@link #INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING}), and 0 disables throttling.
70+
*/
71+
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
72+
"indices.replication.max_bytes_per_sec",
73+
new ByteSizeValue(-1),
74+
Property.Dynamic,
75+
Property.NodeScope
76+
);
77+
6878
/**
6979
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
7080
*/
@@ -169,11 +179,13 @@ public class RecoverySettings {
169179
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
170180
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
171181

172-
private volatile ByteSizeValue maxBytesPerSec;
182+
private volatile ByteSizeValue recoveryMaxBytesPerSec;
183+
private volatile ByteSizeValue replicationMaxBytesPerSec;
173184
private volatile int maxConcurrentFileChunks;
174185
private volatile int maxConcurrentOperations;
175186
private volatile int maxConcurrentRemoteStoreStreams;
176-
private volatile SimpleRateLimiter rateLimiter;
187+
private volatile SimpleRateLimiter recoveryRateLimiter;
188+
private volatile SimpleRateLimiter replicationRateLimiter;
177189
private volatile TimeValue retryDelayStateSync;
178190
private volatile TimeValue retryDelayNetwork;
179191
private volatile TimeValue activityTimeout;
@@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
198210
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);
199211

200212
this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
201-
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
202-
if (maxBytesPerSec.getBytes() <= 0) {
203-
rateLimiter = null;
213+
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
214+
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
215+
recoveryRateLimiter = null;
204216
} else {
205-
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
217+
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
206218
}
219+
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
220+
updateReplicationRateLimiter();
207221

208-
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
222+
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
209223
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
210224

211-
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
225+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
226+
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
212227
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
213228
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
214229
clusterSettings.addSettingsUpdateConsumer(
@@ -227,8 +242,22 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
227242

228243
}
229244

245+
/**
246+
* Method unused as of 2.14 but left as part of public API.
247+
* Use recoveryRateLimiter or replicationRateLimiter instead.
248+
* @return {@link RateLimiter} Recovery rate limiter
249+
*/
250+
@Deprecated(forRemoval = true, since = "2.14")
230251
public RateLimiter rateLimiter() {
231-
return rateLimiter;
252+
return recoveryRateLimiter;
253+
}
254+
255+
public RateLimiter recoveryRateLimiter() {
256+
return recoveryRateLimiter;
257+
}
258+
259+
public RateLimiter replicationRateLimiter() {
260+
return replicationRateLimiter;
232261
}
233262

234263
public TimeValue retryDelayNetwork() {
@@ -294,14 +323,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout
294323
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
295324
}
296325

297-
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
298-
this.maxBytesPerSec = maxBytesPerSec;
299-
if (maxBytesPerSec.getBytes() <= 0) {
300-
rateLimiter = null;
301-
} else if (rateLimiter != null) {
302-
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
326+
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
327+
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
328+
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
329+
recoveryRateLimiter = null;
330+
} else if (recoveryRateLimiter != null) {
331+
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
303332
} else {
304-
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
333+
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
334+
}
335+
if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter();
336+
}
337+
338+
private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
339+
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
340+
updateReplicationRateLimiter();
341+
}
342+
343+
private void updateReplicationRateLimiter() {
344+
if (replicationMaxBytesPerSec.getBytes() >= 0) {
345+
if (replicationMaxBytesPerSec.getBytes() == 0) {
346+
replicationRateLimiter = null;
347+
} else if (replicationRateLimiter != null) {
348+
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
349+
} else {
350+
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());
351+
}
352+
} else { // when replicationMaxBytesPerSec = -1B, use setting of recovery
353+
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
354+
replicationRateLimiter = null;
355+
} else if (replicationRateLimiter != null) {
356+
replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
357+
} else {
358+
replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
359+
}
305360
}
306361
}
307362

‎server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler(
111111
shardId,
112112
PeerRecoveryTargetService.Actions.FILE_CHUNK,
113113
requestSeqNoGenerator,
114-
onSourceThrottle
114+
onSourceThrottle,
115+
recoverySettings::recoveryRateLimiter
115116
);
116117
this.remoteStoreEnabled = remoteStoreEnabled;
117118
}

‎server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.util.concurrent.atomic.AtomicLong;
2727
import java.util.function.Consumer;
28+
import java.util.function.Supplier;
2829

2930
/**
3031
* This class handles sending file chunks over the transport layer to a target shard.
@@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
3637
private final AtomicLong requestSeqNoGenerator;
3738
private final RetryableTransportClient retryableTransportClient;
3839
private final ShardId shardId;
39-
private final RecoverySettings recoverySettings;
4040
private final long replicationId;
4141
private final AtomicLong bytesSinceLastPause = new AtomicLong();
4242
private final TransportRequestOptions fileChunkRequestOptions;
4343
private final Consumer<Long> onSourceThrottle;
44+
private final Supplier<RateLimiter> rateLimiterSupplier;
4445
private final String action;
4546

4647
public RemoteSegmentFileChunkWriter(
@@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter(
5051
ShardId shardId,
5152
String action,
5253
AtomicLong requestSeqNoGenerator,
53-
Consumer<Long> onSourceThrottle
54+
Consumer<Long> onSourceThrottle,
55+
Supplier<RateLimiter> rateLimiterSupplier
5456
) {
5557
this.replicationId = replicationId;
56-
this.recoverySettings = recoverySettings;
5758
this.retryableTransportClient = retryableTransportClient;
5859
this.shardId = shardId;
5960
this.requestSeqNoGenerator = requestSeqNoGenerator;
6061
this.onSourceThrottle = onSourceThrottle;
62+
this.rateLimiterSupplier = rateLimiterSupplier;
6163
this.fileChunkRequestOptions = TransportRequestOptions.builder()
6264
.withType(TransportRequestOptions.Type.RECOVERY)
6365
.withTimeout(recoverySettings.internalActionTimeout())
@@ -78,7 +80,7 @@ public void writeFileChunk(
7880
// Pause using the rate limiter, if desired, to throttle the recovery
7981
final long throttleTimeInNanos;
8082
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
81-
final RateLimiter rl = recoverySettings.rateLimiter();
83+
final RateLimiter rl = rateLimiterSupplier.get();
8284
if (rl != null) {
8385
long bytes = bytesSinceLastPause.addAndGet(content.length());
8486
if (bytes > rl.getMinPauseCheckBytes()) {

‎server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
124124
request.getCheckpoint().getShardId(),
125125
SegmentReplicationTargetService.Actions.FILE_CHUNK,
126126
new AtomicLong(0),
127-
(throttleTime) -> {}
127+
(throttleTime) -> {},
128+
recoverySettings::replicationRateLimiter
128129
);
129130
final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter);
130131
channel.sendResponse(

‎server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
635635
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
636636
final SegmentReplicationTarget target = ref.get();
637637
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
638-
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
638+
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
639639
}
640640
}
641641
}

‎server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3291,7 +3291,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers(
32913291
public InputStream maybeRateLimitRestores(InputStream stream) {
32923292
return maybeRateLimit(
32933293
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
3294-
recoverySettings::rateLimiter,
3294+
recoverySettings::recoveryRateLimiter,
32953295
restoreRateLimitingTimeInNanos,
32963296
BlobStoreTransferContext.SNAPSHOT_RESTORE
32973297
);
@@ -3314,7 +3314,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream
33143314
remoteDownloadRateLimitingTimeInNanos,
33153315
BlobStoreTransferContext.REMOTE_DOWNLOAD
33163316
),
3317-
recoverySettings::rateLimiter,
3317+
recoverySettings::recoveryRateLimiter,
33183318
remoteDownloadRateLimitingTimeInNanos,
33193319
BlobStoreTransferContext.REMOTE_DOWNLOAD
33203320
);

‎server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.opensearch.common.settings.ClusterSettings;
3636
import org.opensearch.common.settings.Settings;
3737
import org.opensearch.common.unit.TimeValue;
38+
import org.opensearch.core.common.unit.ByteSizeUnit;
39+
import org.opensearch.core.common.unit.ByteSizeValue;
3840
import org.opensearch.test.OpenSearchTestCase;
3941

4042
import java.util.concurrent.TimeUnit;
@@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
4749
clusterSettings.applySettings(
4850
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
4951
);
50-
assertEquals(null, recoverySettings.rateLimiter());
52+
assertNull(recoverySettings.recoveryRateLimiter());
53+
clusterSettings.applySettings(
54+
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
55+
);
56+
assertNull(recoverySettings.replicationRateLimiter());
57+
}
58+
59+
public void testSetReplicationMaxBytesPerSec() {
60+
assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
61+
clusterSettings.applySettings(
62+
Settings.builder()
63+
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
64+
.build()
65+
);
66+
assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
67+
clusterSettings.applySettings(
68+
Settings.builder()
69+
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB))
70+
.build()
71+
);
72+
assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
5173
}
5274

5375
public void testRetryDelayStateSync() {

0 commit comments

Comments
 (0)