@@ -64,21 +64,25 @@ public class DefaultStreamPoller implements StreamPoller {
64
64
@ Nullable
65
65
private IngestionShardPointer maxPersistedPointer ;
66
66
67
+ private IngestionErrorStrategy errorStrategy ;
68
+
67
69
public DefaultStreamPoller (
68
70
IngestionShardPointer startPointer ,
69
71
Set <IngestionShardPointer > persistedPointers ,
70
72
IngestionShardConsumer consumer ,
71
73
IngestionEngine ingestionEngine ,
72
74
ResetState resetState ,
73
- String resetValue
75
+ String resetValue ,
76
+ IngestionErrorStrategy errorStrategy
74
77
) {
75
78
this (
76
79
startPointer ,
77
80
persistedPointers ,
78
81
consumer ,
79
- new MessageProcessorRunnable (new ArrayBlockingQueue <>(100 ), ingestionEngine ),
82
+ new MessageProcessorRunnable (new ArrayBlockingQueue <>(100 ), ingestionEngine , errorStrategy ),
80
83
resetState ,
81
- resetValue
84
+ resetValue ,
85
+ errorStrategy
82
86
);
83
87
}
84
88
@@ -88,7 +92,8 @@ public DefaultStreamPoller(
88
92
IngestionShardConsumer consumer ,
89
93
MessageProcessorRunnable processorRunnable ,
90
94
ResetState resetState ,
91
- String resetValue
95
+ String resetValue ,
96
+ IngestionErrorStrategy errorStrategy
92
97
) {
93
98
this .consumer = Objects .requireNonNull (consumer );
94
99
this .resetState = resetState ;
@@ -114,6 +119,7 @@ public DefaultStreamPoller(
114
119
String .format (Locale .ROOT , "stream-poller-processor-%d-%d" , consumer .getShardId (), System .currentTimeMillis ())
115
120
)
116
121
);
122
+ this .errorStrategy = errorStrategy ;
117
123
}
118
124
119
125
@ Override
@@ -138,6 +144,9 @@ protected void startPoll() {
138
144
}
139
145
logger .info ("Starting poller for shard {}" , consumer .getShardId ());
140
146
147
+ // track the last record successfully written to the blocking queue
148
+ IngestionShardPointer lastSuccessfulPointer = null ;
149
+
141
150
while (true ) {
142
151
try {
143
152
if (closed ) {
@@ -205,6 +214,7 @@ protected void startPoll() {
205
214
continue ;
206
215
}
207
216
blockingQueue .put (result );
217
+ lastSuccessfulPointer = result .getPointer ();
208
218
logger .debug (
209
219
"Put message {} with pointer {} to the blocking queue" ,
210
220
String .valueOf (result .getMessage ().getPayload ()),
@@ -214,8 +224,18 @@ protected void startPoll() {
214
224
// update the batch start pointer to the next batch
215
225
batchStartPointer = consumer .nextPointer ();
216
226
} catch (Throwable e ) {
217
- // TODO better error handling
218
227
logger .error ("Error in polling the shard {}: {}" , consumer .getShardId (), e );
228
+ errorStrategy .handleError (e , IngestionErrorStrategy .ErrorStage .POLLING );
229
+
230
+ if (errorStrategy .shouldPauseIngestion (e , IngestionErrorStrategy .ErrorStage .POLLING )) {
231
+ // Blocking error encountered. Pause poller to stop processing remaining updates.
232
+ pause ();
233
+ } else {
234
+ // Advance the batch start pointer to ignore the error and continue from next record
235
+ batchStartPointer = lastSuccessfulPointer == null
236
+ ? consumer .nextPointer (batchStartPointer )
237
+ : consumer .nextPointer (lastSuccessfulPointer );
238
+ }
219
239
}
220
240
}
221
241
}
0 commit comments