Skip to content

Commit 4e15890

Browse files
chishui authored and wangdongyu.danny committed
Deprecate batch_size parameter on bulk API (opensearch-project#14725)
By default, the full _bulk payload will be passed to ingest processors as a single batch; any sub-batching logic is to be implemented by each processor if necessary.
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
1 parent 5f8f965 commit 4e15890

File tree

7 files changed, with 141 additions (+141) and 101 deletions (-101).

CHANGELOG.md

+1
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6060
- Allow system index warning in OpenSearchRestTestCase.refreshAllIndices ([#14635](https://github.com/opensearch-project/OpenSearch/pull/14635))
6161

6262
### Deprecated
63+
- Deprecate batch_size parameter on bulk API ([#14725](https://github.com/opensearch-project/OpenSearch/pull/14725))
6364

6465
### Removed
6566
- Remove query categorization changes ([#14759](https://github.com/opensearch-project/OpenSearch/pull/14759))

modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml

+1 -32
Original file line number | Diff line number | Diff line change
@@ -207,15 +207,14 @@ teardown:
207207
- match: { _source: {"f1": "v2", "f2": 47, "field1": "value1", "field2": "value2"}}
208208

209209
---
210-
"Test bulk API with batch enabled happy case":
210+
"Test bulk API with default batch size":
211211
- skip:
212212
version: " - 2.13.99"
213213
reason: "Added in 2.14.0"
214214

215215
- do:
216216
bulk:
217217
refresh: true
218-
batch_size: 2
219218
pipeline: "pipeline1"
220219
body:
221220
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
@@ -245,36 +244,6 @@ teardown:
245244
id: test_id3
246245
- match: { _source: { "text": "text3", "field1": "value1" } }
247246

248-
---
249-
"Test bulk API with batch_size missing":
250-
- skip:
251-
version: " - 2.13.99"
252-
reason: "Added in 2.14.0"
253-
254-
- do:
255-
bulk:
256-
refresh: true
257-
pipeline: "pipeline1"
258-
body:
259-
- '{"index": {"_index": "test_index", "_id": "test_id1"}}'
260-
- '{"text": "text1"}'
261-
- '{"index": {"_index": "test_index", "_id": "test_id2"}}'
262-
- '{"text": "text2"}'
263-
264-
- match: { errors: false }
265-
266-
- do:
267-
get:
268-
index: test_index
269-
id: test_id1
270-
- match: { _source: { "text": "text1", "field1": "value1" } }
271-
272-
- do:
273-
get:
274-
index: test_index
275-
id: test_id2
276-
- match: { _source: { "text": "text2", "field1": "value1" } }
277-
278247
---
279248
"Test bulk API with invalid batch_size":
280249
- skip:

server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java

+81
Original file line number | Diff line number | Diff line change
@@ -315,6 +315,87 @@ public void testBulkWithUpsert() throws Exception {
315315
assertThat(upserted.get("processed"), equalTo(true));
316316
}
317317

318+
public void testSingleDocIngestFailure() throws Exception {
319+
createIndex("test");
320+
BytesReference source = BytesReference.bytes(
321+
jsonBuilder().startObject()
322+
.field("description", "my_pipeline")
323+
.startArray("processors")
324+
.startObject()
325+
.startObject("test")
326+
.endObject()
327+
.endObject()
328+
.endArray()
329+
.endObject()
330+
);
331+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
332+
client().admin().cluster().putPipeline(putPipelineRequest).get();
333+
334+
GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id");
335+
GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get();
336+
assertThat(getResponse.isFound(), is(true));
337+
assertThat(getResponse.pipelines().size(), equalTo(1));
338+
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
339+
340+
assertThrows(
341+
IllegalArgumentException.class,
342+
() -> client().prepareIndex("test")
343+
.setId("1")
344+
.setPipeline("_id")
345+
.setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
346+
.get()
347+
);
348+
349+
DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id");
350+
AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get();
351+
assertThat(response.isAcknowledged(), is(true));
352+
353+
getResponse = client().admin().cluster().prepareGetPipeline("_id").get();
354+
assertThat(getResponse.isFound(), is(false));
355+
assertThat(getResponse.pipelines().size(), equalTo(0));
356+
}
357+
358+
public void testSingleDocIngestDrop() throws Exception {
359+
createIndex("test");
360+
BytesReference source = BytesReference.bytes(
361+
jsonBuilder().startObject()
362+
.field("description", "my_pipeline")
363+
.startArray("processors")
364+
.startObject()
365+
.startObject("test")
366+
.endObject()
367+
.endObject()
368+
.endArray()
369+
.endObject()
370+
);
371+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
372+
client().admin().cluster().putPipeline(putPipelineRequest).get();
373+
374+
GetPipelineRequest getPipelineRequest = new GetPipelineRequest("_id");
375+
GetPipelineResponse getResponse = client().admin().cluster().getPipeline(getPipelineRequest).get();
376+
assertThat(getResponse.isFound(), is(true));
377+
assertThat(getResponse.pipelines().size(), equalTo(1));
378+
assertThat(getResponse.pipelines().get(0).getId(), equalTo("_id"));
379+
380+
DocWriteResponse indexResponse = client().prepareIndex("test")
381+
.setId("1")
382+
.setPipeline("_id")
383+
.setSource(Requests.INDEX_CONTENT_TYPE, "field", "value", "drop", true)
384+
.get();
385+
assertEquals(DocWriteResponse.Result.NOOP, indexResponse.getResult());
386+
387+
Map<String, Object> doc = client().prepareGet("test", "1").get().getSourceAsMap();
388+
assertNull(doc);
389+
390+
DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest("_id");
391+
AcknowledgedResponse response = client().admin().cluster().deletePipeline(deletePipelineRequest).get();
392+
assertThat(response.isAcknowledged(), is(true));
393+
394+
getResponse = client().admin().cluster().prepareGetPipeline("_id").get();
395+
assertThat(getResponse.isFound(), is(false));
396+
assertThat(getResponse.pipelines().size(), equalTo(0));
397+
}
398+
318399
public void test() throws Exception {
319400
BytesReference source = BytesReference.bytes(
320401
jsonBuilder().startObject()

server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

+1 -1
Original file line number | Diff line number | Diff line change
@@ -96,7 +96,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
9696
private String globalRouting;
9797
private String globalIndex;
9898
private Boolean globalRequireAlias;
99-
private int batchSize = 1;
99+
private int batchSize = Integer.MAX_VALUE;
100100

101101
private long sizeInBytes = 0;
102102

server/src/main/java/org/opensearch/ingest/IngestService.java

+3 -61
Original file line number | Diff line number | Diff line change
@@ -525,61 +525,7 @@ public void onFailure(Exception e) {
525525

526526
@Override
527527
protected void doRun() {
528-
int batchSize = originalBulkRequest.batchSize();
529-
if (shouldExecuteBulkRequestInBatch(originalBulkRequest.requests().size(), batchSize)) {
530-
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
531-
return;
532-
}
533-
534-
final Thread originalThread = Thread.currentThread();
535-
final AtomicInteger counter = new AtomicInteger(numberOfActionRequests);
536-
int i = 0;
537-
for (DocWriteRequest<?> actionRequest : actionRequests) {
538-
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
539-
if (indexRequest == null) {
540-
if (counter.decrementAndGet() == 0) {
541-
onCompletion.accept(originalThread, null);
542-
}
543-
assert counter.get() >= 0;
544-
i++;
545-
continue;
546-
}
547-
final String pipelineId = indexRequest.getPipeline();
548-
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
549-
final String finalPipelineId = indexRequest.getFinalPipeline();
550-
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
551-
boolean hasFinalPipeline = true;
552-
final List<String> pipelines;
553-
if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
554-
&& IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
555-
pipelines = Arrays.asList(pipelineId, finalPipelineId);
556-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
557-
pipelines = Collections.singletonList(pipelineId);
558-
hasFinalPipeline = false;
559-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
560-
pipelines = Collections.singletonList(finalPipelineId);
561-
} else {
562-
if (counter.decrementAndGet() == 0) {
563-
onCompletion.accept(originalThread, null);
564-
}
565-
assert counter.get() >= 0;
566-
i++;
567-
continue;
568-
}
569-
570-
executePipelines(
571-
i,
572-
pipelines.iterator(),
573-
hasFinalPipeline,
574-
indexRequest,
575-
onDropped,
576-
onFailure,
577-
counter,
578-
onCompletion,
579-
originalThread
580-
);
581-
i++;
582-
}
528+
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
583529
}
584530
});
585531
}
@@ -635,7 +581,7 @@ private void runBulkRequestInBatch(
635581
i++;
636582
}
637583

638-
int batchSize = originalBulkRequest.batchSize();
584+
int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
639585
List<List<IndexRequestWrapper>> batches = prepareBatches(batchSize, indexRequestWrappers);
640586
logger.debug("batchSize: {}, batches: {}", batchSize, batches.size());
641587

@@ -654,10 +600,6 @@ private void runBulkRequestInBatch(
654600
}
655601
}
656602

657-
private boolean shouldExecuteBulkRequestInBatch(int documentSize, int batchSize) {
658-
return documentSize > 1 && batchSize > 1;
659-
}
660-
661603
/**
662604
* IndexRequests are grouped by unique (index + pipeline_ids) before batching.
663605
* Only IndexRequests in the same group could be batched. It's to ensure batched documents always
@@ -685,7 +627,7 @@ static List<List<IndexRequestWrapper>> prepareBatches(int batchSize, List<IndexR
685627
}
686628
List<List<IndexRequestWrapper>> batchedIndexRequests = new ArrayList<>();
687629
for (Map.Entry<Integer, List<IndexRequestWrapper>> indexRequestsPerKey : indexRequestsPerIndexAndPipelines.entrySet()) {
688-
for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += batchSize) {
630+
for (int i = 0; i < indexRequestsPerKey.getValue().size(); i += Math.min(indexRequestsPerKey.getValue().size(), batchSize)) {
689631
batchedIndexRequests.add(
690632
new ArrayList<>(
691633
indexRequestsPerKey.getValue().subList(i, i + Math.min(batchSize, indexRequestsPerKey.getValue().size() - i))

server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java

+7 -1
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import org.opensearch.action.support.ActiveShardCount;
3939
import org.opensearch.client.Requests;
4040
import org.opensearch.client.node.NodeClient;
41+
import org.opensearch.common.logging.DeprecationLogger;
4142
import org.opensearch.common.settings.Settings;
4243
import org.opensearch.rest.BaseRestHandler;
4344
import org.opensearch.rest.RestRequest;
@@ -66,6 +67,8 @@
6667
public class RestBulkAction extends BaseRestHandler {
6768

6869
private final boolean allowExplicitIndex;
70+
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class);
71+
static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0.";
6972

7073
public RestBulkAction(Settings settings) {
7174
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
@@ -97,7 +100,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
97100
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
98101
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
99102
bulkRequest.setRefreshPolicy(request.param("refresh"));
100-
bulkRequest.batchSize(request.paramAsInt("batch_size", 1));
103+
if (request.hasParam("batch_size")) {
104+
deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE);
105+
}
106+
bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE));
101107
bulkRequest.add(
102108
request.requiredContent(),
103109
defaultIndex,

server/src/test/java/org/opensearch/ingest/IngestServiceTests.java

+47 -6
Original file line number | Diff line number | Diff line change
@@ -1134,10 +1134,14 @@ public void testBulkRequestExecutionWithFailures() throws Exception {
11341134
Exception error = new RuntimeException();
11351135
doAnswer(args -> {
11361136
@SuppressWarnings("unchecked")
1137-
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
1138-
handler.accept(null, error);
1137+
List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
1138+
Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
1139+
for (IngestDocumentWrapper wrapper : ingestDocumentWrappers) {
1140+
wrapper.update(wrapper.getIngestDocument(), error);
1141+
}
1142+
handler.accept(ingestDocumentWrappers);
11391143
return null;
1140-
}).when(processor).execute(any(), any());
1144+
}).when(processor).batchExecute(any(), any());
11411145
IngestService ingestService = createWithProcessors(
11421146
Collections.singletonMap("mock", (factories, tag, description, config) -> processor)
11431147
);
@@ -1192,10 +1196,11 @@ public void testBulkRequestExecution() throws Exception {
11921196
when(processor.getTag()).thenReturn("mockTag");
11931197
doAnswer(args -> {
11941198
@SuppressWarnings("unchecked")
1195-
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
1196-
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
1199+
List<IngestDocumentWrapper> ingestDocumentWrappers = (List) args.getArguments()[0];
1200+
Consumer<List<IngestDocumentWrapper>> handler = (Consumer) args.getArguments()[1];
1201+
handler.accept(ingestDocumentWrappers);
11971202
return null;
1198-
}).when(processor).execute(any(), any());
1203+
}).when(processor).batchExecute(any(), any());
11991204
Map<String, Processor.Factory> map = new HashMap<>(2);
12001205
map.put("mock", (factories, tag, description, config) -> processor);
12011206

@@ -1957,6 +1962,42 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() {
19571962
verify(mockCompoundProcessor, never()).execute(any(), any());
19581963
}
19591964

1965+
public void testExecuteBulkRequestInBatchWithDefaultBatchSize() {
1966+
CompoundProcessor mockCompoundProcessor = mockCompoundProcessor();
1967+
IngestService ingestService = createWithProcessors(
1968+
Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor)
1969+
);
1970+
createPipeline("_id", ingestService);
1971+
BulkRequest bulkRequest = new BulkRequest();
1972+
IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1973+
bulkRequest.add(indexRequest1);
1974+
IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1975+
bulkRequest.add(indexRequest2);
1976+
IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id");
1977+
bulkRequest.add(indexRequest3);
1978+
IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
1979+
bulkRequest.add(indexRequest4);
1980+
@SuppressWarnings("unchecked")
1981+
final Map<Integer, Exception> failureHandler = new HashMap<>();
1982+
final Map<Thread, Exception> completionHandler = new HashMap<>();
1983+
final List<Integer> dropHandler = new ArrayList<>();
1984+
ingestService.executeBulkRequest(
1985+
4,
1986+
bulkRequest.requests(),
1987+
failureHandler::put,
1988+
completionHandler::put,
1989+
dropHandler::add,
1990+
Names.WRITE,
1991+
bulkRequest
1992+
);
1993+
assertTrue(failureHandler.isEmpty());
1994+
assertTrue(dropHandler.isEmpty());
1995+
assertEquals(1, completionHandler.size());
1996+
assertNull(completionHandler.get(Thread.currentThread()));
1997+
verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
1998+
verify(mockCompoundProcessor, never()).execute(any(), any());
1999+
}
2000+
19602001
public void testPrepareBatches_same_index_pipeline() {
19612002
IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));
19622003
IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1"));

0 commit comments

Comments
 (0)