@@ -189,7 +189,7 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
189
189
int numRequests = scaledRandomIntBetween (32 , 128 );
190
190
BulkRequest bulkRequest = new BulkRequest ();
191
191
if (shouldSetBatchSize ) {
192
- bulkRequest .batchSize (numRequests );
192
+ bulkRequest .batchSize (scaledRandomIntBetween ( 2 , numRequests ) );
193
193
}
194
194
for (int i = 0 ; i < numRequests ; i ++) {
195
195
IndexRequest indexRequest = new IndexRequest ("index" ).id (Integer .toString (i )).setPipeline ("_id" );
@@ -214,6 +214,9 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
214
214
);
215
215
assertThat (indexResponse , notNullValue ());
216
216
assertThat (indexResponse .getId (), equalTo (Integer .toString (i )));
217
+ // verify field of successful doc
218
+ Map <String , Object > successDoc = client ().prepareGet ("index" , indexResponse .getId ()).get ().getSourceAsMap ();
219
+ assertThat (successDoc .get ("processed" ), equalTo (true ));
217
220
assertEquals (DocWriteResponse .Result .CREATED , indexResponse .getResult ());
218
221
}
219
222
}
@@ -223,51 +226,6 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
223
226
assertTrue (deletePipelineResponse .isAcknowledged ());
224
227
}
225
228
226
- public void testBulkWithIngestFailuresBatch () throws Exception {
227
- createIndex ("index" );
228
-
229
- BytesReference source = BytesReference .bytes (
230
- jsonBuilder ().startObject ()
231
- .field ("description" , "my_pipeline" )
232
- .startArray ("processors" )
233
- .startObject ()
234
- .startObject ("test" )
235
- .endObject ()
236
- .endObject ()
237
- .endArray ()
238
- .endObject ()
239
- );
240
- PutPipelineRequest putPipelineRequest = new PutPipelineRequest ("_id" , source , MediaTypeRegistry .JSON );
241
- client ().admin ().cluster ().putPipeline (putPipelineRequest ).get ();
242
-
243
- BulkRequest bulkRequest = new BulkRequest ();
244
- bulkRequest .batchSize (2 );
245
- bulkRequest .add (
246
- new IndexRequest ("index" ).id ("_fail" ).setPipeline ("_id" ).source (Requests .INDEX_CONTENT_TYPE , "field" , "value" , "fail" , true )
247
- );
248
- bulkRequest .add (
249
- new IndexRequest ("index" ).id ("_success" ).setPipeline ("_id" ).source (Requests .INDEX_CONTENT_TYPE , "field" , "value" , "fail" , false )
250
- );
251
-
252
- BulkResponse response = client ().bulk (bulkRequest ).actionGet ();
253
- MatcherAssert .assertThat (response .getItems ().length , equalTo (bulkRequest .requests ().size ()));
254
-
255
- Map <String , BulkItemResponse > results = Arrays .stream (response .getItems ())
256
- .collect (Collectors .toMap (BulkItemResponse ::getId , r -> r ));
257
-
258
- MatcherAssert .assertThat (results .keySet (), containsInAnyOrder ("_fail" , "_success" ));
259
- assertNotNull (results .get ("_fail" ).getFailure ());
260
- assertNull (results .get ("_success" ).getFailure ());
261
-
262
- // verify field of successful doc
263
- Map <String , Object > successDoc = client ().prepareGet ("index" , "_success" ).get ().getSourceAsMap ();
264
- assertThat (successDoc .get ("processed" ), equalTo (true ));
265
-
266
- // cleanup
267
- AcknowledgedResponse deletePipelineResponse = client ().admin ().cluster ().prepareDeletePipeline ("_id" ).get ();
268
- assertTrue (deletePipelineResponse .isAcknowledged ());
269
- }
270
-
271
229
public void testBulkWithIngestFailuresAndDropBatch () throws Exception {
272
230
createIndex ("index" );
273
231
0 commit comments