Skip to content

Commit e22b651

Browse files
authored
[Remote Store] Rate limiter for low priority uploads (opensearch-project#14374)
--------- Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 112704b commit e22b651

File tree

8 files changed

+147
-5
lines changed

8 files changed

+147
-5
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
## [Unreleased 2.x]
77
### Added
88
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
9+
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
910

1011
### Dependencies
1112
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java

+86
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,32 @@
4242
import org.opensearch.Version;
4343
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
4444
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
45+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
4547
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
4648
import org.opensearch.action.admin.indices.shrink.ResizeType;
4749
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
4850
import org.opensearch.client.Requests;
51+
import org.opensearch.cluster.metadata.RepositoryMetadata;
4952
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
5053
import org.opensearch.common.settings.Settings;
54+
import org.opensearch.common.unit.TimeValue;
55+
import org.opensearch.core.common.unit.ByteSizeValue;
5156
import org.opensearch.core.xcontent.MediaTypeRegistry;
5257
import org.opensearch.index.query.TermsQueryBuilder;
5358
import org.opensearch.indices.recovery.RecoverySettings;
5459
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
60+
import org.opensearch.repositories.RepositoriesService;
5561
import org.opensearch.test.OpenSearchIntegTestCase;
5662
import org.opensearch.test.VersionUtils;
63+
import org.junit.Before;
5764

5865
import java.util.concurrent.ExecutionException;
5966

6067
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6168
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6269
import static org.hamcrest.Matchers.equalTo;
70+
import static org.hamcrest.Matchers.greaterThan;
6371

6472
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
6573
public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase {
@@ -69,6 +77,11 @@ protected boolean forbidPrivateIndexSettings() {
6977
return false;
7078
}
7179

80+
@Before
81+
public void setup() {
82+
asyncUploadMockFsRepo = true;
83+
}
84+
7285
public void testCreateCloneIndex() {
7386
Version version = VersionUtils.randomIndexCompatibleVersion(random());
7487
int numPrimaryShards = randomIntBetween(1, 5);
@@ -140,6 +153,79 @@ public void testCreateCloneIndex() {
140153

141154
}
142155

156+
public void testCreateCloneIndexLowPriorityRateLimit() {
157+
Version version = VersionUtils.randomIndexCompatibleVersion(random());
158+
int numPrimaryShards = 1;
159+
prepareCreate("source").setSettings(
160+
Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
161+
).get();
162+
final int docs = randomIntBetween(0, 128);
163+
for (int i = 0; i < docs; i++) {
164+
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
165+
}
166+
ByteSizeValue shardSize = client().admin().indices().prepareStats("source").execute().actionGet().getShards()[0].getStats()
167+
.getStore()
168+
.size();
169+
logger.info("Shard size is {}", shardSize);
170+
internalCluster().ensureAtLeastNumDataNodes(2);
171+
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
172+
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
173+
// to the require._name below.
174+
ensureGreen();
175+
// relocate all shards to one node such that we can merge it.
176+
client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
177+
ensureGreen();
178+
179+
// disable rebalancing to be able to capture the right stats. balancing can move the target primary
180+
// making it hard to pin point the source shards.
181+
client().admin()
182+
.cluster()
183+
.prepareUpdateSettings()
184+
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
185+
.get();
186+
try {
187+
// apply rate limiter
188+
setLowPriorityUploadRate(REPOSITORY_NAME, "1kb");
189+
assertAcked(
190+
client().admin()
191+
.indices()
192+
.prepareResizeIndex("source", "target")
193+
.setResizeType(ResizeType.CLONE)
194+
.setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
195+
.get()
196+
);
197+
ensureGreen();
198+
long uploadPauseTime = 0L;
199+
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
200+
uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getLowPriorityRemoteUploadThrottleTimeInNanos();
201+
}
202+
assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos()));
203+
} catch (Exception e) {
204+
throw new RuntimeException(e);
205+
} finally {
206+
// clean up
207+
client().admin()
208+
.cluster()
209+
.prepareUpdateSettings()
210+
.setTransientSettings(
211+
Settings.builder()
212+
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
213+
.put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)
214+
)
215+
.get();
216+
}
217+
}
218+
219+
protected void setLowPriorityUploadRate(String repoName, String value) throws ExecutionException, InterruptedException {
220+
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
221+
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
222+
RepositoryMetadata rmd = res.repositories().get(0);
223+
Settings.Builder settings = Settings.builder()
224+
.put("location", rmd.settings().get("location"))
225+
.put("max_remote_low_priority_upload_bytes_per_sec", value);
226+
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType(rmd.type()).setSettings(settings).get());
227+
}
228+
143229
public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
144230
asyncUploadMockFsRepo = false;
145231
Version version = VersionUtils.randomIndexCompatibleVersion(random());

server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import org.opensearch.core.xcontent.NamedXContentRegistry;
1717
import org.opensearch.env.Environment;
1818
import org.opensearch.indices.recovery.RecoverySettings;
19-
import org.opensearch.repositories.fs.FsRepository;
19+
import org.opensearch.repositories.fs.ReloadableFsRepository;
2020

