Skip to content

Commit 6bc04b4

Browse files
authored
[segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#12959)
* [segment replication] decouple the rateLimiter of segrep and recovery (12939) add setting "segrep.max_bytes_per_sec" Signed-off-by: maxliu <ly_chinese@163.com> * [segment replication] decouple the rateLimiter of segrep and recovery (12939) use setting "indices.replication.max_bytes_per_sec" if enable "indices.replication.use_individual_rate_limiter" Signed-off-by: maxliu <ly_chinese@163.com> * [segment replication] decouple the rateLimiter of segrep and recovery (12939) setting "indices.replication.max_bytes_per_sec" takes effect when not negative Signed-off-by: maxliu <ly_chinese@163.com> * [segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#12939) add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative Signed-off-by: maxliu <ly_chinese@163.com> Adds change log Signed-off-by: maxliu <ly_chinese@163.com> --------- Signed-off-by: maxliu <ly_chinese@163.com>
1 parent e828c18 commit 6bc04b4

10 files changed

+107
-28
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2020
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
2121
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
2222
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
23+
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
2324

2425
### Dependencies
2526
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
292292
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
293293
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
294294
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
295+
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
295296
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
296297
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
297298
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,

server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
575575
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
576576
final RecoveryTarget recoveryTarget = recoveryRef.get();
577577
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
578-
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
578+
recoveryTarget.handleFileChunk(
579+
request,
580+
recoveryTarget,
581+
bytesSinceLastPause,
582+
recoverySettings.recoveryRateLimiter(),
583+
listener
584+
);
579585
}
580586
}
581587
}

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

+62-17
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ public class RecoverySettings {
6565
Property.NodeScope
6666
);
6767

68+
/**
69+
* Individual speed setting for segment replication, default -1B to reuse the setting of recovery.
70+
*/
71+
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
72+
"indices.replication.max_bytes_per_sec",
73+
new ByteSizeValue(-1),
74+
Property.Dynamic,
75+
Property.NodeScope
76+
);
77+
6878
/**
6979
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
7080
*/
@@ -169,11 +179,13 @@ public class RecoverySettings {
169179
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
170180
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);
171181

172-
private volatile ByteSizeValue maxBytesPerSec;
182+
private volatile ByteSizeValue recoveryMaxBytesPerSec;
183+
private volatile ByteSizeValue replicationMaxBytesPerSec;
173184
private volatile int maxConcurrentFileChunks;
174185
private volatile int maxConcurrentOperations;
175186
private volatile int maxConcurrentRemoteStoreStreams;
176-
private volatile SimpleRateLimiter rateLimiter;
187+
private volatile SimpleRateLimiter recoveryRateLimiter;
188+
private volatile SimpleRateLimiter replicationRateLimiter;
177189
private volatile TimeValue retryDelayStateSync;
178190
private volatile TimeValue retryDelayNetwork;
179191
private volatile TimeValue activityTimeout;
@@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
198210
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);
199211

200212
this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
201-
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
202-
if (maxBytesPerSec.getBytes() <= 0) {
203-
rateLimiter = null;
213+
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
214+
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
215+
recoveryRateLimiter = null;
204216
} else {
205-
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
217+
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
206218
}
219+
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
220+
updateReplicationRateLimiter();
207221

208-
logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
222+
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
209223
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);
210224

211-
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
225+
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
226+
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
212227
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
213228
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
214229
clusterSettings.addSettingsUpdateConsumer(
@@ -227,8 +242,12 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
227242

228243
}
229244

230-
public RateLimiter rateLimiter() {
231-
return rateLimiter;
245+
public RateLimiter recoveryRateLimiter() {
246+
return recoveryRateLimiter;
247+
}
248+
249+
public RateLimiter replicationRateLimiter() {
250+
return replicationRateLimiter;
232251
}
233252

234253
public TimeValue retryDelayNetwork() {
@@ -294,14 +313,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout
294313
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
295314
}
296315

297-
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
298-
this.maxBytesPerSec = maxBytesPerSec;
299-
if (maxBytesPerSec.getBytes() <= 0) {
300-
rateLimiter = null;
301-
} else if (rateLimiter != null) {
302-
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
316+
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
317+
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
318+
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
319+
recoveryRateLimiter = null;
320+
} else if (recoveryRateLimiter != null) {
321+
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
303322
} else {
304-
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
323+
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
324+
}
325+
if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter();
326+
}
327+
328+
private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
329+
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
330+
updateReplicationRateLimiter();
331+
}
332+
333+
private void updateReplicationRateLimiter() {
334+
if (replicationMaxBytesPerSec.getBytes() >= 0) {
335+
if (replicationMaxBytesPerSec.getBytes() == 0) {
336+
replicationRateLimiter = null;
337+
} else if (replicationRateLimiter != null) {
338+
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
339+
} else {
340+
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());
341+
}
342+
} else { // when replicationMaxBytesPerSec = -1B, use setting of recovery
343+
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
344+
replicationRateLimiter = null;
345+
} else if (replicationRateLimiter != null) {
346+
replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
347+
} else {
348+
replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
349+
}
305350
}
306351
}
307352

