Skip to content

Commit d0168ee

Browse files
Add error handling strategy to pull-based ingestion
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 8447737 commit d0168ee

File tree

14 files changed

+391
-28
lines changed

14 files changed

+391
-28
lines changed

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ public KafkaOffset nextPointer() {
123123
return new KafkaOffset(lastFetchedOffset + 1);
124124
}
125125

126+
@Override
127+
public KafkaOffset nextPointer(KafkaOffset pointer) {
128+
return new KafkaOffset(pointer.getOffset() + 1);
129+
}
130+
126131
@Override
127132
public IngestionShardPointer earliestPointer() {
128133
long startOffset = AccessController.doPrivileged(

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+30-1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.opensearch.index.IndexModule;
7272
import org.opensearch.index.mapper.MapperService;
7373
import org.opensearch.index.seqno.SequenceNumbers;
74+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
7475
import org.opensearch.indices.pollingingest.StreamPoller;
7576
import org.opensearch.indices.replication.SegmentReplicationSource;
7677
import org.opensearch.indices.replication.common.ReplicationType;
@@ -731,6 +732,30 @@ public void validate(final String value, final Map<Setting<?>, Object> settings)
731732
Property.Dynamic
732733
);
733734

735+
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error.strategy";
736+
public static final Setting<String> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = Setting.simpleString(
737+
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
738+
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
739+
new Setting.Validator<>() {
740+
741+
@Override
742+
public void validate(final String value) {
743+
try {
744+
IngestionErrorStrategy.ErrorStrategy.valueOf(value.toUpperCase(Locale.ROOT));
745+
} catch (IllegalArgumentException e) {
746+
throw new IllegalArgumentException("Invalid value for " + SETTING_INGESTION_SOURCE_ERROR_STRATEGY + " [" + value + "]");
747+
}
748+
}
749+
750+
@Override
751+
public void validate(final String value, final Map<Setting<?>, Object> settings) {
752+
validate(value);
753+
}
754+
},
755+
Property.IndexScope,
756+
Property.Dynamic
757+
);
758+
734759
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
735760
"index.ingestion_source.param.",
736761
key -> new Setting<>(key, "", (value) -> {
@@ -955,8 +980,12 @@ public IngestionSource getIngestionSource() {
955980
final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings);
956981
if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) {
957982
final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings);
983+
final String errorStrategyString = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
984+
IngestionErrorStrategy.ErrorStrategy errorStrategy = IngestionErrorStrategy.ErrorStrategy.valueOf(
985+
errorStrategyString.toUpperCase(Locale.ROOT)
986+
);
958987
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
959-
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
988+
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
960989
}
961990
return null;
962991
}

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.cluster.metadata;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1213

