Skip to content

Commit ec0bed2

Browse files
[Remote Store] Rate limiter for low priority uploads (#14374) (#14421)
--------- Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> (cherry picked from commit e22b651) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent f8a0448 commit ec0bed2

File tree

8 files changed

+147
-5
lines changed

8 files changed

+147
-5
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
## [Unreleased 2.x]
77
### Added
88
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
9+
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374))
910

1011
### Dependencies
1112
- Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042))

server/src/internalClusterTest/java/org/opensearch/action/admin/indices/create/RemoteCloneIndexIT.java

+86
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,32 @@
4242
import org.opensearch.Version;
4343
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
4444
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
45+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
46+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
4547
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
4648
import org.opensearch.action.admin.indices.shrink.ResizeType;
4749
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
4850
import org.opensearch.client.Requests;
51+
import org.opensearch.cluster.metadata.RepositoryMetadata;
4952
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
5053
import org.opensearch.common.settings.Settings;
54+
import org.opensearch.common.unit.TimeValue;
55+
import org.opensearch.core.common.unit.ByteSizeValue;
5156
import org.opensearch.core.xcontent.MediaTypeRegistry;
5257
import org.opensearch.index.query.TermsQueryBuilder;
5358
import org.opensearch.indices.recovery.RecoverySettings;
5459
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
60+
import org.opensearch.repositories.RepositoriesService;
5561
import org.opensearch.test.OpenSearchIntegTestCase;
5662
import org.opensearch.test.VersionUtils;
63+
import org.junit.Before;
5764

5865
import java.util.concurrent.ExecutionException;
5966

6067
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
6168
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
6269
import static org.hamcrest.Matchers.equalTo;
70+
import static org.hamcrest.Matchers.greaterThan;
6371

