
Commit 77a4daf

Optimizations in s3 async upload flow (opensearch-project#11327)

Signed-off-by: vikasvb90 <vikasvb@amazon.com>
Parent: 6fa3a0d

File tree: 11 files changed (+263 −28)

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java (+1 −1)

@@ -374,7 +374,7 @@ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
         return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
     }
 
-    private synchronized void releaseCachedClients() {
+    public synchronized void releaseCachedClients() {
         // the clients will shutdown when they will not be used anymore
         for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) {
            clientReference.decRef();
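
Note: releaseCachedClients() is promoted from private to public here (and identically in S3Service further down) so that S3Repository#reload, shown in the S3Repository.java diff below, can evict the cached clients after a settings reload; the next request then builds clients against the fresh settings.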

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java (+46 −5)

@@ -91,6 +91,7 @@
 import org.opensearch.repositories.s3.async.UploadRequest;
 import org.opensearch.repositories.s3.utils.HttpRangeUtils;
 
+import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -188,10 +189,38 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
             writeContext.getWritePriority(),
             writeContext.getUploadFinalizer(),
             writeContext.doRemoteDataIntegrityCheck(),
-            writeContext.getExpectedChecksum()
+            writeContext.getExpectedChecksum(),
+            blobStore.isUploadRetryEnabled()
         );
         try {
-            long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
+            if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
+                StreamContext streamContext = SocketAccess.doPrivileged(
+                    () -> writeContext.getStreamProvider(uploadRequest.getContentLength())
+                );
+                InputStreamContainer inputStream = streamContext.provideStream(0);
+                try {
+                    executeMultipartUpload(
+                        blobStore,
+                        uploadRequest.getKey(),
+                        inputStream.getInputStream(),
+                        uploadRequest.getContentLength()
+                    );
+                    completionListener.onResponse(null);
+                } catch (Exception ex) {
+                    logger.error(
+                        () -> new ParameterizedMessage(
+                            "Failed to upload large file {} of size {} ",
+                            uploadRequest.getKey(),
+                            uploadRequest.getContentLength()
+                        ),
+                        ex
+                    );
+                    completionListener.onFailure(ex);
+                }
+                return;
+            }
+            long partSize = blobStore.getAsyncTransferManager()
+                .calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled());
             StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
             try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {
@@ -537,8 +566,14 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin
 
         PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            final InputStream requestInputStream;
+            if (blobStore.isUploadRetryEnabled()) {
+                requestInputStream = new BufferedInputStream(input, (int) (blobSize + 1));
+            } else {
+                requestInputStream = input;
+            }
             SocketAccess.doPrivilegedVoid(
-                () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(input, blobSize))
+                () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize))
             );
         } catch (final SdkException e) {
             throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
@@ -578,6 +613,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
             createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
         }
 
+        final InputStream requestInputStream;
+        if (blobStore.isUploadRetryEnabled()) {
+            requestInputStream = new BufferedInputStream(input, (int) (partSize + 1));
+        } else {
+            requestInputStream = input;
+        }
+
         CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestBuilder.build();
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
             uploadId.set(
@@ -601,10 +643,9 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
                     .build();
 
                 bytesCount += uploadPartRequest.contentLength();
-
                 final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
                     () -> clientReference.get()
-                        .uploadPart(uploadPartRequest, RequestBody.fromInputStream(input, uploadPartRequest.contentLength()))
+                        .uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, uploadPartRequest.contentLength()))
                 );
                 parts.add(CompletedPart.builder().partNumber(uploadPartRequest.partNumber()).eTag(uploadResponse.eTag()).build());
             }
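
The new branch in asyncBlobUpload routes uploads larger than 10 GB to the existing blocking executeMultipartUpload path when redirect_large_s3_upload is enabled, instead of the async transfer manager. A minimal sketch of that routing decision; the SyncUploader/AsyncUploader interfaces are hypothetical stand-ins for the real sync multipart and async transfer-manager paths:

import java.io.IOException;
import java.io.InputStream;

// Sketch of the routing added to asyncBlobUpload. The uploader interfaces
// below are illustrative; the real code calls executeMultipartUpload(...)
// and the AsyncTransferManager respectively.
final class LargeUploadRouter {
    private static final long REDIRECT_THRESHOLD_BYTES = 10L * 1024 * 1024 * 1024; // 10 GB

    interface SyncUploader { void upload(String key, InputStream body, long length) throws IOException; }
    interface AsyncUploader { void upload(String key, InputStream body, long length); }

    private final boolean redirectLargeUploads; // from the redirect_large_s3_upload setting
    private final SyncUploader syncUploader;
    private final AsyncUploader asyncUploader;

    LargeUploadRouter(boolean redirectLargeUploads, SyncUploader sync, AsyncUploader async) {
        this.redirectLargeUploads = redirectLargeUploads;
        this.syncUploader = sync;
        this.asyncUploader = async;
    }

    void upload(String key, InputStream body, long contentLength) throws IOException {
        // Files above the threshold go through the slower but well-tested sync
        // multipart path when redirection is enabled; everything else stays async.
        if (redirectLargeUploads && contentLength > REDIRECT_THRESHOLD_BYTES) {
            syncUploader.upload(key, body, contentLength);
            return;
        }
        asyncUploader.upload(key, body, contentLength);
    }
}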

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java (+19)

@@ -56,8 +56,10 @@
 import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
 import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
 import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
+import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
 import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
 import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
+import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;
 
 class S3BlobStore implements BlobStore {
 
@@ -71,6 +73,10 @@ class S3BlobStore implements BlobStore {
 
     private volatile ByteSizeValue bufferSize;
 
+    private volatile boolean redirectLargeUploads;
+
+    private volatile boolean uploadRetryEnabled;
+
     private volatile boolean serverSideEncryption;
 
     private volatile ObjectCannedACL cannedACL;
@@ -119,6 +125,9 @@ class S3BlobStore implements BlobStore {
         this.normalExecutorBuilder = normalExecutorBuilder;
         this.priorityExecutorBuilder = priorityExecutorBuilder;
         this.urgentExecutorBuilder = urgentExecutorBuilder;
+        // Settings to initialize blobstore with.
+        this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
+        this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
     }
 
     @Override
@@ -130,6 +139,8 @@ public void reload(RepositoryMetadata repositoryMetadata) {
         this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
         this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
         this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
+        this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
+        this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
     }
 
     @Override
@@ -149,6 +160,14 @@ int getMaxRetries() {
         return service.settings(repositoryMetadata).maxRetries;
     }
 
+    public boolean isRedirectLargeUploads() {
+        return redirectLargeUploads;
+    }
+
+    public boolean isUploadRetryEnabled() {
+        return uploadRetryEnabled;
+    }
+
     public String bucket() {
         return bucket;
     }
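
Both flags are read in the constructor and re-read in reload(), so a repository settings update takes effect on a live blob store without recreating it.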

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java (+16)

@@ -147,6 +147,20 @@ class S3Repository extends MeteredBlobStoreRepository {
      */
     static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB);
 
+    /**
+     * Whether large uploads need to be redirected to slow sync s3 client.
+     */
+    static final Setting<Boolean> REDIRECT_LARGE_S3_UPLOAD = Setting.boolSetting(
+        "redirect_large_s3_upload",
+        true,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries.
+     */
+    static final Setting<Boolean> UPLOAD_RETRY_ENABLED = Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope);
+
     /**
      * Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
      * the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
@@ -391,7 +405,9 @@ public void reload(RepositoryMetadata newRepositoryMetadata) {
 
         // Reload configs for S3RepositoryPlugin
         service.settings(metadata);
+        service.releaseCachedClients();
         s3AsyncService.settings(metadata);
+        s3AsyncService.releaseCachedClients();
 
         // Reload configs for S3BlobStore
         BlobStore blobStore = getBlobStore();
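
Both new settings are node-scoped booleans defaulting to true. A minimal sketch of how such settings resolve, using the same definitions as the diff above; the Settings values here are illustrative:

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

public final class UploadSettingsDemo {
    // Same shape as the settings added to S3Repository.
    static final Setting<Boolean> REDIRECT_LARGE_S3_UPLOAD =
        Setting.boolSetting("redirect_large_s3_upload", true, Setting.Property.NodeScope);
    static final Setting<Boolean> UPLOAD_RETRY_ENABLED =
        Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope);

    public static void main(String[] args) {
        // Absent from the settings, both fall back to their default of true.
        Settings defaults = Settings.EMPTY;
        System.out.println(REDIRECT_LARGE_S3_UPLOAD.get(defaults)); // true

        // Explicitly disabling retries, e.g. via opensearch.yml.
        Settings custom = Settings.builder().put("s3_upload_retry_enabled", false).build();
        System.out.println(UPLOAD_RETRY_ENABLED.get(custom)); // false
    }
}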

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java (+3 −1)

@@ -261,7 +261,9 @@ public List<Setting<?>> getSettings() {
             S3ClientSettings.IDENTITY_TOKEN_FILE_SETTING,
             S3ClientSettings.ROLE_SESSION_NAME_SETTING,
             S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
-            S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING
+            S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
+            S3Repository.REDIRECT_LARGE_S3_UPLOAD,
+            S3Repository.UPLOAD_RETRY_ENABLED
         );
     }
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java (+1 −1)

@@ -438,7 +438,7 @@ private static IrsaCredentials buildFromEnviroment(IrsaCredentials defaults) {
         return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
     }
 
-    private synchronized void releaseCachedClients() {
+    public synchronized void releaseCachedClients() {
         // the clients will shutdown when they will not be used anymore
         for (final AmazonS3Reference clientReference : clientsCache.values()) {
            clientReference.decRef();

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java (+26 −8)

@@ -23,7 +23,6 @@
 import org.opensearch.common.StreamContext;
 import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.io.InputStreamContainer;
-import org.opensearch.core.common.unit.ByteSizeUnit;
 import org.opensearch.repositories.s3.SocketAccess;
 import org.opensearch.repositories.s3.StatsMetricPublisher;
 import org.opensearch.repositories.s3.io.CheckedContainer;
@@ -55,8 +54,8 @@ public class AsyncPartsHandler {
      * @param uploadId Upload Id against which multi-part is being performed
      * @param completedParts Reference of completed parts
      * @param inputStreamContainers Checksum containers
-     * @return list of completable futures
      * @param statsMetricPublisher sdk metric publisher
+     * @return list of completable futures
      * @throws IOException thrown in case of an IO error
      */
     public static List<CompletableFuture<CompletedPart>> uploadParts(
@@ -69,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
         String uploadId,
         AtomicReferenceArray<CompletedPart> completedParts,
         AtomicReferenceArray<CheckedContainer> inputStreamContainers,
-        StatsMetricPublisher statsMetricPublisher
+        StatsMetricPublisher statsMetricPublisher,
+        boolean uploadRetryEnabled
     ) throws IOException {
         List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
         for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
@@ -95,7 +95,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
                 futures,
                 uploadPartRequestBuilder.build(),
                 inputStreamContainer,
-                uploadRequest
+                uploadRequest,
+                uploadRetryEnabled
             );
         }
 
@@ -132,6 +133,18 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa
         }));
     }
 
+    public static InputStream maybeRetryInputStream(
+        InputStream inputStream,
+        WritePriority writePriority,
+        boolean uploadRetryEnabled,
+        long contentLength
+    ) {
+        if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
+            return new BufferedInputStream(inputStream, (int) (contentLength + 1));
+        }
+        return inputStream;
+    }
+
     private static void uploadPart(
         S3AsyncClient s3AsyncClient,
         ExecutorService executorService,
@@ -142,7 +155,8 @@ private static void uploadPart(
         List<CompletableFuture<CompletedPart>> futures,
         UploadPartRequest uploadPartRequest,
         InputStreamContainer inputStreamContainer,
-        UploadRequest uploadRequest
+        UploadRequest uploadRequest,
+        boolean uploadRetryEnabled
     ) {
         Integer partNumber = uploadPartRequest.partNumber();
 
@@ -154,9 +168,13 @@ private static void uploadPart(
         } else {
             streamReadExecutor = executorService;
         }
-        // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
-        // data can be retried instead of retrying whole file by the application.
-        InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
+
+        InputStream inputStream = maybeRetryInputStream(
+            inputStreamContainer.getInputStream(),
+            uploadRequest.getWritePriority(),
+            uploadRetryEnabled,
+            uploadPartRequest.contentLength()
+        );
         CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
             () -> s3AsyncClient.uploadPart(
                 uploadPartRequest,
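
maybeRetryInputStream centralizes the buffering that used to be unconditional: only when retries are enabled and the write priority is HIGH or URGENT is the part stream wrapped in a BufferedInputStream sized to hold the whole part (contentLength + 1), which gives the SDK a mark/reset-capable stream it can rewind on a transient failure instead of failing the entire upload. A standalone sketch of those mark/reset mechanics, with the priority check collapsed into a single boolean:

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class RetryableStreamDemo {
    // Mirrors the shape of AsyncPartsHandler.maybeRetryInputStream: buffer only
    // when retries are enabled for this upload.
    static InputStream maybeBuffer(InputStream in, boolean retryEnabled, long contentLength) {
        return retryEnabled ? new BufferedInputStream(in, (int) (contentLength + 1)) : in;
    }

    public static void main(String[] args) throws IOException {
        byte[] part = "part-payload".getBytes();
        InputStream stream = maybeBuffer(new ByteArrayInputStream(part), true, part.length);

        stream.mark(part.length + 1);      // remember the start of the part
        stream.readNBytes(5);              // simulate a partially sent request
        stream.reset();                    // a retry can rewind and resend from the mark

        System.out.println(new String(stream.readAllBytes())); // prints "part-payload"
    }
}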

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java (+14 −6)

@@ -34,11 +34,11 @@
 import org.opensearch.common.io.InputStreamContainer;
 import org.opensearch.common.util.ByteUtils;
 import org.opensearch.core.common.unit.ByteSizeUnit;
+import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.repositories.s3.SocketAccess;
 import org.opensearch.repositories.s3.StatsMetricPublisher;
 import org.opensearch.repositories.s3.io.CheckedContainer;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
@@ -183,7 +183,8 @@ private void doUploadInParts(
                 uploadId,
                 completedParts,
                 inputStreamContainers,
-                statsMetricPublisher
+                statsMetricPublisher,
+                uploadRequest.isUploadRetryEnabled()
             );
         } catch (Exception ex) {
             try {
@@ -302,10 +303,13 @@ private static void handleException(CompletableFuture<Void> returnFuture, Suppli
     /**
      * Calculates the optimal part size of each part request if the upload operation is carried out as multipart upload.
      */
-    public long calculateOptimalPartSize(long contentLengthOfSource) {
+    public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority writePriority, boolean uploadRetryEnabled) {
         if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5)) {
             return contentLengthOfSource;
         }
+        if (uploadRetryEnabled && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
+            return new ByteSizeValue(5, ByteSizeUnit.MB).getBytes();
+        }
         double optimalPartSize = contentLengthOfSource / (double) MAX_UPLOAD_PARTS;
         optimalPartSize = Math.ceil(optimalPartSize);
         return (long) Math.max(optimalPartSize, minimumPartSize);
@@ -335,9 +339,13 @@ private void uploadInOneChunk(
         } else {
             streamReadExecutor = executorService;
         }
-        // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
-        // data can be retried instead of retrying whole file by the application.
-        InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
+
+        InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(
+            inputStreamContainer.getInputStream(),
+            uploadRequest.getWritePriority(),
+            uploadRequest.isUploadRetryEnabled(),
+            uploadRequest.getContentLength()
+        );
         CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
             () -> s3AsyncClient.putObject(
                 putObjectRequestBuilder.build(),
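
Pinning retry-enabled HIGH/URGENT uploads to 5 MB parts bounds the memory each retry buffer above must hold; other uploads still spread the content across at most MAX_UPLOAD_PARTS parts, subject to a minimum part size. A rough sketch of the arithmetic, assuming S3's 10,000-part limit and a 5 MB minimum:

public final class PartSizeDemo {
    private static final long MB = 1024 * 1024;
    private static final int MAX_UPLOAD_PARTS = 10_000; // S3 multipart limit
    private static final long MIN_PART_SIZE = 5 * MB;   // assumed minimum part size

    // Mirrors the shape of AsyncTransferManager.calculateOptimalPartSize,
    // with the priority check collapsed into one boolean.
    static long optimalPartSize(long contentLength, boolean retryEnabledHighPriority) {
        if (contentLength < 5 * MB) {
            return contentLength; // small enough for a single chunk
        }
        if (retryEnabledHighPriority) {
            return 5 * MB; // fixed part size keeps the per-part retry buffer small
        }
        long partSize = (long) Math.ceil(contentLength / (double) MAX_UPLOAD_PARTS);
        return Math.max(partSize, MIN_PART_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(optimalPartSize(100L * 1024 * MB, false)); // 100 GiB -> ~10.24 MB parts
        System.out.println(optimalPartSize(100L * 1024 * MB, true));  // 100 GiB -> 5 MB parts
    }
}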

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java (+9 −1)

@@ -25,6 +25,8 @@ public class UploadRequest {
     private final boolean doRemoteDataIntegrityCheck;
     private final Long expectedChecksum;
 
+    private boolean uploadRetryEnabled;
+
     /**
      * Construct a new UploadRequest object
      *
@@ -43,7 +45,8 @@ public UploadRequest(
         WritePriority writePriority,
         CheckedConsumer<Boolean, IOException> uploadFinalizer,
         boolean doRemoteDataIntegrityCheck,
-        Long expectedChecksum
+        Long expectedChecksum,
+        boolean uploadRetryEnabled
     ) {
         this.bucket = bucket;
         this.key = key;
@@ -52,6 +55,7 @@ public UploadRequest(
         this.uploadFinalizer = uploadFinalizer;
         this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
         this.expectedChecksum = expectedChecksum;
+        this.uploadRetryEnabled = uploadRetryEnabled;
     }
 
     public String getBucket() {
@@ -81,4 +85,8 @@ public boolean doRemoteDataIntegrityCheck() {
     public Long getExpectedChecksum() {
         return expectedChecksum;
     }
+
+    public boolean isUploadRetryEnabled() {
+        return uploadRetryEnabled;
+    }
 }
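
The flag originates in S3BlobContainer#asyncBlobUpload, which populates it from blobStore.isUploadRetryEnabled() when building the request; AsyncTransferManager then reads it back via uploadRequest.isUploadRetryEnabled() for both the single-chunk and multipart paths.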