1314
import java.util.Map;
1415
import java.util.Objects;
@@ -20,12 +21,19 @@
2021
public class IngestionSource {
2122
private String type;
2223
private String pointerInitReset;
24+
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
2325
private Map<String, Object> params;
2426

25-
public IngestionSource(String type, String pointerInitReset, Map<String, Object> params) {
27+
public IngestionSource(
28+
String type,
29+
String pointerInitReset,
30+
IngestionErrorStrategy.ErrorStrategy errorStrategy,
31+
Map<String, Object> params
32+
) {
2633
this.type = type;
2734
this.pointerInitReset = pointerInitReset;
2835
this.params = params;
36+
this.errorStrategy = errorStrategy;
2937
}
3038

3139
public String getType() {
@@ -36,6 +44,10 @@ public String getPointerInitReset() {
3644
return pointerInitReset;
3745
}
3846

47+
public IngestionErrorStrategy.ErrorStrategy getErrorStrategy() {
48+
return errorStrategy;
49+
}
50+
3951
public Map<String, Object> params() {
4052
return params;
4153
}
@@ -47,16 +59,29 @@ public boolean equals(Object o) {
4759
IngestionSource ingestionSource = (IngestionSource) o;
4860
return Objects.equals(type, ingestionSource.type)
4961
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
62+
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
5063
&& Objects.equals(params, ingestionSource.params);
5164
}
5265

5366
@Override
5467
public int hashCode() {
55-
return Objects.hash(type, pointerInitReset, params);
68+
return Objects.hash(type, pointerInitReset, params, errorStrategy);
5669
}
5770

5871
@Override
5972
public String toString() {
60-
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
73+
return "IngestionSource{"
74+
+ "type='"
75+
+ type
76+
+ '\''
77+
+ ",pointer_init_reset='"
78+
+ pointerInitReset
79+
+ '\''
80+
+ ",error_strategy='"
81+
+ errorStrategy
82+
+ '\''
83+
+ ", params="
84+
+ params
85+
+ '}';
6186
}
6287
}

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
264264
IndexMetadata.INGESTION_SOURCE_TYPE_SETTING,
265265
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
266266
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
267+
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
267268

268269
// validate that built-in similarities don't get redefined
269270
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/index/IngestionShardConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ public M getMessage() {
7272
*/
7373
T nextPointer();
7474

75+
/**
76+
* @return the immediate next pointer from the provided start pointer
77+
*/
78+
T nextPointer(T startPointer);
79+
7580
/**
7681
* @return the earliest pointer in the shard
7782
*/

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.index.translog.TranslogManager;
5656
import org.opensearch.index.translog.TranslogStats;
5757
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
58+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
5859
import org.opensearch.indices.pollingingest.StreamPoller;
5960
import org.opensearch.search.suggest.completion.CompletionStats;
6061
import org.opensearch.threadpool.ThreadPool;
@@ -191,7 +192,18 @@ public void start() {
191192
resetState = StreamPoller.ResetState.NONE;
192193
}
193194

194-
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState);
195+
IngestionErrorStrategy ingestionErrorStrategy = IngestionErrorStrategy.create(
196+
ingestionSource.getErrorStrategy(),
197+
ingestionSource.getType()
198+
);
199+
streamPoller = new DefaultStreamPoller(
200+
startPointer,
201+
persistedPointers,
202+
ingestionShardConsumer,
203+
this,
204+
resetState,
205+
ingestionErrorStrategy
206+
);
195207
streamPoller.start();
196208
}
197209

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
/**
15+
* This error handling strategy blocks on failures preventing processing of remaining updates in the ingestion source.
16+
*/
17+
public class BlockIngestionErrorStrategy implements IngestionErrorStrategy {
18+
private static final Logger logger = LogManager.getLogger(BlockIngestionErrorStrategy.class);
19+
private final String ingestionSource;
20+
21+
public BlockIngestionErrorStrategy(String ingestionSource) {
22+
this.ingestionSource = ingestionSource;
23+
}
24+
25+
@Override
26+
public void handleError(Throwable e, ErrorStage stage) {
27+
logger.error("Error processing update from {}: {}", ingestionSource, e);
28+
29+
// todo: record blocking update and emit metrics
30+
}
31+
32+
@Override
33+
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34+
return true;
35+
}
36+
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,23 @@ public class DefaultStreamPoller implements StreamPoller {
6363
@Nullable
6464
private IngestionShardPointer maxPersistedPointer;
6565

66+
private IngestionErrorStrategy errorStrategy;
67+
6668
public DefaultStreamPoller(
6769
IngestionShardPointer startPointer,
6870
Set<IngestionShardPointer> persistedPointers,
6971
IngestionShardConsumer consumer,
7072
IngestionEngine ingestionEngine,
71-
ResetState resetState
73+
ResetState resetState,
74+
IngestionErrorStrategy errorStrategy
7275
) {
7376
this(
7477
startPointer,
7578
persistedPointers,
7679
consumer,
77-
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
78-
resetState
80+
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
81+
resetState,
82+
errorStrategy
7983
);
8084
}
8185

@@ -84,7 +88,8 @@ public DefaultStreamPoller(
8488
Set<IngestionShardPointer> persistedPointers,
8589
IngestionShardConsumer consumer,
8690
MessageProcessorRunnable processorRunnable,
87-
ResetState resetState
91+
ResetState resetState,
92+
IngestionErrorStrategy errorStrategy
8893
) {
8994
this.consumer = Objects.requireNonNull(consumer);
9095
this.resetState = resetState;
@@ -109,6 +114,7 @@ public DefaultStreamPoller(
109114
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
110115
)
111116
);
117+
this.errorStrategy = errorStrategy;
112118
}
113119

114120
@Override
@@ -133,6 +139,9 @@ protected void startPoll() {
133139
}
134140
logger.info("Starting poller for shard {}", consumer.getShardId());
135141

142+
// track the last record successfully written to the blocking queue
143+
IngestionShardPointer lastSuccessfulPointer = null;
144+
136145
while (true) {
137146
try {
138147
if (closed) {
@@ -188,6 +197,7 @@ protected void startPoll() {
188197
continue;
189198
}
190199
blockingQueue.put(result);
200+
lastSuccessfulPointer = result.getPointer();
191201
logger.debug(
192202
"Put message {} with pointer {} to the blocking queue",
193203
String.valueOf(result.getMessage().getPayload()),
@@ -197,8 +207,18 @@ protected void startPoll() {
197207
// update the batch start pointer to the next batch
198208
batchStartPointer = consumer.nextPointer();
199209
} catch (Throwable e) {
200-
// TODO better error handling
201210
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
211+
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
212+
213+
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
214+
// Blocking error encountered. Pause poller to stop processing remaining updates.
215+
pause();
216+
} else {
217+
// Advance the batch start pointer to ignore the error and continue from next record
218+
batchStartPointer = lastSuccessfulPointer == null
219+
? consumer.nextPointer(batchStartPointer)
220+
: consumer.nextPointer(lastSuccessfulPointer);
221+
}
202222
}
203223
}
204224
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
/**
15+
* This error handling strategy drops failures and proceeds with remaining updates in the ingestion source.
16+
*/
17+
public class DropIngestionErrorStrategy implements IngestionErrorStrategy {
18+
private static final Logger logger = LogManager.getLogger(DropIngestionErrorStrategy.class);
19+
private final String ingestionSource;
20+
21+
public DropIngestionErrorStrategy(String ingestionSource) {
22+
this.ingestionSource = ingestionSource;
23+
}
24+
25+
@Override
26+
public void handleError(Throwable e, ErrorStage stage) {
27+
logger.error("Error processing update from {}: {}", ingestionSource, e);
28+
29+
// todo: record failed update stats and emit metrics
30+
}
31+
32+
@Override
33+
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34+
return false;
35+
}
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
13+
/**
14+
* Defines the error handling strategy when an error is encountered either during polling records from ingestion source
15+
* or during processing the polled records.
16+
*/
17+
@ExperimentalApi
18+
public interface IngestionErrorStrategy {
19+
20+
/**
21+
* Process and record the error.
22+
*/
23+
void handleError(Throwable e, ErrorStage stage);
24+
25+
/**
26+
* Indicates if ingestion must be paused, blocking further writes.
27+
*/
28+
boolean shouldPauseIngestion(Throwable e, ErrorStage stage);
29+
30+
static IngestionErrorStrategy create(ErrorStrategy errorStrategy, String ingestionSource) {
31+
switch (errorStrategy) {
32+
case BLOCK:
33+
return new BlockIngestionErrorStrategy(ingestionSource);
34+
case DROP:
35+
default:
36+
return new DropIngestionErrorStrategy(ingestionSource);
37+
}
38+
}
39+
40+
// Indicates available error handling strategies
41+
@ExperimentalApi
42+
enum ErrorStrategy {
43+
DROP,
44+
BLOCK
45+
}
46+
47+
// Indicates different stages of encountered errors
48+
@ExperimentalApi
49+
enum ErrorStage {
50+
POLLING,
51+
PROCESSING
52+
}
53+
54+
}

0 commit comments

Comments
 (0)