server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler(
111111
shardId,
112112
PeerRecoveryTargetService.Actions.FILE_CHUNK,
113113
requestSeqNoGenerator,
114-
onSourceThrottle
114+
onSourceThrottle,
115+
recoverySettings::recoveryRateLimiter
115116
);
116117
this.remoteStoreEnabled = remoteStoreEnabled;
117118
}

server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.util.concurrent.atomic.AtomicLong;
2727
import java.util.function.Consumer;
28+
import java.util.function.Supplier;
2829

2930
/**
3031
* This class handles sending file chunks over the transport layer to a target shard.
@@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
3637
private final AtomicLong requestSeqNoGenerator;
3738
private final RetryableTransportClient retryableTransportClient;
3839
private final ShardId shardId;
39-
private final RecoverySettings recoverySettings;
4040
private final long replicationId;
4141
private final AtomicLong bytesSinceLastPause = new AtomicLong();
4242
private final TransportRequestOptions fileChunkRequestOptions;
4343
private final Consumer<Long> onSourceThrottle;
44+
private final Supplier<RateLimiter> rateLimiterSupplier;
4445
private final String action;
4546

4647
public RemoteSegmentFileChunkWriter(
@@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter(
5051
ShardId shardId,
5152
String action,
5253
AtomicLong requestSeqNoGenerator,
53-
Consumer<Long> onSourceThrottle
54+
Consumer<Long> onSourceThrottle,
55+
Supplier<RateLimiter> rateLimiterSupplier
5456
) {
5557
this.replicationId = replicationId;
56-
this.recoverySettings = recoverySettings;
5758
this.retryableTransportClient = retryableTransportClient;
5859
this.shardId = shardId;
5960
this.requestSeqNoGenerator = requestSeqNoGenerator;
6061
this.onSourceThrottle = onSourceThrottle;
62+
this.rateLimiterSupplier = rateLimiterSupplier;
6163
this.fileChunkRequestOptions = TransportRequestOptions.builder()
6264
.withType(TransportRequestOptions.Type.RECOVERY)
6365
.withTimeout(recoverySettings.internalActionTimeout())
@@ -78,7 +80,7 @@ public void writeFileChunk(
7880
// Pause using the rate limiter, if desired, to throttle the recovery
7981
final long throttleTimeInNanos;
8082
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
81-
final RateLimiter rl = recoverySettings.rateLimiter();
83+
final RateLimiter rl = rateLimiterSupplier.get();
8284
if (rl != null) {
8385
long bytes = bytesSinceLastPause.addAndGet(content.length());
8486
if (bytes > rl.getMinPauseCheckBytes()) {

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
123123
request.getCheckpoint().getShardId(),
124124
SegmentReplicationTargetService.Actions.FILE_CHUNK,
125125
new AtomicLong(0),
126-
(throttleTime) -> {}
126+
(throttleTime) -> {},
127+
recoverySettings::replicationRateLimiter
127128
);
128129
final SegmentReplicationSourceHandler handler = ongoingSegmentReplications.prepareForReplication(
129130
request,

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
635635
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
636636
final SegmentReplicationTarget target = ref.get();
637637
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
638-
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
638+
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
639639
}
640640
}
641641
}

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3162,7 +3162,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers(
31623162
public InputStream maybeRateLimitRestores(InputStream stream) {
31633163
return maybeRateLimit(
31643164
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
3165-
recoverySettings::rateLimiter,
3165+
recoverySettings::recoveryRateLimiter,
31663166
restoreRateLimitingTimeInNanos,
31673167
BlobStoreTransferContext.SNAPSHOT_RESTORE
31683168
);
@@ -3185,7 +3185,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream
31853185
remoteDownloadRateLimitingTimeInNanos,
31863186
BlobStoreTransferContext.REMOTE_DOWNLOAD
31873187
),
3188-
recoverySettings::rateLimiter,
3188+
recoverySettings::recoveryRateLimiter,
31893189
remoteDownloadRateLimitingTimeInNanos,
31903190
BlobStoreTransferContext.REMOTE_DOWNLOAD
31913191
);

server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.opensearch.common.settings.ClusterSettings;
3636
import org.opensearch.common.settings.Settings;
3737
import org.opensearch.common.unit.TimeValue;
38+
import org.opensearch.core.common.unit.ByteSizeUnit;
39+
import org.opensearch.core.common.unit.ByteSizeValue;
3840
import org.opensearch.test.OpenSearchTestCase;
3941

4042
import java.util.concurrent.TimeUnit;
@@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
4749
clusterSettings.applySettings(
4850
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
4951
);
50-
assertEquals(null, recoverySettings.rateLimiter());
52+
assertNull(recoverySettings.recoveryRateLimiter());
53+
clusterSettings.applySettings(
54+
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
55+
);
56+
assertNull(recoverySettings.replicationRateLimiter());
57+
}
58+
59+
public void testSetReplicationMaxBytesPerSec() {
60+
assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
61+
clusterSettings.applySettings(
62+
Settings.builder()
63+
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
64+
.build()
65+
);
66+
assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
67+
clusterSettings.applySettings(
68+
Settings.builder()
69+
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB))
70+
.build()
71+
);
72+
assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
5173
}
5274

5375
public void testRetryDelayStateSync() {

0 commit comments

Comments
 (0)