Skip to content

Commit c328c18

Browse files
authored
[Remote Store] Permit backed futures to prevent timeouts during upload bursts (opensearch-project#12159)
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
1 parent 83997fd commit c328c18

26 files changed

+1555
-131
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,22 @@ protected S3Repository createRepository(
249249
ClusterService clusterService,
250250
RecoverySettings recoverySettings
251251
) {
252-
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
252+
return new S3Repository(
253+
metadata,
254+
registry,
255+
service,
256+
clusterService,
257+
recoverySettings,
258+
null,
259+
null,
260+
null,
261+
null,
262+
null,
263+
false,
264+
null,
265+
null,
266+
null
267+
) {
253268

254269
@Override
255270
public BlobStore blobStore() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.repositories.s3;
10+
11+
import java.util.HashMap;
12+
import java.util.Map;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
import java.util.concurrent.atomic.AtomicLong;
15+
16+
/**
17+
* Generic stats of repository-s3 plugin.
18+
*/
19+
public class GenericStatsMetricPublisher {
20+
21+
private final AtomicLong normalPriorityQSize = new AtomicLong();
22+
private final AtomicInteger normalPriorityPermits = new AtomicInteger();
23+
private final AtomicLong lowPriorityQSize = new AtomicLong();
24+
private final AtomicInteger lowPriorityPermits = new AtomicInteger();
25+
private final long normalPriorityQCapacity;
26+
private final int maxNormalPriorityPermits;
27+
private final long lowPriorityQCapacity;
28+
private final int maxLowPriorityPermits;
29+
30+
public GenericStatsMetricPublisher(
31+
long normalPriorityQCapacity,
32+
int maxNormalPriorityPermits,
33+
long lowPriorityQCapacity,
34+
int maxLowPriorityPermits
35+
) {
36+
this.normalPriorityQCapacity = normalPriorityQCapacity;
37+
this.maxNormalPriorityPermits = maxNormalPriorityPermits;
38+
this.lowPriorityQCapacity = lowPriorityQCapacity;
39+
this.maxLowPriorityPermits = maxLowPriorityPermits;
40+
}
41+
42+
public void updateNormalPriorityQSize(long qSize) {
43+
normalPriorityQSize.addAndGet(qSize);
44+
}
45+
46+
public void updateLowPriorityQSize(long qSize) {
47+
lowPriorityQSize.addAndGet(qSize);
48+
}
49+
50+
public void updateNormalPermits(boolean increment) {
51+
if (increment) {
52+
normalPriorityPermits.incrementAndGet();
53+
} else {
54+
normalPriorityPermits.decrementAndGet();
55+
}
56+
}
57+
58+
public void updateLowPermits(boolean increment) {
59+
if (increment) {
60+
lowPriorityPermits.incrementAndGet();
61+
} else {
62+
lowPriorityPermits.decrementAndGet();
63+
}
64+
}
65+
66+
public long getNormalPriorityQSize() {
67+
return normalPriorityQSize.get();
68+
}
69+
70+
public int getAcquiredNormalPriorityPermits() {
71+
return normalPriorityPermits.get();
72+
}
73+
74+
public long getLowPriorityQSize() {
75+
return lowPriorityQSize.get();
76+
}
77+
78+
public int getAcquiredLowPriorityPermits() {
79+
return lowPriorityPermits.get();
80+
}
81+
82+
Map<String, Long> stats() {
83+
final Map<String, Long> results = new HashMap<>();
84+
results.put("NormalPriorityQUtilization", (normalPriorityQSize.get() * 100) / normalPriorityQCapacity);
85+
results.put("LowPriorityQUtilization", (lowPriorityQSize.get() * 100) / lowPriorityQCapacity);
86+
results.put("NormalPriorityPermitsUtilization", (normalPriorityPermits.get() * 100L) / maxNormalPriorityPermits);
87+
results.put("LowPriorityPermitsUtilization", (lowPriorityPermits.get() * 100L) / maxLowPriorityPermits);
88+
return results;
89+
}
90+
}

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+51-11
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.opensearch.core.common.Strings;
9191
import org.opensearch.core.common.unit.ByteSizeUnit;
9292
import org.opensearch.core.common.unit.ByteSizeValue;
93+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9394
import org.opensearch.repositories.s3.async.UploadRequest;
9495
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
9596

@@ -218,7 +219,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
218219
writeContext.getMetadata()
219220
);
220221
try {
221-
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
222+
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
223+
// Therefore, redirecting it to slow client.
224+
if ((uploadRequest.getWritePriority() == WritePriority.LOW
225+
&& blobStore.getLowPrioritySizeBasedBlockingQ().isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)
226+
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
227+
&& uploadRequest.getWritePriority() != WritePriority.URGENT
228+
&& blobStore.getNormalPrioritySizeBasedBlockingQ()
229+
.isMaxCapacityBelowContentLength(uploadRequest.getContentLength()) == false)) {
222230
StreamContext streamContext = SocketAccess.doPrivileged(
223231
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
224232
);
@@ -258,23 +266,55 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
258266
} else {
259267
s3AsyncClient = amazonS3Reference.get().client();
260268
}
261-
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
262-
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
263-
completableFuture.whenComplete((response, throwable) -> {
264-
if (throwable == null) {
265-
completionListener.onResponse(response);
266-
} else {
267-
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
268-
completionListener.onFailure(ex);
269-
}
270-
});
269+
270+
if (writeContext.getWritePriority() == WritePriority.URGENT
271+
|| writeContext.getWritePriority() == WritePriority.HIGH
272+
|| blobStore.isPermitBackedTransferEnabled() == false) {
273+
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
274+
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
275+
blobStore.getLowPrioritySizeBasedBlockingQ()
276+
.produce(
277+
new SizeBasedBlockingQ.Item(
278+
writeContext.getFileSize(),
279+
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
280+
)
281+
);
282+
} else if (writeContext.getWritePriority() == WritePriority.NORMAL) {
283+
blobStore.getNormalPrioritySizeBasedBlockingQ()
284+
.produce(
285+
new SizeBasedBlockingQ.Item(
286+
writeContext.getFileSize(),
287+
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
288+
)
289+
);
290+
} else {
291+
throw new IllegalStateException("Cannot perform upload for other priority types.");
292+
}
271293
}
272294
} catch (Exception e) {
273295
logger.info("exception error from blob container for file {}", writeContext.getFileName());
274296
throw new IOException(e);
275297
}
276298
}
277299

