Skip to content

Commit 2004ba0

Browse files
authored
[Search pipelines] Add Global Ignore_failure options for Processors (#8373)
* Add Global Ingore_failure options for Processors Signed-off-by: Mingshi Liu <mingshl@amazon.com> * add changelog Signed-off-by: Mingshi Liu <mingshl@amazon.com> * Add ignore_failure to 40_rename_response Signed-off-by: Mingshi Liu <mingshl@amazon.com> * Change Boolean to boolean and refactor AbstractProcessor Signed-off-by: Mingshi Liu <mingshl@amazon.com> * rename to isIgnoreFailure and add tests Signed-off-by: Mingshi Liu <mingshl@amazon.com> * rename to isIgnoreFailure and add tests Signed-off-by: Mingshi Liu <mingshl@amazon.com> * add ignoreFailure to runSearchPhaseResultsTransformer Signed-off-by: Mingshi Liu <mingshl@amazon.com> * fix filter query and change log warn message Signed-off-by: Mingshi Liu <mingshl@amazon.com> * Add test on matching each processor stat Signed-off-by: Mingshi Liu <mingshl@amazon.com> * Add test on matching each processor stat Signed-off-by: Mingshi Liu <mingshl@amazon.com> * remove extra spaces and words Signed-off-by: Mingshi Liu <mingshl@amazon.com> * use IGNORE_FAILURE_KEY Signed-off-by: Mingshi Liu <mingshl@amazon.com> --------- Signed-off-by: Mingshi Liu <mingshl@amazon.com>
1 parent 3d7d33b commit 2004ba0

File tree

14 files changed

+440
-127
lines changed

14 files changed

+440
-127
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
101101
- Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545))
102102
- Support transport action names when registering NamedRoutes ([#7957](https://github.com/opensearch-project/OpenSearch/pull/7957))
103103
- Create concept of persistent ThreadContext headers that are unstashable ([#8291]()https://github.com/opensearch-project/OpenSearch/pull/8291)
104+
- [Search pipelines] Add Global Ignore_failure options for Processors ([#8373](https://github.com/opensearch-project/OpenSearch/pull/8373))
104105
- Enable Partial Flat Object ([#7997](https://github.com/opensearch-project/OpenSearch/pull/7997))
105106
- Add jdk.incubator.vector module support for JDK 20+ ([#8601](https://github.com/opensearch-project/OpenSearch/pull/8601))
106107

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessor.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.ingest.ConfigurationUtils;
2222
import org.opensearch.search.builder.SearchSourceBuilder;
2323
import org.opensearch.search.pipeline.Processor;
24+
import org.opensearch.search.pipeline.AbstractProcessor;
2425
import org.opensearch.search.pipeline.SearchRequestProcessor;
2526

2627
import java.io.InputStream;
@@ -53,12 +54,13 @@ public String getType() {
5354
/**
5455
* Constructor that takes a filter query.
5556
*
56-
* @param tag processor tag
57-
* @param description processor description
57+
* @param tag processor tag
58+
* @param description processor description
59+
* @param ignoreFailure option to ignore failure
5860
* @param filterQuery the query that will be added as a filter to incoming queries
5961
*/
60-
public FilterQueryRequestProcessor(String tag, String description, QueryBuilder filterQuery) {
61-
super(tag, description);
62+
FilterQueryRequestProcessor(String tag, String description, boolean ignoreFailure, QueryBuilder filterQuery) {
63+
super(tag, description, ignoreFailure);
6264
this.filterQuery = filterQuery;
6365
}
6466

@@ -101,6 +103,7 @@ public FilterQueryRequestProcessor create(
101103
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
102104
String tag,
103105
String description,
106+
boolean ignoreFailure,
104107
Map<String, Object> config,
105108
PipelineContext pipelineContext
106109
) throws Exception {
@@ -114,7 +117,7 @@ public FilterQueryRequestProcessor create(
114117
XContentParser parser = XContentType.JSON.xContent()
115118
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
116119
) {
117-
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
120+
return new FilterQueryRequestProcessor(tag, description, ignoreFailure, parseInnerQueryBuilder(parser));
118121
}
119122
}
120123
}

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessor.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.ingest.ConfigurationUtils;
2020
import org.opensearch.search.SearchHit;
2121
import org.opensearch.search.pipeline.Processor;
22+
import org.opensearch.search.pipeline.AbstractProcessor;
2223
import org.opensearch.search.pipeline.SearchRequestProcessor;
2324
import org.opensearch.search.pipeline.SearchResponseProcessor;
2425

@@ -41,14 +42,22 @@ public class RenameFieldResponseProcessor extends AbstractProcessor implements S
4142
/**
4243
* Constructor that takes a target field to rename and the new name
4344
*
44-
* @param tag processor tag
45-
* @param description processor description
46-
* @param oldField name of field to be renamed
47-
* @param newField name of field that will replace the old field
45+
* @param tag processor tag
46+
* @param description processor description
47+
* @param ignoreFailure option to ignore failure
48+
* @param oldField name of field to be renamed
49+
* @param newField name of field that will replace the old field
4850
* @param ignoreMissing if true, do not throw error if oldField does not exist within search response
4951
*/
50-
public RenameFieldResponseProcessor(String tag, String description, String oldField, String newField, boolean ignoreMissing) {
51-
super(tag, description);
52+
public RenameFieldResponseProcessor(
53+
String tag,
54+
String description,
55+
boolean ignoreFailure,
56+
String oldField,
57+
String newField,
58+
boolean ignoreMissing
59+
) {
60+
super(tag, description, ignoreFailure);
5261
this.oldField = oldField;
5362
this.newField = newField;
5463
this.ignoreMissing = ignoreMissing;
@@ -140,13 +149,14 @@ public RenameFieldResponseProcessor create(
140149
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
141150
String tag,
142151
String description,
152+
boolean ignoreFailure,
143153
Map<String, Object> config,
144154
PipelineContext pipelineContext
145155
) throws Exception {
146156
String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
147157
String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
148158
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
149-
return new RenameFieldResponseProcessor(tag, description, oldField, newField, ignoreMissing);
159+
return new RenameFieldResponseProcessor(tag, description, ignoreFailure, oldField, newField, ignoreMissing);
150160
}
151161
}
152162
}

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.opensearch.script.ScriptType;
2626
import org.opensearch.script.SearchScript;
2727
import org.opensearch.search.pipeline.Processor;
28+
import org.opensearch.search.pipeline.AbstractProcessor;
2829
import org.opensearch.search.pipeline.SearchRequestProcessor;
2930
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;
3031

@@ -54,18 +55,20 @@ public final class ScriptRequestProcessor extends AbstractProcessor implements S
5455
*
5556
* @param tag The processor's tag.
5657
* @param description The processor's description.
58+
* @param ignoreFailure The option to ignore failure
5759
* @param script The {@link Script} to execute.
5860
* @param precompiledSearchScript The {@link Script} precompiled
5961
* @param scriptService The {@link ScriptService} used to execute the script.
6062
*/
6163
ScriptRequestProcessor(
6264
String tag,
6365
String description,
66+
boolean ignoreFailure,
6467
Script script,
6568
@Nullable SearchScript precompiledSearchScript,
6669
ScriptService scriptService
6770
) {
68-
super(tag, description);
71+
super(tag, description, ignoreFailure);
6972
this.script = script;
7073
this.precompiledSearchScript = precompiledSearchScript;
7174
this.scriptService = scriptService;
@@ -146,6 +149,7 @@ public ScriptRequestProcessor create(
146149
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
147150
String processorTag,
148151
String description,
152+
boolean ignoreFailure,
149153
Map<String, Object> config,
150154
PipelineContext pipelineContext
151155
) throws Exception {
@@ -174,7 +178,7 @@ public ScriptRequestProcessor create(
174178
} catch (ScriptException e) {
175179
throw newConfigurationException(TYPE, processorTag, null, e);
176180
}
177-
return new ScriptRequestProcessor(processorTag, description, script, searchScript, scriptService);
181+
return new ScriptRequestProcessor(processorTag, description, ignoreFailure, script, searchScript, scriptService);
178182
}
179183
}
180184
}

modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/FilterQueryRequestProcessorTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {
2323

2424
public void testFilterQuery() throws Exception {
2525
QueryBuilder filterQuery = new TermQueryBuilder("field", "value");
26-
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, filterQuery);
26+
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, false, filterQuery);
2727
QueryBuilder incomingQuery = new TermQueryBuilder("text", "foo");
2828
SearchSourceBuilder source = new SearchSourceBuilder().query(incomingQuery);
2929
SearchRequest request = new SearchRequest().source(source);
@@ -39,13 +39,13 @@ public void testFilterQuery() throws Exception {
3939
public void testFactory() throws Exception {
4040
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
4141
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
42-
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null);
42+
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, false, configMap, null);
4343
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);
4444

4545
// Missing "query" parameter:
4646
expectThrows(
4747
IllegalArgumentException.class,
48-
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
48+
() -> factory.create(Collections.emptyMap(), null, null, false, Collections.emptyMap(), null)
4949
);
5050
}
5151
}

modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/RenameFieldResponseProcessorTests.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void testRenameResponse() throws Exception {
5858
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
5959
null,
6060
null,
61+
false,
6162
"field 0",
6263
"new field",
6364
false
@@ -74,6 +75,7 @@ public void testRenameResponseWithMapping() throws Exception {
7475
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
7576
null,
7677
null,
78+
false,
7779
"field 0",
7880
"new field",
7981
true
@@ -97,6 +99,7 @@ public void testMissingField() throws Exception {
9799
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
98100
null,
99101
null,
102+
false,
100103
"field",
101104
"new field",
102105
false
@@ -115,15 +118,15 @@ public void testFactory() throws Exception {
115118
config.put("target_field", newField);
116119

117120
RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
118-
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null);
121+
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, false, config, null);
119122
assertEquals(processor.getType(), "rename_field");
120123
assertEquals(processor.getOldField(), oldField);
121124
assertEquals(processor.getNewField(), newField);
122125
assertFalse(processor.isIgnoreMissing());
123126

124127
expectThrows(
125128
OpenSearchParseException.class,
126-
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
129+
() -> factory.create(Collections.emptyMap(), null, null, false, Collections.emptyMap(), null)
127130
);
128131
}
129132
}

modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/ScriptRequestProcessorTests.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void setupScripting() {
8282
}
8383

8484
public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
85-
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, script, null, scriptService);
85+
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, false, script, null, scriptService);
8686
SearchRequest searchRequest = new SearchRequest();
8787
searchRequest.source(createSearchSourceBuilder());
8888

@@ -92,7 +92,14 @@ public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
9292
}
9393

9494
public void testScriptingWithPrecompiledIngestScript() throws Exception {
95-
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, script, searchScript, scriptService);
95+
ScriptRequestProcessor processor = new ScriptRequestProcessor(
96+
randomAlphaOfLength(10),
97+
null,
98+
false,
99+
script,
100+
searchScript,
101+
scriptService
102+
);
96103
SearchRequest searchRequest = new SearchRequest();
97104
searchRequest.source(createSearchSourceBuilder());
98105

modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/40_rename_response.yml

+31-2
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ teardown:
6363
}
6464
- match: { acknowledged: true }
6565

66+
- do:
67+
search_pipeline.put:
68+
id: "my_pipeline_4"
69+
body: >
70+
{
71+
"description": "test pipeline with ignore missing false and ignore failure true",
72+
"response_processors": [
73+
{
74+
"rename_field":
75+
{
76+
"field": "aa",
77+
"target_field": "b",
78+
"ignore_missing": false,
79+
"ignore_failure": true
80+
}
81+
}
82+
]
83+
}
84+
- match: { acknowledged: true }
85+
6686
- do:
6787
indices.create:
6888
index: test
@@ -119,15 +139,24 @@ teardown:
119139
- match: { hits.total.value: 1 }
120140
- match: {hits.hits.0._source: { "a": "foo" } }
121141

122-
# Pipeline with ignore_missing set to true
123-
# Should still pass even though index does not contain field
142+
# Pipeline with ignore_missing set to false
143+
# Should throw illegal_argument_exception
124144
- do:
125145
catch: bad_request
126146
search:
127147
index: test
128148
search_pipeline: "my_pipeline_3"
129149
- match: { error.type: "illegal_argument_exception" }
130150

151+
# Pipeline with ignore_missing set to false and ignore_failure set to true
152+
# Should return while catching error
153+
- do:
154+
search:
155+
index: test
156+
search_pipeline: "my_pipeline_4"
157+
- match: { hits.total.value: 1 }
158+
- match: {hits.hits.0._source: { "a": "foo" } }
159+
131160
# No source, using stored_fields
132161
- do:
133162
search:

server/src/main/java/org/opensearch/ingest/ConfigurationUtils.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public final class ConfigurationUtils {
6767

6868
public static final String TAG_KEY = "tag";
6969
public static final String DESCRIPTION_KEY = "description";
70+
public static final String IGNORE_FAILURE_KEY = "ignore_failure";
7071

7172
private ConfigurationUtils() {}
7273

@@ -194,7 +195,7 @@ public static String readOptionalStringOrIntProperty(
194195
return readStringOrInt(processorType, processorTag, propertyName, value);
195196
}
196197

197-
public static Boolean readBooleanProperty(
198+
public static boolean readBooleanProperty(
198199
String processorType,
199200
String processorTag,
200201
Map<String, Object> configuration,
@@ -214,7 +215,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St
214215
return null;
215216
}
216217
if (value instanceof Boolean) {
217-
return (Boolean) value;
218+
return (boolean) value;
218219
}
219220
throw newConfigurationException(
220221
processorType,
@@ -530,10 +531,11 @@ public static Processor readProcessor(
530531
) throws Exception {
531532
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
532533
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
534+
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, IGNORE_FAILURE_KEY, false);
533535
Script conditionalScript = extractConditional(config);
534536
Processor.Factory factory = processorFactories.get(type);
537+
535538
if (factory != null) {
536-
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
537539
List<Map<String, Object>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(
538540
null,
539541
null,
+10-5
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@
66
* compatible open source license.
77
*/
88

9-
package org.opensearch.search.pipeline.common;
10-
11-
import org.opensearch.search.pipeline.Processor;
9+
package org.opensearch.search.pipeline;
1210

1311
/**
1412
* Base class for common processor behavior.
1513
*/
16-
abstract class AbstractProcessor implements Processor {
14+
public abstract class AbstractProcessor implements Processor {
1715
private final String tag;
1816
private final String description;
17+
private final boolean ignoreFailure;
1918

20-
protected AbstractProcessor(String tag, String description) {
19+
protected AbstractProcessor(String tag, String description, boolean ignoreFailure) {
2120
this.tag = tag;
2221
this.description = description;
22+
this.ignoreFailure = ignoreFailure;
2323
}
2424

2525
@Override
@@ -31,4 +31,9 @@ public String getTag() {
3131
public String getDescription() {
3232
return description;
3333
}
34+
35+
@Override
36+
public boolean isIgnoreFailure() {
37+
return ignoreFailure;
38+
}
3439
}

0 commit comments

Comments
 (0)