Skip to content

Commit 155e52f

Browse files
varunbharadwajMayank Sharma
authored and
Mayank Sharma
committed
[Pull-based Ingestion] Add error handling strategy to pull-based ingestion (opensearch-project#17427)
* Add error handling strategy to pull-based ingestion

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>

* Make error strategy config type-safe

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>

---------

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 98dc223 commit 155e52f

File tree

15 files changed

+398
-37
lines changed

15 files changed

+398
-37
lines changed

CHANGELOG-3.0.md

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2323
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
2424
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
2525
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))
26+
- Added error handling support for the pull-based ingestion ([#17427](https://github.com/opensearch-project/OpenSearch/pull/17427))
2627

2728
### Dependencies
2829
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public KafkaOffset nextPointer() {
132132
return new KafkaOffset(lastFetchedOffset + 1);
133133
}
134134

135+
@Override
136+
public KafkaOffset nextPointer(KafkaOffset pointer) {
137+
return new KafkaOffset(pointer.getOffset() + 1);
138+
}
139+
135140
@Override
136141
public IngestionShardPointer earliestPointer() {
137142
long startOffset = AccessController.doPrivileged(

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.opensearch.index.IndexModule;
7272
import org.opensearch.index.mapper.MapperService;
7373
import org.opensearch.index.seqno.SequenceNumbers;
74+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
7475
import org.opensearch.indices.pollingingest.StreamPoller;
7576
import org.opensearch.indices.replication.SegmentReplicationSource;
7677
import org.opensearch.indices.replication.common.ReplicationType;
@@ -770,6 +771,15 @@ public Iterator<Setting<?>> settings() {
770771
Property.Final
771772
);
772773

774+
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error_strategy";
775+
public static final Setting<IngestionErrorStrategy.ErrorStrategy> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = new Setting<>(
776+
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
777+
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
778+
IngestionErrorStrategy.ErrorStrategy::parseFromString,
779+
(errorStrategy) -> {},
780+
Property.IndexScope
781+
);
782+
773783
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
774784
"index.ingestion_source.param.",
775785
key -> new Setting<>(key, "", (value) -> {
@@ -1004,8 +1014,10 @@ public IngestionSource getIngestionSource() {
10041014
pointerInitResetType,
10051015
pointerInitResetValue
10061016
);
1017+
1018+
final IngestionErrorStrategy.ErrorStrategy errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
10071019
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
1008-
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
1020+
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
10091021
}
10101022
return null;
10111023
}

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.cluster.metadata;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
1213
import org.opensearch.indices.pollingingest.StreamPoller;
1314

1415
import java.util.Map;
@@ -21,12 +22,19 @@
2122
public class IngestionSource {
2223
private String type;
2324
private PointerInitReset pointerInitReset;
25+
private IngestionErrorStrategy.ErrorStrategy errorStrategy;
2426
private Map<String, Object> params;
2527

26-
public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
28+
public IngestionSource(
29+
String type,
30+
PointerInitReset pointerInitReset,
31+
IngestionErrorStrategy.ErrorStrategy errorStrategy,
32+
Map<String, Object> params
33+
) {
2734
this.type = type;
2835
this.pointerInitReset = pointerInitReset;
2936
this.params = params;
37+
this.errorStrategy = errorStrategy;
3038
}
3139

3240
public String getType() {
@@ -37,6 +45,10 @@ public PointerInitReset getPointerInitReset() {
3745
return pointerInitReset;
3846
}
3947

48+
public IngestionErrorStrategy.ErrorStrategy getErrorStrategy() {
49+
return errorStrategy;
50+
}
51+
4052
public Map<String, Object> params() {
4153
return params;
4254
}
@@ -48,17 +60,30 @@ public boolean equals(Object o) {
4860
IngestionSource ingestionSource = (IngestionSource) o;
4961
return Objects.equals(type, ingestionSource.type)
5062
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
63+
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
5164
&& Objects.equals(params, ingestionSource.params);
5265
}
5366

5467
@Override
5568
public int hashCode() {
56-
return Objects.hash(type, pointerInitReset, params);
69+
return Objects.hash(type, pointerInitReset, params, errorStrategy);
5770
}
5871

5972
@Override
6073
public String toString() {
61-
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
74+
return "IngestionSource{"
75+
+ "type='"
76+
+ type
77+
+ '\''
78+
+ ",pointer_init_reset='"
79+
+ pointerInitReset
80+
+ '\''
81+
+ ",error_strategy='"
82+
+ errorStrategy
83+
+ '\''
84+
+ ", params="
85+
+ params
86+
+ '}';
6287
}
6388

6489
/**

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
266266
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
267267
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING,
268268
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
269+
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,
269270

270271
// validate that built-in similarities don't get redefined
271272
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/opensearch/index/IngestionShardConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ public M getMessage() {
7272
*/
7373
T nextPointer();
7474

75+
/**
76+
* @return the immediate next pointer from the provided start pointer
77+
*/
78+
T nextPointer(T startPointer);
79+
7580
/**
7681
* @return the earliest pointer in the shard
7782
*/

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.index.translog.TranslogStats;
3030
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
3131
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
32+
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
3233
import org.opensearch.indices.pollingingest.PollingIngestStats;
3334
import org.opensearch.indices.pollingingest.StreamPoller;
3435

@@ -99,12 +100,21 @@ public void start() {
99100
}
100101

101102
String resetValue = ingestionSource.getPointerInitReset().getValue();
102-
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState, resetValue);
103+
IngestionErrorStrategy ingestionErrorStrategy = IngestionErrorStrategy.create(
104+
ingestionSource.getErrorStrategy(),
105+
ingestionSource.getType()
106+
);
103107

104-
// Poller is only started on the primary shard. Replica shards will rely on segment replication.
105-
if (!engineConfig.isReadOnlyReplica()) {
106-
streamPoller.start();
107-
}
108+
streamPoller = new DefaultStreamPoller(
109+
startPointer,
110+
persistedPointers,
111+
ingestionShardConsumer,
112+
this,
113+
resetState,
114+
resetValue,
115+
ingestionErrorStrategy
116+
);
117+
streamPoller.start();
108118
}
109119

110120
protected Set<IngestionShardPointer> fetchPersistedOffsets(DirectoryReader directoryReader, IngestionShardPointer batchStart)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
/**
15+
* This error handling strategy blocks on failures preventing processing of remaining updates in the ingestion source.
16+
*/
17+
public class BlockIngestionErrorStrategy implements IngestionErrorStrategy {
18+
private static final Logger logger = LogManager.getLogger(BlockIngestionErrorStrategy.class);
19+
private final String ingestionSource;
20+
21+
public BlockIngestionErrorStrategy(String ingestionSource) {
22+
this.ingestionSource = ingestionSource;
23+
}
24+
25+
@Override
26+
public void handleError(Throwable e, ErrorStage stage) {
27+
logger.error("Error processing update from {}: {}", ingestionSource, e);
28+
29+
// todo: record blocking update and emit metrics
30+
}
31+
32+
@Override
33+
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34+
return true;
35+
}
36+
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,25 @@ public class DefaultStreamPoller implements StreamPoller {
6767
@Nullable
6868
private IngestionShardPointer maxPersistedPointer;
6969

70+
private IngestionErrorStrategy errorStrategy;
71+
7072
public DefaultStreamPoller(
7173
IngestionShardPointer startPointer,
7274
Set<IngestionShardPointer> persistedPointers,
7375
IngestionShardConsumer consumer,
7476
IngestionEngine ingestionEngine,
7577
ResetState resetState,
76-
String resetValue
78+
String resetValue,
79+
IngestionErrorStrategy errorStrategy
7780
) {
7881
this(
7982
startPointer,
8083
persistedPointers,
8184
consumer,
82-
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
85+
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
8386
resetState,
84-
resetValue
87+
resetValue,
88+
errorStrategy
8589
);
8690
}
8791

@@ -91,7 +95,8 @@ public DefaultStreamPoller(
9195
IngestionShardConsumer consumer,
9296
MessageProcessorRunnable processorRunnable,
9397
ResetState resetState,
94-
String resetValue
98+
String resetValue,
99+
IngestionErrorStrategy errorStrategy
95100
) {
96101
this.consumer = Objects.requireNonNull(consumer);
97102
this.resetState = resetState;
@@ -117,6 +122,7 @@ public DefaultStreamPoller(
117122
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
118123
)
119124
);
125+
this.errorStrategy = errorStrategy;
120126
}
121127

122128
@Override
@@ -141,6 +147,9 @@ protected void startPoll() {
141147
}
142148
logger.info("Starting poller for shard {}", consumer.getShardId());
143149

150+
// track the last record successfully written to the blocking queue
151+
IngestionShardPointer lastSuccessfulPointer = null;
152+
144153
while (true) {
145154
try {
146155
if (closed) {
@@ -209,6 +218,7 @@ protected void startPoll() {
209218
}
210219
totalPolledCount.inc();
211220
blockingQueue.put(result);
221+
lastSuccessfulPointer = result.getPointer();
212222
logger.debug(
213223
"Put message {} with pointer {} to the blocking queue",
214224
String.valueOf(result.getMessage().getPayload()),
@@ -218,8 +228,18 @@ protected void startPoll() {
218228
// update the batch start pointer to the next batch
219229
batchStartPointer = consumer.nextPointer();
220230
} catch (Throwable e) {
221-
// TODO better error handling
222231
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
232+
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
233+
234+
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
235+
// Blocking error encountered. Pause poller to stop processing remaining updates.
236+
pause();
237+
} else {
238+
// Advance the batch start pointer to ignore the error and continue from next record
239+
batchStartPointer = lastSuccessfulPointer == null
240+
? consumer.nextPointer(batchStartPointer)
241+
: consumer.nextPointer(lastSuccessfulPointer);
242+
}
223243
}
224244
}
225245
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.indices.pollingingest;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
14+
/**
15+
* This error handling strategy drops failures and proceeds with remaining updates in the ingestion source.
16+
*/
17+
public class DropIngestionErrorStrategy implements IngestionErrorStrategy {
18+
private static final Logger logger = LogManager.getLogger(DropIngestionErrorStrategy.class);
19+
private final String ingestionSource;
20+
21+
public DropIngestionErrorStrategy(String ingestionSource) {
22+
this.ingestionSource = ingestionSource;
23+
}
24+
25+
@Override
26+
public void handleError(Throwable e, ErrorStage stage) {
27+
logger.error("Error processing update from {}: {}", ingestionSource, e);
28+
29+
// todo: record failed update stats and emit metrics
30+
}
31+
32+
@Override
33+
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34+
return false;
35+
}
36+
37+
}

0 commit comments

Comments
 (0)