Skip to content

Commit 7aa62d0

Browse files
committed
TransportBulkAction.doRun()
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 5afb92f commit 7aa62d0

File tree

1 file changed

+8
-15
lines changed

1 file changed

+8
-15
lines changed

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

+8-15
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ protected void doRun() {
532532
}
533533
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
534534
Metadata metadata = clusterState.metadata();
535+
// go over all the requests and create a ShardId -> Operations mapping
536+
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
535537
for (int i = 0; i < bulkRequest.requests.size(); i++) {
536538
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
537539
// the request can only be null because we set it to null in the previous step, so it gets ignored
@@ -587,6 +589,12 @@ protected void doRun() {
587589
default:
588590
throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
589591
}
592+
593+
ShardId shardId = clusterService.operationRouting()
594+
.indexShards(clusterState, concreteIndex.getName(), docWriteRequest.id(), docWriteRequest.routing())
595+
.shardId();
596+
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
597+
shardRequests.add(new BulkItemRequest(i, docWriteRequest));
590598
} catch (OpenSearchParseException | IllegalArgumentException | RoutingMissingException e) {
591599
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.id(), e);
592600
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
@@ -596,21 +604,6 @@ protected void doRun() {
596604
}
597605
}
598606

599-
// first, go over all the requests and create a ShardId -> Operations mapping
600-
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
601-
for (int i = 0; i < bulkRequest.requests.size(); i++) {
602-
DocWriteRequest<?> request = bulkRequest.requests.get(i);
603-
if (request == null) {
604-
continue;
605-
}
606-
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
607-
ShardId shardId = clusterService.operationRouting()
608-
.indexShards(clusterState, concreteIndex, request.id(), request.routing())
609-
.shardId();
610-
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
611-
shardRequests.add(new BulkItemRequest(i, request));
612-
}
613-
614607
if (requestsByShard.isEmpty()) {
615608
BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]);
616609
long tookMillis = buildTookInMillis(startTimeNanos);

0 commit comments

Comments
 (0)