300+
private CompletableFuture<Void> createFileCompletableFuture(
301+
S3AsyncClient s3AsyncClient,
302+
UploadRequest uploadRequest,
303+
StreamContext streamContext,
304+
ActionListener<Void> completionListener
305+
) {
306+
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
307+
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
308+
return completableFuture.whenComplete((response, throwable) -> {
309+
if (throwable == null) {
310+
completionListener.onResponse(response);
311+
} else {
312+
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
313+
completionListener.onFailure(ex);
314+
}
315+
});
316+
}
317+
278318
@ExperimentalApi
279319
@Override
280320
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

+32-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.opensearch.core.common.unit.ByteSizeValue;
4646
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
4747
import org.opensearch.repositories.s3.async.AsyncTransferManager;
48+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
4849

4950
import java.io.IOException;
5051
import java.util.Collections;
@@ -56,6 +57,7 @@
5657
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
5758
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
5859
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
60+
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
5961
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
6062
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
6163
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
@@ -77,6 +79,8 @@ class S3BlobStore implements BlobStore {
7779

7880
private volatile boolean uploadRetryEnabled;
7981

82+
private volatile boolean permitBackedTransferEnabled;
83+
8084
private volatile boolean serverSideEncryption;
8185

8286
private volatile ObjectCannedACL cannedACL;
@@ -94,6 +98,9 @@ class S3BlobStore implements BlobStore {
9498
private final AsyncExecutorContainer priorityExecutorBuilder;
9599
private final AsyncExecutorContainer normalExecutorBuilder;
96100
private final boolean multipartUploadEnabled;
101+
private final SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
102+
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
103+
private final GenericStatsMetricPublisher genericStatsMetricPublisher;
97104

98105
S3BlobStore(
99106
S3Service service,
@@ -109,7 +116,10 @@ class S3BlobStore implements BlobStore {
109116
AsyncTransferManager asyncTransferManager,
110117
AsyncExecutorContainer urgentExecutorBuilder,
111118
AsyncExecutorContainer priorityExecutorBuilder,
112-
AsyncExecutorContainer normalExecutorBuilder
119+
AsyncExecutorContainer normalExecutorBuilder,
120+
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
121+
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
122+
GenericStatsMetricPublisher genericStatsMetricPublisher
113123
) {
114124
this.service = service;
115125
this.s3AsyncService = s3AsyncService;
@@ -128,6 +138,10 @@ class S3BlobStore implements BlobStore {
128138
// Settings to initialize blobstore with.
129139
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
130140
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
141+
this.normalPrioritySizeBasedBlockingQ = normalPrioritySizeBasedBlockingQ;
142+
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
143+
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
144+
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
131145
}
132146

133147
@Override
@@ -141,6 +155,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
141155
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
142156
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
143157
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
158+
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
144159
}
145160

146161
@Override
@@ -168,6 +183,10 @@ public boolean isUploadRetryEnabled() {
168183
return uploadRetryEnabled;
169184
}
170185

186+
public boolean isPermitBackedTransferEnabled() {
187+
return permitBackedTransferEnabled;
188+
}
189+
171190
public String bucket() {
172191
return bucket;
173192
}
@@ -184,6 +203,14 @@ public int getBulkDeletesSize() {
184203
return bulkDeletesSize;
185204
}
186205

206+
public SizeBasedBlockingQ getNormalPrioritySizeBasedBlockingQ() {
207+
return normalPrioritySizeBasedBlockingQ;
208+
}
209+
210+
public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
211+
return lowPrioritySizeBasedBlockingQ;
212+
}
213+
187214
@Override
188215
public BlobContainer blobContainer(BlobPath path) {
189216
return new S3BlobContainer(path, this);
@@ -201,7 +228,9 @@ public void close() throws IOException {
201228

202229
@Override
203230
public Map<String, Long> stats() {
204-
return statsMetricPublisher.getStats().toMap();
231+
Map<String, Long> stats = statsMetricPublisher.getStats().toMap();
232+
stats.putAll(genericStatsMetricPublisher.stats());
233+
return stats;
205234
}
206235

207236
@Override
@@ -211,6 +240,7 @@ public Map<Metric, Map<String, Long>> extendedStats() {
211240
}
212241
Map<Metric, Map<String, Long>> extendedStats = new HashMap<>();
213242
statsMetricPublisher.getExtendedStats().forEach((k, v) -> extendedStats.put(k, v.toMap()));
243+
extendedStats.put(Metric.GENERIC_STATS, genericStatsMetricPublisher.stats());
214244
return extendedStats;
215245
}
216246

0 commit comments

Comments
 (0)