Skip to content

Commit 8e493f3

Browse files
authored
Add batching processor base type AbstractBatchingProcessor (#14554)
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
1 parent 8ad199d commit 8e493f3

File tree

3 files changed

+297
-0
lines changed

3 files changed

+297
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
1010
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
1111
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
12+
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
1213
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
1314
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
1415

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
import java.util.function.Consumer;
17+
18+
import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
19+
20+
/**
21+
* Abstract base class for batch processors.
22+
*
23+
* @opensearch.internal
24+
*/
25+
public abstract class AbstractBatchingProcessor extends AbstractProcessor {
26+
27+
public static final String BATCH_SIZE_FIELD = "batch_size";
28+
private static final int DEFAULT_BATCH_SIZE = 1;
29+
protected final int batchSize;
30+
31+
protected AbstractBatchingProcessor(String tag, String description, int batchSize) {
32+
super(tag, description);
33+
this.batchSize = batchSize;
34+
}
35+
36+
/**
37+
* Internal logic to process batched documents, must be implemented by concrete batch processors.
38+
*
39+
* @param ingestDocumentWrappers {@link List} of {@link IngestDocumentWrapper} to be processed.
40+
* @param handler {@link Consumer} to be called with the results of the processing.
41+
*/
42+
protected abstract void subBatchExecute(
43+
List<IngestDocumentWrapper> ingestDocumentWrappers,
44+
Consumer<List<IngestDocumentWrapper>> handler
45+
);
46+
47+
@Override
48+
public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
49+
if (ingestDocumentWrappers.isEmpty()) {
50+
handler.accept(Collections.emptyList());
51+
return;
52+
}
53+
54+
// if batch size is larger than document size, send one batch
55+
if (this.batchSize >= ingestDocumentWrappers.size()) {
56+
subBatchExecute(ingestDocumentWrappers, handler);
57+
return;
58+
}
59+
60+
// split documents into multiple batches and send each batch to batch processors
61+
List<List<IngestDocumentWrapper>> batches = cutBatches(ingestDocumentWrappers);
62+
int size = ingestDocumentWrappers.size();
63+
AtomicInteger counter = new AtomicInteger(size);
64+
List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<>());
65+
for (List<IngestDocumentWrapper> batch : batches) {
66+
this.subBatchExecute(batch, batchResults -> {
67+
allResults.addAll(batchResults);
68+
if (counter.addAndGet(-batchResults.size()) == 0) {
69+
handler.accept(allResults);
70+
}
71+
assert counter.get() >= 0 : "counter is negative";
72+
});
73+
}
74+
}
75+
76+
private List<List<IngestDocumentWrapper>> cutBatches(List<IngestDocumentWrapper> ingestDocumentWrappers) {
77+
List<List<IngestDocumentWrapper>> batches = new ArrayList<>();
78+
for (int i = 0; i < ingestDocumentWrappers.size(); i += this.batchSize) {
79+
batches.add(ingestDocumentWrappers.subList(i, Math.min(i + this.batchSize, ingestDocumentWrappers.size())));
80+
}
81+
return batches;
82+
}
83+
84+
/**
85+
* Factory class for creating {@link AbstractBatchingProcessor} instances.
86+
*
87+
* @opensearch.internal
88+
*/
89+
public abstract static class Factory implements Processor.Factory {
90+
final String processorType;
91+
92+
protected Factory(String processorType) {
93+
this.processorType = processorType;
94+
}
95+
96+
/**
97+
* Creates a new processor instance.
98+
*
99+
* @param processorFactories The processor factories.
100+
* @param tag The processor tag.
101+
* @param description The processor description.
102+
* @param config The processor configuration.
103+
* @return The new AbstractBatchProcessor instance.
104+
* @throws Exception If the processor could not be created.
105+
*/
106+
@Override
107+
public AbstractBatchingProcessor create(
108+
Map<String, Processor.Factory> processorFactories,
109+
String tag,
110+
String description,
111+
Map<String, Object> config
112+
) throws Exception {
113+
int batchSize = ConfigurationUtils.readIntProperty(this.processorType, tag, config, BATCH_SIZE_FIELD, DEFAULT_BATCH_SIZE);
114+
if (batchSize < 1) {
115+
throw newConfigurationException(this.processorType, tag, BATCH_SIZE_FIELD, "batch size must be a positive integer");
116+
}
117+
return newProcessor(tag, description, batchSize, config);
118+
}
119+
120+
/**
121+
* Returns a new processor instance.
122+
*
123+
* @param tag tag of the processor
124+
* @param description description of the processor
125+
* @param batchSize batch size of the processor
126+
* @param config configuration of the processor
127+
* @return a new batch processor instance
128+
*/
129+
protected abstract AbstractBatchingProcessor newProcessor(
130+
String tag,
131+
String description,
132+
int batchSize,
133+
Map<String, Object> config
134+
);
135+
}
136+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
import org.opensearch.OpenSearchParseException;
12+
import org.opensearch.test.OpenSearchTestCase;
13+
14+
import java.util.ArrayList;
15+
import java.util.Arrays;
16+
import java.util.Collections;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.function.Consumer;
21+
22+
public class AbstractBatchingProcessorTests extends OpenSearchTestCase {
23+
24+
public void testBatchExecute_emptyInput() {
25+
DummyProcessor processor = new DummyProcessor(3);
26+
Consumer<List<IngestDocumentWrapper>> handler = (results) -> assertTrue(results.isEmpty());
27+
processor.batchExecute(Collections.emptyList(), handler);
28+
assertTrue(processor.getSubBatches().isEmpty());
29+
}
30+
31+
public void testBatchExecute_singleBatchSize() {
32+
DummyProcessor processor = new DummyProcessor(3);
33+
List<IngestDocumentWrapper> wrapperList = Arrays.asList(
34+
IngestDocumentPreparer.createIngestDocumentWrapper(1),
35+
IngestDocumentPreparer.createIngestDocumentWrapper(2),
36+
IngestDocumentPreparer.createIngestDocumentWrapper(3)
37+
);
38+
List<IngestDocumentWrapper> resultList = new ArrayList<>();
39+
processor.batchExecute(wrapperList, resultList::addAll);
40+
assertEquals(wrapperList, resultList);
41+
assertEquals(1, processor.getSubBatches().size());
42+
assertEquals(wrapperList, processor.getSubBatches().get(0));
43+
}
44+
45+
public void testBatchExecute_multipleBatches() {
46+
DummyProcessor processor = new DummyProcessor(2);
47+
List<IngestDocumentWrapper> wrapperList = Arrays.asList(
48+
IngestDocumentPreparer.createIngestDocumentWrapper(1),
49+
IngestDocumentPreparer.createIngestDocumentWrapper(2),
50+
IngestDocumentPreparer.createIngestDocumentWrapper(3),
51+
IngestDocumentPreparer.createIngestDocumentWrapper(4),
52+
IngestDocumentPreparer.createIngestDocumentWrapper(5)
53+
);
54+
List<IngestDocumentWrapper> resultList = new ArrayList<>();
55+
processor.batchExecute(wrapperList, resultList::addAll);
56+
assertEquals(wrapperList, resultList);
57+
assertEquals(3, processor.getSubBatches().size());
58+
assertEquals(wrapperList.subList(0, 2), processor.getSubBatches().get(0));
59+
assertEquals(wrapperList.subList(2, 4), processor.getSubBatches().get(1));
60+
assertEquals(wrapperList.subList(4, 5), processor.getSubBatches().get(2));
61+
}
62+
63+
public void testBatchExecute_randomBatches() {
64+
int batchSize = randomIntBetween(2, 32);
65+
int docCount = randomIntBetween(2, 32);
66+
DummyProcessor processor = new DummyProcessor(batchSize);
67+
List<IngestDocumentWrapper> wrapperList = new ArrayList<>();
68+
for (int i = 0; i < docCount; ++i) {
69+
wrapperList.add(IngestDocumentPreparer.createIngestDocumentWrapper(i));
70+
}
71+
List<IngestDocumentWrapper> resultList = new ArrayList<>();
72+
processor.batchExecute(wrapperList, resultList::addAll);
73+
assertEquals(wrapperList, resultList);
74+
assertEquals(docCount / batchSize + (docCount % batchSize == 0 ? 0 : 1), processor.getSubBatches().size());
75+
}
76+
77+
public void testBatchExecute_defaultBatchSize() {
78+
DummyProcessor processor = new DummyProcessor(1);
79+
List<IngestDocumentWrapper> wrapperList = Arrays.asList(
80+
IngestDocumentPreparer.createIngestDocumentWrapper(1),
81+
IngestDocumentPreparer.createIngestDocumentWrapper(2),
82+
IngestDocumentPreparer.createIngestDocumentWrapper(3)
83+
);
84+
List<IngestDocumentWrapper> resultList = new ArrayList<>();
85+
processor.batchExecute(wrapperList, resultList::addAll);
86+
assertEquals(wrapperList, resultList);
87+
assertEquals(3, processor.getSubBatches().size());
88+
assertEquals(wrapperList.subList(0, 1), processor.getSubBatches().get(0));
89+
assertEquals(wrapperList.subList(1, 2), processor.getSubBatches().get(1));
90+
assertEquals(wrapperList.subList(2, 3), processor.getSubBatches().get(2));
91+
}
92+
93+
public void testFactory_invalidBatchSize() {
94+
Map<String, Object> config = new HashMap<>();
95+
config.put("batch_size", 0);
96+
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
97+
OpenSearchParseException exception = assertThrows(OpenSearchParseException.class, () -> factory.create(config));
98+
assertEquals("[batch_size] batch size must be a positive integer", exception.getMessage());
99+
}
100+
101+
public void testFactory_defaultBatchSize() throws Exception {
102+
Map<String, Object> config = new HashMap<>();
103+
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
104+
DummyProcessor processor = (DummyProcessor) factory.create(config);
105+
assertEquals(1, processor.batchSize);
106+
}
107+
108+
public void testFactory_callNewProcessor() throws Exception {
109+
Map<String, Object> config = new HashMap<>();
110+
config.put("batch_size", 3);
111+
DummyProcessor.DummyProcessorFactory factory = new DummyProcessor.DummyProcessorFactory("DummyProcessor");
112+
DummyProcessor processor = (DummyProcessor) factory.create(config);
113+
assertEquals(3, processor.batchSize);
114+
}
115+
116+
static class DummyProcessor extends AbstractBatchingProcessor {
117+
private List<List<IngestDocumentWrapper>> subBatches = new ArrayList<>();
118+
119+
public List<List<IngestDocumentWrapper>> getSubBatches() {
120+
return subBatches;
121+
}
122+
123+
protected DummyProcessor(int batchSize) {
124+
super("tag", "description", batchSize);
125+
}
126+
127+
@Override
128+
public void subBatchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
129+
subBatches.add(ingestDocumentWrappers);
130+
handler.accept(ingestDocumentWrappers);
131+
}
132+
133+
@Override
134+
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
135+
return ingestDocument;
136+
}
137+
138+
@Override
139+
public String getType() {
140+
return null;
141+
}
142+
143+
public static class DummyProcessorFactory extends Factory {
144+
145+
protected DummyProcessorFactory(String processorType) {
146+
super(processorType);
147+
}
148+
149+
public AbstractBatchingProcessor create(Map<String, Object> config) throws Exception {
150+
final Map<String, org.opensearch.ingest.Processor.Factory> processorFactories = new HashMap<>();
151+
return super.create(processorFactories, "tag", "description", config);
152+
}
153+
154+
@Override
155+
protected AbstractBatchingProcessor newProcessor(String tag, String description, int batchSize, Map<String, Object> config) {
156+
return new DummyProcessor(batchSize);
157+
}
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)