Skip to content

Commit cec5768

Browse files
committed
Producer-consumer queue to track in-queue transfer events
Signed-off-by: vikasvb90 <vikasvb@amazon.com>
1 parent 74cf9eb commit cec5768

File tree

15 files changed

+570
-210
lines changed

15 files changed

+570
-210
lines changed

plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,21 @@ protected S3Repository createRepository(
249249
ClusterService clusterService,
250250
RecoverySettings recoverySettings
251251
) {
252-
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
252+
return new S3Repository(
253+
metadata,
254+
registry,
255+
service,
256+
clusterService,
257+
recoverySettings,
258+
null,
259+
null,
260+
null,
261+
null,
262+
null,
263+
false,
264+
null,
265+
null
266+
) {
253267

254268
@Override
255269
public BlobStore blobStore() {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java

+39-36
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.opensearch.core.common.Strings;
8989
import org.opensearch.core.common.unit.ByteSizeUnit;
9090
import org.opensearch.core.common.unit.ByteSizeValue;
91+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
9192
import org.opensearch.repositories.s3.async.UploadRequest;
9293
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
9394

@@ -193,32 +194,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
193194
blobStore.isUploadRetryEnabled()
194195
);
195196
try {
196-
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
197-
StreamContext streamContext = SocketAccess.doPrivileged(
198-
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
199-
);
200-
InputStreamContainer inputStream = streamContext.provideStream(0);
201-
try {
202-
executeMultipartUpload(
203-
blobStore,
204-
uploadRequest.getKey(),
205-
inputStream.getInputStream(),
206-
uploadRequest.getContentLength()
207-
);
208-
completionListener.onResponse(null);
209-
} catch (Exception ex) {
210-
logger.error(
211-
() -> new ParameterizedMessage(
212-
"Failed to upload large file {} of size {} ",
213-
uploadRequest.getKey(),
214-
uploadRequest.getContentLength()
215-
),
216-
ex
217-
);
218-
completionListener.onFailure(ex);
219-
}
220-
return;
221-
}
222197
long partSize = blobStore.getAsyncTransferManager()
223198
.calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled());
224199
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
@@ -232,23 +207,51 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
232207
} else {
233208
s3AsyncClient = amazonS3Reference.get().client();
234209
}
235-
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
236-
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
237-
completableFuture.whenComplete((response, throwable) -> {
238-
if (throwable == null) {
239-
completionListener.onResponse(response);
240-
} else {
241-
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
242-
completionListener.onFailure(ex);
243-
}
244-
});
210+
211+
if (writeContext.getWritePriority() == WritePriority.URGENT || writeContext.getWritePriority() == WritePriority.HIGH) {
212+
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
213+
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
214+
blobStore.getLowPrioritySizeBasedBlockingQ()
215+
.produce(
216+
new SizeBasedBlockingQ.Item(
217+
writeContext.getFileSize(),
218+
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
219+
)
220+
);
221+
} else {
222+
blobStore.getOtherPrioritySizeBasedBlockingQ()
223+
.produce(
224+
new SizeBasedBlockingQ.Item(
225+
writeContext.getFileSize(),
226+
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
227+
)
228+
);
229+
}
245230
}
246231
} catch (Exception e) {
247232
logger.info("exception error from blob container for file {}", writeContext.getFileName());
248233
throw new IOException(e);
249234
}
250235
}
251236

237+
/**
 * Starts the upload through the blob store's async transfer manager and relays the outcome
 * to the supplied listener.
 *
 * @param s3AsyncClient      client handed to the transfer manager's {@code uploadObject}
 * @param uploadRequest      upload request handed to {@code uploadObject}
 * @param streamContext      stream context handed to {@code uploadObject}
 * @param completionListener notified with {@code onResponse} when the upload future completes
 *                           without a throwable, and with {@code onFailure} otherwise
 * @return the future produced by {@code whenComplete}, i.e. the upload future with the
 *         listener notification attached
 */
private CompletableFuture<Void> createFileCompletableFuture(
238+
S3AsyncClient s3AsyncClient,
239+
UploadRequest uploadRequest,
240+
StreamContext streamContext,
241+
ActionListener<Void> completionListener
242+
) {
243+
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
244+
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
245+
return completableFuture.whenComplete((response, throwable) -> {
246+
if (throwable == null) {
247+
completionListener.onResponse(response);
248+
} else {
// The listener's onFailure takes an Exception, so an Error is wrapped in one.
// NOTE(review): the cast assumes every non-Error throwable is an Exception; a Throwable
// that is neither would raise ClassCastException here - confirm this cannot occur.
249+
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
250+
completionListener.onFailure(ex);
251+
}
252+
});
253+
}
254+
252255
@ExperimentalApi
253256
@Override
254257
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.opensearch.core.common.unit.ByteSizeValue;
4646
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
4747
import org.opensearch.repositories.s3.async.AsyncTransferManager;
48+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
4849

