forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDefaultStreamPoller.java
335 lines (293 loc) · 11.9 KB
/
DefaultStreamPoller.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.pollingingest;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.Nullable;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.Message;
import org.opensearch.index.engine.IngestionEngine;

import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Default implementation of {@link StreamPoller}.
 *
 * <p>A dedicated consumer thread repeatedly reads batches from an {@link IngestionShardConsumer} and puts every
 * message that is not already persisted onto the blocking queue owned by a {@link MessageProcessorRunnable}; that
 * runnable is executed on a separate processor thread.
 *
 * <p>The lifecycle flags ({@code started}, {@code closed}, {@code paused}) and the observable {@code state} are
 * {@code volatile} so they can be changed from a caller thread and observed by the polling thread.
 */
public class DefaultStreamPoller implements StreamPoller {
    private static final Logger logger = LogManager.getLogger(DefaultStreamPoller.class);

    // TODO: make this configurable
    public static final long MAX_POLL_SIZE = 1000;
    public static final int POLL_TIMEOUT = 1000;

    // Capacity of the queue created by the public constructor between the consumer and processor threads.
    private static final int PROCESSOR_QUEUE_CAPACITY = 100;
    // How long the polling thread sleeps per loop iteration while paused.
    // TODO: make sleep time configurable
    private static final long PAUSE_SLEEP_MILLIS = 100;
    // Upper bound on how long close() waits for the polling thread to reach State.CLOSED.
    private static final long CLOSE_TIMEOUT_MILLIS = 5000;
    // Interval at which close() re-checks the polling thread's state.
    private static final long CLOSE_CHECK_INTERVAL_MILLIS = 100;

    private volatile State state = State.NONE;

    // goal state
    private volatile boolean started;
    private volatile boolean closed;
    private volatile boolean paused;

    private final IngestionShardConsumer consumer;
    private final ExecutorService consumerThread;
    private final ExecutorService processorThread;

    // start of the batch, inclusive.
    // NOTE(review): written by the polling thread and read via getBatchStartPointer(); it is not volatile —
    // confirm callers tolerate a possibly stale read.
    private IngestionShardPointer batchStartPointer;

    // Pending reset request; applied (and set back to NONE) by the polling thread before it reads.
    private ResetState resetState;
    private final String resetValue;

    private final Set<IngestionShardPointer> persistedPointers;
    private final BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
    private final MessageProcessorRunnable processorRunnable;
    private final CounterMetric totalPolledCount = new CounterMetric();

    // A pointer to the max persisted pointer for optimizing the check; null when nothing is persisted
    @Nullable
    private final IngestionShardPointer maxPersistedPointer;

    private final IngestionErrorStrategy errorStrategy;

    /**
     * Creates a poller whose processor is a new {@link MessageProcessorRunnable} over a bounded queue.
     *
     * @param startPointer      pointer of the first message to read (inclusive), unless replaced by {@code resetState}
     * @param persistedPointers pointers that are already persisted; messages at these pointers are skipped
     * @param consumer          the shard consumer to read from; must not be null
     * @param ingestionEngine   engine handed to the created {@link MessageProcessorRunnable}
     * @param resetState        how to reposition the start pointer before the first read
     * @param resetValue        offset ({@code REWIND_BY_OFFSET}) or epoch-millis timestamp ({@code REWIND_BY_TIMESTAMP})
     * @param errorStrategy     decides how polling errors are handled
     */
    public DefaultStreamPoller(
        IngestionShardPointer startPointer,
        Set<IngestionShardPointer> persistedPointers,
        IngestionShardConsumer consumer,
        IngestionEngine ingestionEngine,
        ResetState resetState,
        String resetValue,
        IngestionErrorStrategy errorStrategy
    ) {
        this(
            startPointer,
            persistedPointers,
            consumer,
            new MessageProcessorRunnable(new ArrayBlockingQueue<>(PROCESSOR_QUEUE_CAPACITY), ingestionEngine, errorStrategy),
            resetState,
            resetValue,
            errorStrategy
        );
    }