6472
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
6573
public class RemoteCloneIndexIT extends RemoteStoreBaseIntegTestCase {
@@ -69,6 +77,11 @@ protected boolean forbidPrivateIndexSettings() {
6977
return false;
7078
}
7179

80+
@Before
81+
public void setup() {
82+
asyncUploadMockFsRepo = true;
83+
}
84+
7285
public void testCreateCloneIndex() {
7386
Version version = VersionUtils.randomIndexCompatibleVersion(random());
7487
int numPrimaryShards = randomIntBetween(1, 5);
@@ -140,6 +153,79 @@ public void testCreateCloneIndex() {
140153

141154
}
142155

156+
public void testCreateCloneIndexLowPriorityRateLimit() {
157+
Version version = VersionUtils.randomIndexCompatibleVersion(random());
158+
int numPrimaryShards = 1;
159+
prepareCreate("source").setSettings(
160+
Settings.builder().put(indexSettings()).put("number_of_shards", numPrimaryShards).put("index.version.created", version)
161+
).get();
162+
final int docs = randomIntBetween(0, 128);
163+
for (int i = 0; i < docs; i++) {
164+
client().prepareIndex("source").setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", MediaTypeRegistry.JSON).get();
165+
}
166+
ByteSizeValue shardSize = client().admin().indices().prepareStats("source").execute().actionGet().getShards()[0].getStats()
167+
.getStore()
168+
.size();
169+
logger.info("Shard size is {}", shardSize);
170+
internalCluster().ensureAtLeastNumDataNodes(2);
171+
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
172+
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
173+
// to the require._name below.
174+
ensureGreen();
175+
// relocate all shards to one node such that we can merge it.
176+
client().admin().indices().prepareUpdateSettings("source").setSettings(Settings.builder().put("index.blocks.write", true)).get();
177+
ensureGreen();
178+
179+
// disable rebalancing to be able to capture the right stats. balancing can move the target primary
180+
// making it hard to pin point the source shards.
181+
client().admin()
182+
.cluster()
183+
.prepareUpdateSettings()
184+
.setTransientSettings(Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none"))
185+
.get();
186+
try {
187+
// apply rate limiter
188+
setLowPriorityUploadRate(REPOSITORY_NAME, "1kb");
189+
assertAcked(
190+
client().admin()
191+
.indices()
192+
.prepareResizeIndex("source", "target")
193+
.setResizeType(ResizeType.CLONE)
194+
.setSettings(Settings.builder().put("index.number_of_replicas", 0).putNull("index.blocks.write").build())
195+
.get()
196+
);
197+
ensureGreen();
198+
long uploadPauseTime = 0L;
199+
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
200+
uploadPauseTime += repositoriesService.repository(REPOSITORY_NAME).getLowPriorityRemoteUploadThrottleTimeInNanos();
201+
}
202+
assertThat(uploadPauseTime, greaterThan(TimeValue.timeValueSeconds(randomIntBetween(5, 10)).nanos()));
203+
} catch (Exception e) {
204+
throw new RuntimeException(e);
205+
} finally {
206+
// clean up
207+
client().admin()
208+
.cluster()
209+
.prepareUpdateSettings()
210+
.setTransientSettings(
211+
Settings.builder()
212+
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), (String) null)
213+
.put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)
214+
)
215+
.get();
216+
}
217+
}
218+
219+
protected void setLowPriorityUploadRate(String repoName, String value) throws ExecutionException, InterruptedException {
220+
GetRepositoriesRequest gr = new GetRepositoriesRequest(new String[] { repoName });
221+
GetRepositoriesResponse res = client().admin().cluster().getRepositories(gr).get();
222+
RepositoryMetadata rmd = res.repositories().get(0);
223+
Settings.Builder settings = Settings.builder()
224+
.put("location", rmd.settings().get("location"))
225+
.put("max_remote_low_priority_upload_bytes_per_sec", value);
226+
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType(rmd.type()).setSettings(settings).get());
227+
}
228+
143229
public void testCreateCloneIndexFailure() throws ExecutionException, InterruptedException {
144230
asyncUploadMockFsRepo = false;
145231
Version version = VersionUtils.randomIndexCompatibleVersion(random());

server/src/internalClusterTest/java/org/opensearch/remotestore/translogmetadata/mocks/MockFsMetadataSupportedRepository.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import org.opensearch.core.xcontent.NamedXContentRegistry;
1717
import org.opensearch.env.Environment;
1818
import org.opensearch.indices.recovery.RecoverySettings;
19-
import org.opensearch.repositories.fs.FsRepository;
19+
import org.opensearch.repositories.fs.ReloadableFsRepository;
2020

21-
public class MockFsMetadataSupportedRepository extends FsRepository {
21+
public class MockFsMetadataSupportedRepository extends ReloadableFsRepository {
2222

2323
public static Setting<Boolean> TRIGGER_DATA_INTEGRITY_FAILURE = Setting.boolSetting(
2424
"mock_fs_repository.trigger_data_integrity_failure",

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class RemoteDirectory extends Directory {
6464

6565
private final UnaryOperator<OffsetRangeInputStream> uploadRateLimiter;
6666

67+
private final UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter;
68+
6769
private final UnaryOperator<InputStream> downloadRateLimiter;
6870

6971
/**
@@ -76,15 +78,17 @@ public BlobContainer getBlobContainer() {
7678
}
7779

7880
public RemoteDirectory(BlobContainer blobContainer) {
79-
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity());
81+
this(blobContainer, UnaryOperator.identity(), UnaryOperator.identity(), UnaryOperator.identity());
8082
}
8183

8284
/**
 * Creates a remote directory backed by the given blob container with explicit transfer rate limiters.
 *
 * @param blobContainer                container this directory reads from and writes to
 * @param uploadRateLimiter            wrapper applied to the input streams of normal-priority uploads
 * @param lowPriorityUploadRateLimiter wrapper applied to the input streams of low-priority uploads
 * @param downloadRateLimiter          wrapper stored for download input streams
 */
public RemoteDirectory(
    BlobContainer blobContainer,
    UnaryOperator<OffsetRangeInputStream> uploadRateLimiter,
    UnaryOperator<OffsetRangeInputStream> lowPriorityUploadRateLimiter,
    UnaryOperator<InputStream> downloadRateLimiter
) {
    // Plain field assignments, kept in parameter order.
    this.blobContainer = blobContainer;
    this.uploadRateLimiter = uploadRateLimiter;
    this.lowPriorityUploadRateLimiter = lowPriorityUploadRateLimiter;
    this.downloadRateLimiter = downloadRateLimiter;
}
@@ -357,13 +361,23 @@ private void uploadBlob(
357361
remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported();
358362
}
359363
lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15);
364+
RemoteTransferContainer.OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier;
365+
if (lowPriorityUpload) {
366+
offsetRangeInputStreamSupplier = (size, position) -> lowPriorityUploadRateLimiter.apply(
367+
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)
368+
);
369+
} else {
370+
offsetRangeInputStreamSupplier = (size, position) -> uploadRateLimiter.apply(
371+
new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)
372+
);
373+
}
360374
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
361375
src,
362376
remoteFileName,
363377
contentLength,
364378
true,
365379
lowPriorityUpload ? WritePriority.LOW : WritePriority.NORMAL,
366-
(size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)),
380+
offsetRangeInputStreamSupplier,
367381
expectedChecksum,
368382
remoteIntegrityEnabled
369383
);

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
7777
RemoteDirectory dataDirectory = new RemoteDirectory(
7878
blobStoreRepository.blobStore().blobContainer(dataPath),
7979
blobStoreRepository::maybeRateLimitRemoteUploadTransfers,
80+
blobStoreRepository::maybeRateLimitLowPriorityRemoteUploadTransfers,
8081
blobStoreRepository::maybeRateLimitRemoteDownloadTransfers
8182
);
8283