4950
import java.io.IOException;
5051
import java.util.Collections;
@@ -94,6 +95,8 @@ class S3BlobStore implements BlobStore {
9495
private final AsyncExecutorContainer priorityExecutorBuilder;
9596
private final AsyncExecutorContainer normalExecutorBuilder;
9697
private final boolean multipartUploadEnabled;
98+
private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
99+
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
97100

98101
S3BlobStore(
99102
S3Service service,
@@ -109,7 +112,9 @@ class S3BlobStore implements BlobStore {
109112
AsyncTransferManager asyncTransferManager,
110113
AsyncExecutorContainer urgentExecutorBuilder,
111114
AsyncExecutorContainer priorityExecutorBuilder,
112-
AsyncExecutorContainer normalExecutorBuilder
115+
AsyncExecutorContainer normalExecutorBuilder,
116+
SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
117+
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
113118
) {
114119
this.service = service;
115120
this.s3AsyncService = s3AsyncService;
@@ -128,6 +133,8 @@ class S3BlobStore implements BlobStore {
128133
// Settings to initialize blobstore with.
129134
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
130135
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
136+
this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ;
137+
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
131138
}
132139

133140
@Override
@@ -184,6 +191,14 @@ public int getBulkDeletesSize() {
184191
return bulkDeletesSize;
185192
}
186193

194+
/**
 * Returns the size-based blocking queue supplied to this blob store's constructor as
 * {@code otherPrioritySizeBasedBlockingQ}. In this commit, S3BlobContainer produces upload
 * items onto it when the write priority is none of URGENT, HIGH or LOW.
 *
 * @return the queue for uploads of the remaining priorities
 */
public SizeBasedBlockingQ getOtherPrioritySizeBasedBlockingQ() {
195+
return otherPrioritySizeBasedBlockingQ;
196+
}
197+
198+
/**
 * Returns the size-based blocking queue supplied to this blob store's constructor as
 * {@code lowPrioritySizeBasedBlockingQ}. In this commit, S3BlobContainer produces upload
 * items onto it when the write priority is LOW.
 *
 * @return the queue for LOW priority uploads
 */
public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
199+
return lowPrioritySizeBasedBlockingQ;
200+
}
201+
187202
@Override
188203
public BlobContainer blobContainer(BlobPath path) {
189204
return new S3BlobContainer(path, this);

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

+28-4
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.common.settings.SecureSetting;
5050
import org.opensearch.common.settings.Setting;
5151
import org.opensearch.common.settings.Settings;
52+
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
5253
import org.opensearch.core.action.ActionListener;
5354
import org.opensearch.core.common.Strings;
5455
import org.opensearch.core.common.settings.SecureString;
@@ -63,6 +64,7 @@
6364
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
6465
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
6566
import org.opensearch.repositories.s3.async.AsyncTransferManager;
67+
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
6668
import org.opensearch.snapshots.SnapshotId;
6769
import org.opensearch.snapshots.SnapshotInfo;
6870
import org.opensearch.threadpool.Scheduler;
@@ -210,6 +212,16 @@ class S3Repository extends MeteredBlobStoreRepository {
210212
Setting.Property.NodeScope
211213
);
212214

215+
/**
216+
* Number of transfer queue consumers. Node-scoped setting keyed
* "s3_transfer_queue_consumers". The default is the larger of 5 and the allocated processor
* count reported by OpenSearchExecutors. Values are parsed with Setting.parseInt, passing 5
* as its second argument (presumably the minimum accepted value - TODO confirm).
* NOTE(review): the field is public static but not final - confirm whether that is intended.
217+
*/
218+
public static Setting<Integer> S3_TRANSFER_QUEUE_CONSUMERS = new Setting<>(
219+
"s3_transfer_queue_consumers",
220+
(s) -> Integer.toString(Math.max(5, OpenSearchExecutors.allocatedProcessors(s))),
221+
(s) -> Setting.parseInt(s, 5, "s3_transfer_queue_consumers"),
222+
Setting.Property.NodeScope
223+
);
224+
213225
/**
214226
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
215227
*/
@@ -268,6 +280,8 @@ class S3Repository extends MeteredBlobStoreRepository {
268280
private final AsyncExecutorContainer priorityExecutorBuilder;
269281
private final AsyncExecutorContainer normalExecutorBuilder;
270282
private final Path pluginConfigPath;
283+
private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
284+
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
271285

272286
private volatile int bulkDeletesSize;
273287

@@ -283,7 +297,9 @@ class S3Repository extends MeteredBlobStoreRepository {
283297
final AsyncExecutorContainer priorityExecutorBuilder,
284298
final AsyncExecutorContainer normalExecutorBuilder,
285299
final S3AsyncService s3AsyncService,
286-
final boolean multipartUploadEnabled
300+
final boolean multipartUploadEnabled,
301+
final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
302+
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
287303
) {
288304
this(
289305
metadata,
@@ -297,7 +313,9 @@ class S3Repository extends MeteredBlobStoreRepository {
297313
normalExecutorBuilder,
298314
s3AsyncService,
299315
multipartUploadEnabled,
300-
Path.of("")
316+
Path.of(""),
317+
otherPrioritySizeBasedBlockingQ,
318+
lowPrioritySizeBasedBlockingQ
301319
);
302320
}
303321

@@ -316,7 +334,9 @@ class S3Repository extends MeteredBlobStoreRepository {
316334
final AsyncExecutorContainer normalExecutorBuilder,
317335
final S3AsyncService s3AsyncService,
318336
final boolean multipartUploadEnabled,
319-
Path pluginConfigPath
337+
Path pluginConfigPath,
338+
final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
339+
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
320340
) {
321341
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
322342
this.service = service;
@@ -327,6 +347,8 @@ class S3Repository extends MeteredBlobStoreRepository {
327347
this.urgentExecutorBuilder = urgentExecutorBuilder;
328348
this.priorityExecutorBuilder = priorityExecutorBuilder;
329349
this.normalExecutorBuilder = normalExecutorBuilder;
350+
this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ;
351+
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
330352

331353
validateRepositoryMetadata(metadata);
332354
readRepositoryMetadata();
@@ -389,7 +411,9 @@ protected S3BlobStore createBlobStore() {
389411
asyncUploadUtils,
390412
urgentExecutorBuilder,
391413
priorityExecutorBuilder,
392-
normalExecutorBuilder
414+
normalExecutorBuilder,
415+
otherPrioritySizeBasedBlockingQ,
416+
lowPrioritySizeBasedBlockingQ
393417
);
394418
}
395419

0 commit comments

Comments
 (0)