Skip to content

Commit a431b14

Browse files
Make error strategy config type-safe
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 1ed98bc commit a431b14

File tree

4 files changed

+19
-26
lines changed

4 files changed

+19
-26
lines changed

CHANGELOG-3.0.md

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
2424
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
2525
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))
26+
- Added error handling support for the pull-based ingestion ([#17427](https://github.com/opensearch-project/OpenSearch/pull/17427))
2627

2728
### Dependencies
2829
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+6-24
Original file line numberDiff line numberDiff line change
@@ -771,28 +771,13 @@ public Iterator<Setting<?>> settings() {
771771
Property.Final
772772
);
773773

774-
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error.strategy";
775-
public static final Setting<String> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = Setting.simpleString(
774+
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error_strategy";
775+
public static final Setting<IngestionErrorStrategy.ErrorStrategy> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = new Setting<>(
776776
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
777777
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
778-
new Setting.Validator<>() {
779-
780-
@Override
781-
public void validate(final String value) {
782-
try {
783-
IngestionErrorStrategy.ErrorStrategy.valueOf(value.toUpperCase(Locale.ROOT));
784-
} catch (IllegalArgumentException e) {
785-
throw new IllegalArgumentException("Invalid value for " + SETTING_INGESTION_SOURCE_ERROR_STRATEGY + " [" + value + "]");
786-
}
787-
}
788-
789-
@Override
790-
public void validate(final String value, final Map<Setting<?>, Object> settings) {
791-
validate(value);
792-
}
793-
},
794-
Property.IndexScope,
795-
Property.Dynamic
778+
IngestionErrorStrategy.ErrorStrategy::parseFromString,
779+
(errorStrategy) -> {},
780+
Property.IndexScope
796781
);
797782

798783
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
@@ -1030,10 +1015,7 @@ public IngestionSource getIngestionSource() {
10301015
pointerInitResetValue
10311016
);
10321017

1033-
final String errorStrategyString = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
1034-
IngestionErrorStrategy.ErrorStrategy errorStrategy = IngestionErrorStrategy.ErrorStrategy.valueOf(
1035-
errorStrategyString.toUpperCase(Locale.ROOT)
1036-
);
1018+
final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
10371019
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
10381020
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
10391021
}

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.opensearch.index.translog.TranslogStats;
3030
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
3131
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
32-
import org.opensearch.indices.pollingingest.PollingIngestStats;
3332
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
33+
import org.opensearch.indices.pollingingest.PollingIngestStats;
3434
import org.opensearch.indices.pollingingest.StreamPoller;
3535

3636
import java.io.IOException;

server/src/main/java/org/opensearch/indices/pollingingest/IngestionErrorStrategy.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
1212

13+
import java.util.Locale;
14+
1315
/**
1416
* Defines the error handling strategy when an error is encountered either during polling records from ingestion source
1517
* or during processing the polled records.
@@ -43,7 +45,15 @@ static IngestionErrorStrategy create(ErrorStrategy errorStrategy, String ingesti
4345
@ExperimentalApi
4446
enum ErrorStrategy {
4547
DROP,
46-
BLOCK
48+
BLOCK;
49+
50+
public static ErrorStrategy parseFromString(String errorStrategy) {
51+
try {
52+
return ErrorStrategy.valueOf(errorStrategy.toUpperCase(Locale.ROOT));
53+
} catch (IllegalArgumentException e) {
54+
throw new IllegalArgumentException("Invalid ingestion errorStrategy: " + errorStrategy, e);
55+
}
56+
}
4757
}
4858

4959
/**

0 commit comments

Comments
 (0)