    /**
     * Creates a poller around an existing processor; the poller enqueues onto the processor's own blocking queue.
     *
     * @param startPointer      pointer of the first message to read (inclusive), unless replaced by {@code resetState}
     * @param persistedPointers pointers that are already persisted; messages at these pointers are skipped
     * @param consumer          the shard consumer to read from; must not be null
     * @param processorRunnable the processor to run on the processor thread
     * @param resetState        how to reposition the start pointer before the first read
     * @param resetValue        offset ({@code REWIND_BY_OFFSET}) or epoch-millis timestamp ({@code REWIND_BY_TIMESTAMP})
     * @param errorStrategy     decides how polling errors are handled
     */
    DefaultStreamPoller(
        IngestionShardPointer startPointer,
        Set<IngestionShardPointer> persistedPointers,
        IngestionShardConsumer consumer,
        MessageProcessorRunnable processorRunnable,
        ResetState resetState,
        String resetValue,
        IngestionErrorStrategy errorStrategy
    ) {
        this.consumer = Objects.requireNonNull(consumer);
        this.resetState = resetState;
        this.resetValue = resetValue;
        this.batchStartPointer = startPointer;
        this.persistedPointers = persistedPointers;
        // Cache the largest persisted pointer so isProcessed() can skip the set lookup for newer messages.
        this.maxPersistedPointer = persistedPointers.stream().max(IngestionShardPointer::compareTo).orElse(null);
        this.processorRunnable = processorRunnable;
        this.blockingQueue = processorRunnable.getBlockingQueue();
        this.consumerThread = Executors.newSingleThreadExecutor(
            r -> new Thread(
                r,
                String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", consumer.getShardId(), System.currentTimeMillis())
            )
        );
        // TODO: allow multiple threads for processing the messages in parallel
        this.processorThread = Executors.newSingleThreadExecutor(
            r -> new Thread(
                r,
                String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
            )
        );
        this.errorStrategy = errorStrategy;
    }

    /**
     * Submits the polling loop to the consumer thread and the processor to the processor thread.
     *
     * @throws IllegalStateException if the poller has already been closed
     */
    @Override
    public void start() {
        if (closed) {
            throw new IllegalStateException("poller is closed!");
        }
        started = true;
        consumerThread.submit(this::startPoll);
        processorThread.submit(processorRunnable);
    }

