Skip to content

Commit 1b61879

Browse files
Add error handling strategy to pull-based ingestion
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 0714a1b commit 1b61879

File tree

14 files changed

+400
-32
lines changed

14 files changed

+400
-32
lines changed

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public KafkaOffset nextPointer() {
132132
return new KafkaOffset(lastFetchedOffset + 1);
133133
}
134134

135+
@Override
136+
public KafkaOffset nextPointer(KafkaOffset pointer) {
137+
return new KafkaOffset(pointer.getOffset() + 1);
138+
}
139+
135140
@Override
136141
public IngestionShardPointer earliestPointer() {
137142
long startOffset = AccessController.doPrivileged(

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.opensearch.index.IndexModule;
7272
import org.opensearch.index.mapper.MapperService;
7373
import org.opensearch.index.seqno.SequenceNumbers;
74+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
7475
import org.opensearch.indices.pollingingest.StreamPoller;
7576
import org.opensearch.indices.replication.SegmentReplicationSource;
7677
import org.opensearch.indices.replication.common.ReplicationType;
@@ -770,6 +771,30 @@ public Iterator<Setting<?>> settings() {
770771
Property.Final
771772
);
772773

774+
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error.strategy";
775+
public static final Setting<String> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = Setting.simpleString(
776+
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
777+
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
778+
new Setting.Validator<>() {
779+
780+
@Override
781+
public void validate(final String value) {
782+
try {
783+
IngestionErrorStrategy.ErrorStrategy.valueOf(value.toUpperCase(Locale.ROOT));
784+
} catch (IllegalArgumentException e) {
785+
throw new IllegalArgumentException("Invalid value for " + SETTING_INGESTION_SOURCE_ERROR_STRATEGY + " [" + value + "]");
786+
}
787+
}
788+
789+
@Override
790+
public void validate(final String value, final Map<Setting<?>, Object> settings) {
791+
validate(value);
792+
}
793+
},
794+
Property.IndexScope,
795+
Property.Dynamic
796+
);
797+
773798
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
774799
"index.ingestion_source.param.",
775800
key -> new Setting<>(key, "", (value) -> {
@@ -1001,8 +1026,13 @@ public IngestionSource getIngestionSource() {
10011026
pointerInitResetType,
10021027
pointerInitResetValue
10031028
);
1029+
1030+
final String errorStrategyString = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
1031+
IngestionErrorStrategy.ErrorStrategy errorStrategy = IngestionErrorStrategy.ErrorStrategy.valueOf(
1032+
errorStrategyString.toUpperCase(Locale.ROOT)
1033+
);
10041034
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
1005-
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
1035+
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
10061036
}
10071037
return null;
10081038
}

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.cluster.metadata;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1213
import org.opensearch.indices.pollingingest.StreamPoller;
1314

1415
import java.util.Map;
@@ -21,12 +22,19 @@
2122
public class IngestionSource {
2223
private String type;
2324
private PointerInitReset pointerInitReset;
25+
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
2426
private Map<String, Object> params;
2527

26-
public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
28+
public IngestionSource(
29+
String type,
30+
PointerInitReset pointerInitReset,
31+
IngestionErrorStrategy.ErrorStrategy errorStrategy,
32+
Map<String, Object> params
33+
) {
2734
this.type = type;
2835
this.pointerInitReset = pointerInitReset;
2936
this.params = params;
37+
this.errorStrategy = errorStrategy;
3038
}
3139

3240
public String getType() {
@@ -37,6 +45,10 @@ public PointerInitReset getPointerInitReset() {
3745
return pointerInitReset;
3846
}
3947

48+
public IngestionErrorStrategy.ErrorStrategy getErrorStrategy() {
49+
return errorStrategy;
50+
}
51+
4052
public Map<String, Object> params() {
4153
return params;
4254
}
@@ -48,17 +60,30 @@ public boolean equals(Object o) {
4860
IngestionSource ingestionSource = (IngestionSource) o;
4961
return Objects.equals(type, ingestionSource.type)
5062
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
63+
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
5164
&& Objects.equals(params, ingestionSource.params);
5265
}
5366

5467
@Override
5568
public int hashCode() {
56-
return Objects.hash(type, pointerInitReset, params);
69+
return Objects.hash(type, pointerInitReset, params, errorStrategy);
5770
}
5871

5972
@Override
6073
public String toString() {
61-
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
74+
return "IngestionSource{"
75+
+ "type='"
76+
+ type
77+
+ '\''
78+
+ ",pointer_init_reset='"
79+
+ pointerInitReset
80+
+ '\''
81+
+ ",error_strategy='"
82+
+ errorStrategy
83+
+ '\''
84+
+ ", params="
85+
+ params
86+
+ '}';
6287
}
6388

6489
/**

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
265265
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
266266
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
267267
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
268+
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
268269

269270
// validate that built-in similarities don't get redefined
270271
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/index/IngestionShardConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ public M getMessage() {
7272
*/
7373
T nextPointer();
7474

75+
/**
76+
* @return the immediate next pointer from the provided start pointer
77+
*/
78+
T nextPointer(T startPointer);
79+
7580
/**
7681
* @return the earliest pointer in the shard
7782
*/

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.index.translog.TranslogManager;
5656
import org.opensearch.index.translog.TranslogStats;
5757
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
58+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
5859
import org.opensearch.indices.pollingingest.StreamPoller;
5960
import org.opensearch.search.suggest.completion.CompletionStats;
6061
import org.opensearch.threadpool.ThreadPool;
@@ -189,7 +190,20 @@ public void start() {
189190
}
190191

191192
String resetValue = ingestionSource.getPointerInitReset().getValue();
192-
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue);
193+
IngestionErrorStrategy ingestionErrorStrategy = IngestionErrorStrategy.create(
194+
ingestionSource.getErrorStrategy(),
195+
ingestionSource.getType()
196+
);
197+
198+
streamPoller = new DefaultStreamPoller(
199+
startPointer,
200+
persistedPointers,
201+
ingestionShardConsumer,
202+
this,
203+
resetState,
204+
resetValue,
205+
ingestionErrorStrategy
206+
);
193207
streamPoller.start();
194208
}
195209

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
/**
15+
* This error handling strategy blocks on failures preventing processing of remaining updates in the ingestion source.
16+
*/
17+
public class BlockIngestionErrorStrategy implements IngestionErrorStrategy {
18+
private static final Logger logger = LogManager.getLogger(BlockIngestionErrorStrategy.class);
19+
private final String ingestionSource;
20+
21+
public BlockIngestionErrorStrategy(String ingestionSource) {
22+
this.ingestionSource = ingestionSource;
23+
}
24+
25+
@Override
26+
public void handleError(Throwable e, ErrorStage stage) {
27+
logger.error("Error processing update from {}: {}", ingestionSource, e);
28+
29+
// todo: record blocking update and emit metrics
30+
}
31+
32+
@Override
33+
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34+
return true;
35+
}
36+
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -64,21 +64,25 @@ public class DefaultStreamPoller implements StreamPoller {
6464
@Nullable
6565
private IngestionShardPointer maxPersistedPointer;
6666

67+
private IngestionErrorStrategy errorStrategy;
68+
6769
public DefaultStreamPoller(
6870
IngestionShardPointer startPointer,
6971
Set<IngestionShardPointer> persistedPointers,
7072
IngestionShardConsumer consumer,
7173
IngestionEngine ingestionEngine,
7274
ResetState resetState,
73-
String resetValue
75+
String resetValue,
76+
IngestionErrorStrategy errorStrategy
7477
) {
7578
this(
7679
startPointer,
7780
persistedPointers,
7881
consumer,
79-
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
82+
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
8083
resetState,
81-
resetValue
84+
resetValue,
85+
errorStrategy
8286
);
8387
}
8488

@@ -88,7 +92,8 @@ public DefaultStreamPoller(
8892
IngestionShardConsumer consumer,
8993
MessageProcessorRunnable processorRunnable,
9094
ResetState resetState,
91-
String resetValue
95+
String resetValue,
96+
IngestionErrorStrategy errorStrategy
9297
) {
9398
this.consumer = Objects.requireNonNull(consumer);
9499
this.resetState = resetState;
@@ -114,6 +119,7 @@ public DefaultStreamPoller(
114119
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
115120
)
116121
);
122+
this.errorStrategy = errorStrategy;
117123
}
118124

119125
@Override
@@ -138,6 +144,9 @@ protected void startPoll() {
138144
}
139145
logger.info("Starting poller for shard {}", consumer.getShardId());
140146

147+
// track the last record successfully written to the blocking queue
148+
IngestionShardPointer lastSuccessfulPointer = null;
149+
141150
while (true) {
142151
try {
143152
if (closed) {
@@ -205,6 +214,7 @@ protected void startPoll() {
205214
continue;
206215
}
207216
blockingQueue.put(result);
217+
lastSuccessfulPointer = result.getPointer();
208218
logger.debug(
209219
"Put message {} with pointer {} to the blocking queue",
210220
String.valueOf(result.getMessage().getPayload()),
@@ -214,8 +224,18 @@ protected void startPoll() {
214224
// update the batch start pointer to the next batch
215225
batchStartPointer = consumer.nextPointer();
216226
} catch (Throwable e) {
217-
// TODO better error handling
218227
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
228+
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
229+
230+
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
231+
// Blocking error encountered. Pause poller to stop processing remaining updates.
232+
pause();
233+
} else {
234+
// Advance the batch start pointer to ignore the error and continue from next record
235+
batchStartPointer = lastSuccessfulPointer == null
236+
? consumer.nextPointer(batchStartPointer)
237+
: consumer.nextPointer(lastSuccessfulPointer);
238+
}
219239
}
220240
}
221241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
/**
15+
* This error handling strategy drops failures and proceeds with remaining updates in the ingestion source.
16+
*/
17+
public class DropIngestionErrorStrategy implements IngestionErrorStrategy {
18+
private static final Logger logger = LogManager.getLogger(DropIngestionErrorStrategy.class);
19+
private final String ingestionSource;
20+
21+
public DropIngestionErrorStrategy(String ingestionSource) {
22+
this.ingestionSource = ingestionSource;
23+
}
24+
25+
@Override
26+
public void handleError(Throwable e, ErrorStage stage) {
27+
logger.error("Error processing update from {}: {}", ingestionSource, e);
28+
29+
// todo: record failed update stats and emit metrics
30+
}
31+
32+
@Override
33+
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34+
return false;
35+
}
36+
37+
}

0 commit comments

Comments
 (0)