server/src/main/java/org/opensearch/repositories/FilterRepository.java

+5
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ public long getRemoteUploadThrottleTimeInNanos() {
148148
return in.getRemoteUploadThrottleTimeInNanos();
149149
}
150150

151+
@Override
152+
public long getLowPriorityRemoteUploadThrottleTimeInNanos() {
153+
return in.getRemoteUploadThrottleTimeInNanos();
154+
}
155+
151156
@Override
152157
public long getRemoteDownloadThrottleTimeInNanos() {
153158
return in.getRemoteDownloadThrottleTimeInNanos();

server/src/main/java/org/opensearch/repositories/Repository.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,17 @@ default void deleteSnapshotsAndReleaseLockFiles(
220220
long getRestoreThrottleTimeInNanos();
221221

222222
/**
223-
* Returns restore throttle time in nanoseconds
223+
* Returns upload throttle time in nanoseconds
224224
*/
225225
long getRemoteUploadThrottleTimeInNanos();
226226

227+
/**
228+
* Returns low priority upload throttle time in nanoseconds
229+
*/
230+
default long getLowPriorityRemoteUploadThrottleTimeInNanos() {
231+
return 0;
232+
}
233+
227234
/**
228235
* Returns restore throttle time in nanoseconds
229236
*/

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+28
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
317317

318318
private volatile RateLimiter remoteUploadRateLimiter;
319319

320+
private volatile RateLimiter remoteUploadLowPriorityRateLimiter;
321+
320322
private volatile RateLimiter remoteDownloadRateLimiter;
321323

322324
private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric();
@@ -327,6 +329,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
327329

328330
private final CounterMetric remoteUploadRateLimitingTimeInNanos = new CounterMetric();
329331

332+
private final CounterMetric remoteUploadLowPriorityRateLimitingTimeInNanos = new CounterMetric();
333+
330334
public static final ChecksumBlobStoreFormat<Metadata> GLOBAL_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
331335
"metadata",
332336
METADATA_NAME_FORMAT,
@@ -449,6 +453,11 @@ private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) {
449453
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
450454
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO);
451455
remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO);
456+
remoteUploadLowPriorityRateLimiter = getRateLimiter(
457+
metadata.settings(),
458+
"max_remote_low_priority_upload_bytes_per_sec",
459+
ByteSizeValue.ZERO
460+
);
452461
remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO);
453462
readOnly = READONLY_SETTING.get(metadata.settings());
454463
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
@@ -1968,6 +1977,11 @@ public long getRemoteUploadThrottleTimeInNanos() {
19681977
return remoteUploadRateLimitingTimeInNanos.count();
19691978
}
19701979

1980+
@Override
1981+
public long getLowPriorityRemoteUploadThrottleTimeInNanos() {
1982+
return remoteUploadLowPriorityRateLimitingTimeInNanos.count();
1983+
}
1984+
19711985
@Override
19721986
public long getRemoteDownloadThrottleTimeInNanos() {
19731987
return remoteDownloadRateLimitingTimeInNanos.count();
@@ -3315,6 +3329,20 @@ public OffsetRangeInputStream maybeRateLimitRemoteUploadTransfers(OffsetRangeInp
33153329
);
33163330
}
33173331

3332+
public OffsetRangeInputStream maybeRateLimitLowPriorityRemoteUploadTransfers(OffsetRangeInputStream offsetRangeInputStream) {
3333+
return maybeRateLimitRemoteTransfers(
3334+
maybeRateLimitRemoteTransfers(
3335+
offsetRangeInputStream,
3336+
() -> remoteUploadRateLimiter,
3337+
remoteUploadRateLimitingTimeInNanos,
3338+
BlobStoreTransferContext.REMOTE_UPLOAD
3339+
),
3340+
() -> remoteUploadLowPriorityRateLimiter,
3341+
remoteUploadLowPriorityRateLimitingTimeInNanos,
3342+
BlobStoreTransferContext.REMOTE_UPLOAD
3343+
);
3344+
}
3345+
33183346
public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream) {
33193347
return maybeRateLimit(
33203348
maybeRateLimit(

0 commit comments

Comments
 (0)