Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[segment replication] decouple the rateLimiter of segrep and recovery #12959

Merged
merged 9 commits into from
Apr 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Reject Resize index requests (i.e., split, shrink and clone) while DocRep to SegRep migration is in progress ([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add a dedicated rate limiter setting for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Original file line number Diff line number Diff line change
@@ -291,6 +291,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Original file line number Diff line number Diff line change
@@ -575,7 +575,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
recoveryTarget.handleFileChunk(
request,
recoveryTarget,
bytesSinceLastPause,
recoverySettings.recoveryRateLimiter(),
listener
);
}
}
}
Original file line number Diff line number Diff line change
@@ -65,6 +65,16 @@
Property.NodeScope
);

/**
* Dedicated throughput limit for segment replication, configurable separately from
* {@link #INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING}.
* <p>
* The default is -1B. A negative value means no dedicated limit is configured, and segment
* replication is throttled with the recovery limit instead. A value of 0 disables rate limiting
* for segment replication; any positive value is used as the replication limit.
* The setting is dynamic and node-scoped.
*/
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.replication.max_bytes_per_sec",
new ByteSizeValue(-1),
Property.Dynamic,
Property.NodeScope
);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
@@ -169,11 +179,13 @@
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter recoveryRateLimiter;
private volatile SimpleRateLimiter replicationRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
@@ -198,17 +210,20 @@
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);

this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;

Check warning on line 215 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L215

Added line #L215 was not covered by tests
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
updateReplicationRateLimiter();

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
@@ -227,8 +242,12 @@

}

public RateLimiter rateLimiter() {
return rateLimiter;
/**
* Returns the rate limiter built from {@link #INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING}.
*
* @return the current recovery rate limiter, or {@code null} when the configured recovery
* limit is zero or negative, in which case callers must not throttle
*/
public RateLimiter recoveryRateLimiter() {
return recoveryRateLimiter;
}

/**
* Returns the rate limiter used for segment replication.
* <p>
* It follows {@link #INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING} when that setting is
* non-negative, and the recovery limit otherwise.
*
* @return the current segment replication rate limiter, or {@code null} when the effective
* limit disables throttling
*/
public RateLimiter replicationRateLimiter() {
return replicationRateLimiter;
}

public TimeValue retryDelayNetwork() {
@@ -294,14 +313,40 @@
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
this.maxBytesPerSec = maxBytesPerSec;
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
} else if (rateLimiter != null) {
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else if (recoveryRateLimiter != null) {
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter();
}

/**
* Settings-update consumer for {@link #INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING}.
* Stores the new value and then rebuilds or retunes the replication rate limiter from it.
*
* @param replicationMaxBytesPerSec the new replication limit; a negative value means
* "use the recovery limit", zero means "no rate limiting"
*/
private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
// store first: updateReplicationRateLimiter() reads the field, not the parameter
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
updateReplicationRateLimiter();
}

/**
* Recomputes {@code replicationRateLimiter} from the current values of
* {@code replicationMaxBytesPerSec} and {@code recoveryMaxBytesPerSec}.
* <p>
* A non-negative replication limit takes precedence: zero clears the limiter (no throttling),
* a positive value is applied to the existing limiter or used to create one. A negative
* replication limit falls back to the recovery limit, where zero or negative clears the limiter.
*/
private void updateReplicationRateLimiter() {
// a dedicated replication limit is configured (0 or positive)
if (replicationMaxBytesPerSec.getBytes() >= 0) {
if (replicationMaxBytesPerSec.getBytes() == 0) {
// explicit zero: segment replication is not rate limited
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
// reuse the existing limiter instance and only change its rate
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());

Check warning on line 340 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L340

Added line #L340 was not covered by tests
}
} else { // negative value (default -1B): no dedicated limit, fall back to the recovery setting
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
// recovery throttling is disabled as well, so replication is unthrottled
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
}
}

Original file line number Diff line number Diff line change
@@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler(
shardId,
PeerRecoveryTargetService.Actions.FILE_CHUNK,
requestSeqNoGenerator,
onSourceThrottle
onSourceThrottle,
recoverySettings::recoveryRateLimiter
);
this.remoteStoreEnabled = remoteStoreEnabled;
}
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* This class handles sending file chunks over the transport layer to a target shard.
@@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
private final AtomicLong requestSeqNoGenerator;
private final RetryableTransportClient retryableTransportClient;
private final ShardId shardId;
private final RecoverySettings recoverySettings;
private final long replicationId;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final TransportRequestOptions fileChunkRequestOptions;
private final Consumer<Long> onSourceThrottle;
private final Supplier<RateLimiter> rateLimiterSupplier;
private final String action;

public RemoteSegmentFileChunkWriter(
@@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter(
ShardId shardId,
String action,
AtomicLong requestSeqNoGenerator,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
Supplier<RateLimiter> rateLimiterSupplier
) {
this.replicationId = replicationId;
this.recoverySettings = recoverySettings;
this.retryableTransportClient = retryableTransportClient;
this.shardId = shardId;
this.requestSeqNoGenerator = requestSeqNoGenerator;
this.onSourceThrottle = onSourceThrottle;
this.rateLimiterSupplier = rateLimiterSupplier;
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
@@ -78,7 +80,7 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
final RateLimiter rl = rateLimiterSupplier.get();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Original file line number Diff line number Diff line change
@@ -124,7 +124,8 @@
request.getCheckpoint().getShardId(),
SegmentReplicationTargetService.Actions.FILE_CHUNK,
new AtomicLong(0),
(throttleTime) -> {}
(throttleTime) -> {},

Check warning on line 127 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java#L127

Added line #L127 was not covered by tests
recoverySettings::replicationRateLimiter
);
final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter);
channel.sendResponse(
Original file line number Diff line number Diff line change
@@ -635,7 +635,7 @@
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);

Check warning on line 638 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java#L638

Added line #L638 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
@@ -3162,7 +3162,7 @@
public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
restoreRateLimitingTimeInNanos,
BlobStoreTransferContext.SNAPSHOT_RESTORE
);
@@ -3185,7 +3185,7 @@
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,

Check warning on line 3188 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L3188

Added line #L3188 was not covered by tests
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
);
Original file line number Diff line number Diff line change
@@ -35,6 +35,8 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.TimeUnit;
@@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
assertNull(recoverySettings.recoveryRateLimiter());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertNull(recoverySettings.replicationRateLimiter());
}

/**
 * Verifies that the replication rate limiter tracks the recovery limit until a dedicated
 * replication limit is applied, after which the dedicated value wins.
 */
public void testSetReplicationMaxBytesPerSec() {
    // No override applied yet: the replication limiter reports 40 MB/s
    // (presumably the default recovery limit; confirm against RecoverySettings).
    assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec());

    // Changing only the recovery limit must carry over to the replication limiter.
    final Settings recoveryOnly = Settings.builder()
        .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
        .build();
    clusterSettings.applySettings(recoveryOnly);
    assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec());

    // A dedicated replication limit overrides whatever the recovery limit is.
    final Settings replicationOnly = Settings.builder()
        .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB))
        .build();
    clusterSettings.applySettings(replicationOnly);
    assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
}

public void testRetryDelayStateSync() {
Loading