21-
public class MockFsMetadataSupportedRepository extends FsRepository {
21+
public class MockFsMetadataSupportedRepository extends ReloadableFsRepository {
2222

2323
public static Setting<Boolean> TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting(
2424
"mock_fs_repository.trigger_data_integrity_failure",

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class RemoteDirectory extends Directory {
6464

6565
private final UnaryOperator<OffsetRangeInputStream> uploadRateLimiter;
6666

67+
private final UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter;
68+
6769
private final UnaryOperator<InputStream> downloadRateLimiter;
6870

6971
/**
@@ -76,15 +78,17 @@ public BlobContainer getBlobContainer() {
7678
}
7779

7880
public RemoteDirectory(BlobContainer blobContainer) {
79-
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity());
81+
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity());
8082
}
8183

8284
public RemoteDirectory(
8385
BlobContainer blobContainer,
8486
UnaryOperator<OffsetRangeInputStream> uploadRateLimiter,
87+
UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter,
8588
UnaryOperator<InputStream> downloadRateLimiter
8689
) {
8790
this.blobContainer = blobContainer;
91+
this.lowPriorityUploadRateLimiter = lowPriorityUploadRateLimiter;
8892
this.uploadRateLimiter = uploadRateLimiter;
8993
this.downloadRateLimiter = downloadRateLimiter;
9094
}
@@ -357,13 +361,23 @@ private void uploadBlob(
357361
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported();
358362
}
359363
lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
364+
RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
365+
if (lowPriorityUpload) {
366+
offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply(
367+
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)
368+
);
369+
} else {
370+
offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply(
371+
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)
372+
);
373+
}
360374
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
361375
src,
362376
remoteFileName,
363377
contentLength,
364378
true,
365379
lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL,
366-
(size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)),
380+
offsetRangeInputStreamSupplier,
367381
expectedChecksum,
368382
remoteIntegrityEnabled
369383
);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
7777
RemoteDirectory dataDirectory = new RemoteDirectory(
7878
blobStoreRepository.blobStore().blobContainer(dataPath),
7979
blobStoreRepository::maybeRateLimitRemoteUploadTransfers,
80+
blobStoreRepository::maybeRateLimitLowPriorityRemoteUploadTransfers,
8081
blobStoreRepository::maybeRateLimitRemoteDownloadTransfers
8182
);
8283

server/src/main/java/org/opensearch/repositories/FilterRepository.java

+5
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ public long getRemoteUploadThrottleTimeInNanos() {
142142
return in.getRemoteUploadThrottleTimeInNanos();
143143
}
144144

145+
@Override
146+
public long getLowPriorityRemoteUploadThrottleTimeInNanos() {
147+
return in.getRemoteUploadThrottleTimeInNanos();
148+
}
149+
145150
@Override
146151
public long getRemoteDownloadThrottleTimeInNanos() {
147152
return in.getRemoteDownloadThrottleTimeInNanos();

server/src/main/java/org/opensearch/repositories/Repository.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -207,10 +207,17 @@ default void deleteSnapshotsAndReleaseLockFiles(
207207
long getRestoreThrottleTimeInNanos();
208208

209209
/**
210-
* Returns restore throttle time in nanoseconds
210+
* Returns upload throttle time in nanoseconds
211211
*/
212212
long getRemoteUploadThrottleTimeInNanos();
213213

214+
/**
215+
* Returns low priority upload throttle time in nanoseconds
216+
*/
217+
default long getLowPriorityRemoteUploadThrottleTimeInNanos() {
218+
return 0;
219+
}
220+
214221
/**
215222
* Returns restore throttle time in nanoseconds
216223
*/

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+28
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
316316

317317
private volatile RateLimiter remoteUploadRateLimiter;
318318

319+
private volatile RateLimiter remoteUploadLowPriorityRateLimiter;
320+
319321
private volatile RateLimiter remoteDownloadRateLimiter;
320322

321323
private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();
@@ -326,6 +328,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
326328

327329
private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric();
328330

331+
private final CounterMetric remoteUploadLowPriorityRateLimitingTimeInNanos = new CounterMetric();
332+
329333
public static final ChecksumBlobStoreFormat<Metadata> GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
330334
"metadata",
331335
METADATA_NAME_FORMAT,
@@ -445,6 +449,11 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
445449
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
446450
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
447451
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
452+
remoteUploadLowPriorityRateLimiter = getRateLimiter(
453+
metadata.settings(),
454+
"max_remote_low_priority_upload_bytes_per_sec",
455+
ByteSizeValue.ZERO
456+
);
448457
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
449458
readOnly = READONLY_SETTING.get(metadata.settings());
450459
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
@@ -1882,6 +1891,11 @@ public long getRemoteUploadThrottleTimeInNanos() {
18821891
return remoteUploadRateLimitingTimeInNanos.count();
18831892
}
18841893

1894+
@Override
1895+
public long getLowPriorityRemoteUploadThrottleTimeInNanos() {
1896+
return remoteUploadLowPriorityRateLimitingTimeInNanos.count();
1897+
}
1898+
18851899
@Override
18861900
public long getRemoteDownloadThrottleTimeInNanos() {
18871901
return remoteDownloadRateLimitingTimeInNanos.count();
@@ -3177,6 +3191,20 @@ public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInp
31773191
);
31783192
}
31793193

3194+
public OffsetRangeInputStream maybeRateLimitLowPriorityRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) {
3195+
return maybeRateLimitRemoteTransfers(
3196+
maybeRateLimitRemoteTransfers(
3197+
offsetRangeInputStream,
3198+
() -> remoteUploadRateLimiter,
3199+
remoteUploadRateLimitingTimeInNanos,
3200+
BlobStoreTransferContext.REMOTE_UPLOAD
3201+
),
3202+
() -> remoteUploadLowPriorityRateLimiter,
3203+
remoteUploadLowPriorityRateLimitingTimeInNanos,
3204+
BlobStoreTransferContext.REMOTE_UPLOAD
3205+
);
3206+
}
3207+
31803208
public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) {
31813209
return maybeRateLimit(
31823210
maybeRateLimit(

0 commit comments

Comments
 (0)