|
39 | 39 | import java.util.ArrayList;
|
40 | 40 | import java.util.Arrays;
|
41 | 41 | import java.util.Collections;
|
| 42 | +import java.util.HashMap; |
42 | 43 | import java.util.List;
|
43 | 44 | import java.util.Map;
|
44 | 45 | import java.util.concurrent.TimeUnit;
|
| 46 | +import java.util.concurrent.atomic.AtomicInteger; |
45 | 47 | import java.util.function.BiConsumer;
|
| 48 | +import java.util.function.Consumer; |
46 | 49 | import java.util.function.LongSupplier;
|
47 | 50 | import java.util.stream.Collectors;
|
48 | 51 |
|
@@ -150,6 +153,108 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
|
150 | 153 | innerExecute(0, ingestDocument, handler);
|
151 | 154 | }
|
152 | 155 |
|
| 156 | + @Override |
| 157 | + public void batchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) { |
| 158 | + innerBatchExecute(0, ingestDocumentWrappers, handler); |
| 159 | + } |
| 160 | + |
| 161 | + /** |
| 162 | + * Internal logic to process documents with current processor. |
| 163 | + * |
| 164 | + * @param currentProcessor index of processor to process batched documents |
| 165 | + * @param ingestDocumentWrappers batched documents to be processed |
| 166 | + * @param handler callback function |
| 167 | + */ |
| 168 | + void innerBatchExecute( |
| 169 | + int currentProcessor, |
| 170 | + List<IngestDocumentWrapper> ingestDocumentWrappers, |
| 171 | + Consumer<List<IngestDocumentWrapper>> handler |
| 172 | + ) { |
| 173 | + if (currentProcessor == processorsWithMetrics.size()) { |
| 174 | + handler.accept(ingestDocumentWrappers); |
| 175 | + return; |
| 176 | + } |
| 177 | + Tuple<Processor, OperationMetrics> processorWithMetric = processorsWithMetrics.get(currentProcessor); |
| 178 | + final Processor processor = processorWithMetric.v1(); |
| 179 | + final OperationMetrics metric = processorWithMetric.v2(); |
| 180 | + final long startTimeInNanos = relativeTimeProvider.getAsLong(); |
| 181 | + int size = ingestDocumentWrappers.size(); |
| 182 | + metric.beforeN(size); |
| 183 | + // Use synchronization to ensure batches are processed by processors in sequential order |
| 184 | + AtomicInteger counter = new AtomicInteger(size); |
| 185 | + List<IngestDocumentWrapper> allResults = Collections.synchronizedList(new ArrayList<>()); |
| 186 | + Map<Integer, IngestDocumentWrapper> slotToWrapperMap = createSlotIngestDocumentWrapperMap(ingestDocumentWrappers); |
| 187 | + processor.batchExecute(ingestDocumentWrappers, results -> { |
| 188 | + if (results.isEmpty()) return; |
| 189 | + allResults.addAll(results); |
| 190 | + // counter equals to 0 means all documents are processed and called back. |
| 191 | + if (counter.addAndGet(-results.size()) == 0) { |
| 192 | + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTimeInNanos); |
| 193 | + metric.afterN(allResults.size(), ingestTimeInMillis); |
| 194 | + |
| 195 | + List<IngestDocumentWrapper> documentsDropped = new ArrayList<>(); |
| 196 | + List<IngestDocumentWrapper> documentsWithException = new ArrayList<>(); |
| 197 | + List<IngestDocumentWrapper> documentsToContinue = new ArrayList<>(); |
| 198 | + int totalFailed = 0; |
| 199 | + // iterate all results to categorize them to: to continue, to drop, with exception |
| 200 | + for (IngestDocumentWrapper resultDocumentWrapper : allResults) { |
| 201 | + IngestDocumentWrapper originalDocumentWrapper = slotToWrapperMap.get(resultDocumentWrapper.getSlot()); |
| 202 | + if (resultDocumentWrapper.getException() != null) { |
| 203 | + ++totalFailed; |
| 204 | + if (ignoreFailure) { |
| 205 | + documentsToContinue.add(originalDocumentWrapper); |
| 206 | + } else { |
| 207 | + IngestProcessorException compoundProcessorException = newCompoundProcessorException( |
| 208 | + resultDocumentWrapper.getException(), |
| 209 | + processor, |
| 210 | + originalDocumentWrapper.getIngestDocument() |
| 211 | + ); |
| 212 | + documentsWithException.add( |
| 213 | + new IngestDocumentWrapper( |
| 214 | + resultDocumentWrapper.getSlot(), |
| 215 | + originalDocumentWrapper.getIngestDocument(), |
| 216 | + compoundProcessorException |
| 217 | + ) |
| 218 | + ); |
| 219 | + } |
| 220 | + } else { |
| 221 | + if (resultDocumentWrapper.getIngestDocument() == null) { |
| 222 | + documentsDropped.add(resultDocumentWrapper); |
| 223 | + } else { |
| 224 | + documentsToContinue.add(resultDocumentWrapper); |
| 225 | + } |
| 226 | + } |
| 227 | + } |
| 228 | + if (totalFailed > 0) { |
| 229 | + metric.failedN(totalFailed); |
| 230 | + } |
| 231 | + if (!documentsDropped.isEmpty()) { |
| 232 | + handler.accept(documentsDropped); |
| 233 | + } |
| 234 | + if (!documentsToContinue.isEmpty()) { |
| 235 | + innerBatchExecute(currentProcessor + 1, documentsToContinue, handler); |
| 236 | + } |
| 237 | + if (!documentsWithException.isEmpty()) { |
| 238 | + if (onFailureProcessors.isEmpty()) { |
| 239 | + handler.accept(documentsWithException); |
| 240 | + } else { |
| 241 | + documentsWithException.forEach( |
| 242 | + doc -> executeOnFailureAsync( |
| 243 | + 0, |
| 244 | + doc.getIngestDocument(), |
| 245 | + (IngestProcessorException) doc.getException(), |
| 246 | + (result, ex) -> { |
| 247 | + handler.accept(Collections.singletonList(new IngestDocumentWrapper(doc.getSlot(), result, ex))); |
| 248 | + } |
| 249 | + ) |
| 250 | + ); |
| 251 | + } |
| 252 | + } |
| 253 | + } |
| 254 | + assert counter.get() >= 0; |
| 255 | + }); |
| 256 | + } |
| 257 | + |
153 | 258 | void innerExecute(int currentProcessor, IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
|
154 | 259 | if (currentProcessor == processorsWithMetrics.size()) {
|
155 | 260 | handler.accept(ingestDocument, null);
|
@@ -266,4 +371,12 @@ static IngestProcessorException newCompoundProcessorException(Exception e, Proce
|
266 | 371 | return exception;
|
267 | 372 | }
|
268 | 373 |
|
| 374 | + private Map<Integer, IngestDocumentWrapper> createSlotIngestDocumentWrapperMap(List<IngestDocumentWrapper> ingestDocumentWrappers) { |
| 375 | + Map<Integer, IngestDocumentWrapper> slotIngestDocumentWrapperMap = new HashMap<>(); |
| 376 | + for (IngestDocumentWrapper ingestDocumentWrapper : ingestDocumentWrappers) { |
| 377 | + slotIngestDocumentWrapperMap.put(ingestDocumentWrapper.getSlot(), ingestDocumentWrapper); |
| 378 | + } |
| 379 | + return slotIngestDocumentWrapperMap; |
| 380 | + } |
| 381 | + |
269 | 382 | }
|
0 commit comments