    /**
     * Runs the polling loop on the calling thread until the poller is closed. Visible for testing.
     *
     * <p>Each iteration applies any pending reset, sleeps while paused, otherwise reads up to {@link #MAX_POLL_SIZE}
     * messages starting at the current batch start pointer and enqueues those not already persisted.
     *
     * @throws IllegalStateException if {@link #start()} has not been called or the poller is closed
     */
    protected void startPoll() {
        if (!started) {
            throw new IllegalStateException("poller is not started!");
        }
        if (closed) {
            throw new IllegalStateException("poller is closed!");
        }
        logger.info("Starting poller for shard {}", consumer.getShardId());

        // track the last record successfully written to the blocking queue.
        // NOTE(review): this carries over across batches, so a failure before anything in the current batch is
        // enqueued resumes from the record after the previous batch's last enqueued message — confirm intended.
        IngestionShardPointer lastSuccessfulPointer = null;
        while (true) {
            try {
                if (closed) {
                    state = State.CLOSED;
                    break;
                }

                if (resetState != ResetState.NONE) {
                    applyReset();
                }

                if (paused) {
                    state = State.PAUSED;
                    try {
                        Thread.sleep(PAUSE_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // The loop is governed by the closed flag, not by interruption. Re-setting the interrupt
                        // flag here would make every following sleep fail immediately and spin this loop.
                        logger.error("Error in pausing the poller of shard {}", consumer.getShardId(), e);
                    }
                    continue;
                }

                state = State.POLLING;
                List<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> results = consumer.readNext(
                    batchStartPointer,
                    MAX_POLL_SIZE,
                    POLL_TIMEOUT
                );
                if (results.isEmpty()) {
                    // no new records
                    continue;
                }

                state = State.PROCESSING;
                for (IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> result : results) {
                    if (isProcessed(result.getPointer())) {
                        logger.info("Skipping message with pointer {} as it is already processed", result.getPointer().asString());
                        continue;
                    }
                    totalPolledCount.inc();
                    // Blocks while the processor's queue is full.
                    blockingQueue.put(result);
                    lastSuccessfulPointer = result.getPointer();
                    // Guarded so the payload is not stringified for every message when debug logging is off.
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Put message {} with pointer {} to the blocking queue",
                            String.valueOf(result.getMessage().getPayload()),
                            result.getPointer().asString()
                        );
                    }
                }
                // update the batch start pointer to the next batch
                batchStartPointer = consumer.nextPointer();
            } catch (Throwable e) {
                handlePollingError(e, lastSuccessfulPointer);
            }
        }
    }

    /**
     * Repositions the batch start pointer according to the pending reset request, then clears the request.
     * If resolving the pointer throws, the request is left in place and is retried on the next loop iteration.
     */
    private void applyReset() {
        switch (resetState) {
            case EARLIEST:
                batchStartPointer = consumer.earliestPointer();
                logger.info("Resetting offset by seeking to earliest offset {}", batchStartPointer.asString());
                break;
            case LATEST:
                batchStartPointer = consumer.latestPointer();
                logger.info("Resetting offset by seeking to latest offset {}", batchStartPointer.asString());
                break;
            case REWIND_BY_OFFSET:
                batchStartPointer = consumer.pointerFromOffset(resetValue);
                logger.info("Resetting offset by seeking to offset {}", batchStartPointer.asString());
                break;
            case REWIND_BY_TIMESTAMP:
                batchStartPointer = consumer.pointerFromTimestampMillis(Long.parseLong(resetValue));
                logger.info(
                    "Resetting offset by seeking to timestamp {}, corresponding offset {}",
                    resetValue,
                    batchStartPointer.asString()
                );
                break;
        }
        resetState = ResetState.NONE;
    }

    /**
     * Reports a polling failure to the error strategy and either pauses the poller or skips past the failure.
     *
     * @param e                     the failure raised by the polling loop
     * @param lastSuccessfulPointer pointer of the last message put on the queue, or null if none has been
     */
    private void handlePollingError(Throwable e, IngestionShardPointer lastSuccessfulPointer) {
        // Pass the exception as an extra trailing argument so the stack trace is logged.
        logger.error("Error in polling the shard {}", consumer.getShardId(), e);
        errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);

        if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
            // Blocking error encountered: pause the poller to stop processing remaining updates. The flag is set
            // directly because pause() throws once the poller is closed, which would kill this thread before it
            // could record State.CLOSED and make close() wait for its full timeout.
            paused = true;
        } else {
            // Advance the batch start pointer to ignore the error and continue from next record
            batchStartPointer = lastSuccessfulPointer == null
                ? consumer.nextPointer(batchStartPointer)
                : consumer.nextPointer(lastSuccessfulPointer);
        }
    }

    /**
     * Returns whether the message at {@code pointer} was already persisted.
     * Pointers above the max persisted pointer are answered without consulting the set.
     */
    private boolean isProcessed(IngestionShardPointer pointer) {
        if (maxPersistedPointer == null) {
            return false;
        }
        if (pointer.compareTo(maxPersistedPointer) > 0) {
            return false;
        }
        return persistedPointers.contains(pointer);
    }

    /**
     * Visible for testing. Get the max persisted pointer
     * @return the max persisted pointer, or null if no pointers were persisted
     */
    protected IngestionShardPointer getMaxPersistedPointer() {
        return maxPersistedPointer;
    }

    /**
     * Requests that the polling loop stop reading until {@link #resume()} is called.
     *
     * @throws IllegalStateException if the poller is closed
     */
    @Override
    public void pause() {
        if (closed) {
            throw new IllegalStateException("consumer is closed!");
        }
        paused = true;
    }

    /**
     * Clears a previous pause request so the polling loop reads again.
     *
     * @throws IllegalStateException if the poller is closed
     */
    @Override
    public void resume() {
        if (closed) {
            throw new IllegalStateException("consumer is closed!");
        }
        paused = false;
    }

    /**
     * Closes the poller. If it was started, this waits up to {@code CLOSE_TIMEOUT_MILLIS} for the polling loop to
     * reach {@code State.CLOSED}. It then clears the queue, shuts down the consumer executor and calls
     * {@code shutdownNow()} on the processor executor. If the calling thread is interrupted while waiting, it stops
     * waiting, restores its interrupt status and still performs the shutdown steps.
     */
    @Override
    public void close() {
        closed = true;
        if (!started) {
            logger.info("consumer thread not started");
            return;
        }
        // Monotonic clock: wall-clock adjustments must not stretch or cut short the wait.
        final long startNanos = System.nanoTime();
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(CLOSE_TIMEOUT_MILLIS);
        while (state != State.CLOSED) {
            if (System.nanoTime() - startNanos > timeoutNanos) {
                logger.error("Timeout reached while waiting for shard {} to close", consumer.getShardId());
                break;
            }
            try {
                Thread.sleep(CLOSE_CHECK_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                logger.error("Error in closing the poller of shard {}", consumer.getShardId(), e);
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Clearing the queue also unblocks a polling thread stuck in put() on a full queue.
        blockingQueue.clear();
        consumerThread.shutdown();
        // interrupts the processor
        processorThread.shutdownNow();
        logger.info("closed the poller of shard {}", consumer.getShardId());
    }

    @Override
    public boolean isPaused() {
        return paused;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }

    /** Returns the pointer (inclusive) the next read will start from. */
    @Override
    public IngestionShardPointer getBatchStartPointer() {
        return batchStartPointer;
    }

    /** Returns a snapshot of the polled count and the processor's processed count. */
    @Override
    public PollingIngestStats getStats() {
        PollingIngestStats.Builder builder = new PollingIngestStats.Builder();
        builder.setTotalPolledCount(totalPolledCount.count());
        builder.setTotalProcessedCount(processorRunnable.getStats().count());
        return builder.build();
    }

    /** Returns the state most recently recorded by the polling loop. */
    public State getState() {
        return state;
    }
}