Skip to content

Commit d9e9944

Browse files
authored
Add allowlist setting for search-pipeline-common processors (#14562)
Add a new static setting that lets an operator choose specific search pipeline processors to enable by name. The behavior is as follows: - If the allowlist setting is not defined, all installed processors are enabled. This is the status quo. - If the allowlist setting is defined as the empty set, then all processors are disabled. - If the allowlist setting contains the names of valid processors, only those processors are enabled. - If the allowlist setting contains a name of a processor that does not exist, then the server will fail to start with an IllegalStateException listing which processors were defined in the allowlist but are not installed. - If the allowlist setting is changed between server restarts then any ingest pipeline using a now-disabled processor will fail. This is the same experience if a pipeline used a processor defined by a plugin but then that plugin were to be uninstalled across restarts. A distinct setting exists for each of request, response, and search phase results processors. Related to #14439 Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent f70fd71 commit d9e9944

File tree

4 files changed

+196
-16
lines changed

4 files changed

+196
-16
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
1111
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
1212
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
13-
- Add allowlist setting for ingest-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
13+
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
1414

1515
### Dependencies
1616
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settin
165165
// Assert that no unknown processors are defined in the allowlist
166166
final Set<String> unknownAllowlistProcessors = allowlist.stream()
167167
.filter(p -> map.containsKey(p) == false)
168-
.collect(Collectors.toSet());
168+
.collect(Collectors.toUnmodifiableSet());
169169
if (unknownAllowlistProcessors.isEmpty() == false) {
170170
throw new IllegalArgumentException(
171171
"Processor(s) "

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/SearchPipelineCommonModulePlugin.java

+88-14
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,61 @@
88

99
package org.opensearch.search.pipeline.common;
1010

11+
import org.opensearch.common.settings.Setting;
12+
import org.opensearch.common.settings.Settings;
1113
import org.opensearch.plugins.Plugin;
1214
import org.opensearch.plugins.SearchPipelinePlugin;
1315
import org.opensearch.search.pipeline.Processor;
16+
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
1417
import org.opensearch.search.pipeline.SearchRequestProcessor;
1518
import org.opensearch.search.pipeline.SearchResponseProcessor;
1619

20+
import java.util.List;
1721
import java.util.Map;
22+
import java.util.Set;
23+
import java.util.function.Function;
24+
import java.util.stream.Collectors;
1825

1926
/**
2027
* Plugin providing common search request/response processors for use in search pipelines.
2128
*/
2229
public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPipelinePlugin {
2330

31+
static final Setting<List<String>> REQUEST_PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
32+
"search.pipeline.common.request.processors.allowed",
33+
List.of(),
34+
Function.identity(),
35+
Setting.Property.NodeScope
36+
);
37+
38+
static final Setting<List<String>> RESPONSE_PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
39+
"search.pipeline.common.response.processors.allowed",
40+
List.of(),
41+
Function.identity(),
42+
Setting.Property.NodeScope
43+
);
44+
45+
static final Setting<List<String>> SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
46+
"search.pipeline.common.search.phase.results.processors.allowed",
47+
List.of(),
48+
Function.identity(),
49+
Setting.Property.NodeScope
50+
);
51+
2452
/**
2553
* No constructor needed, but build complains if we don't have a constructor with JavaDoc.
2654
*/
2755
public SearchPipelineCommonModulePlugin() {}
2856

57+
@Override
58+
public List<Setting<?>> getSettings() {
59+
return List.of(
60+
REQUEST_PROCESSORS_ALLOWLIST_SETTING,
61+
RESPONSE_PROCESSORS_ALLOWLIST_SETTING,
62+
SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING
63+
);
64+
}
65+
2966
/**
3067
* Returns a map of processor factories.
3168
*
@@ -34,25 +71,62 @@ public SearchPipelineCommonModulePlugin() {}
3471
*/
3572
@Override
3673
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Parameters parameters) {
37-
return Map.of(
38-
FilterQueryRequestProcessor.TYPE,
39-
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
40-
ScriptRequestProcessor.TYPE,
41-
new ScriptRequestProcessor.Factory(parameters.scriptService),
42-
OversampleRequestProcessor.TYPE,
43-
new OversampleRequestProcessor.Factory()
74+
return filterForAllowlistSetting(
75+
REQUEST_PROCESSORS_ALLOWLIST_SETTING,
76+
parameters.env.settings(),
77+
Map.of(
78+
FilterQueryRequestProcessor.TYPE,
79+
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
80+
ScriptRequestProcessor.TYPE,
81+
new ScriptRequestProcessor.Factory(parameters.scriptService),
82+
OversampleRequestProcessor.TYPE,
83+
new OversampleRequestProcessor.Factory()
84+
)
4485
);
4586
}
4687

4788
@Override
4889
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
49-
return Map.of(
50-
RenameFieldResponseProcessor.TYPE,
51-
new RenameFieldResponseProcessor.Factory(),
52-
TruncateHitsResponseProcessor.TYPE,
53-
new TruncateHitsResponseProcessor.Factory(),
54-
CollapseResponseProcessor.TYPE,
55-
new CollapseResponseProcessor.Factory()
90+
return filterForAllowlistSetting(
91+
RESPONSE_PROCESSORS_ALLOWLIST_SETTING,
92+
parameters.env.settings(),
93+
Map.of(
94+
RenameFieldResponseProcessor.TYPE,
95+
new RenameFieldResponseProcessor.Factory(),
96+
TruncateHitsResponseProcessor.TYPE,
97+
new TruncateHitsResponseProcessor.Factory(),
98+
CollapseResponseProcessor.TYPE,
99+
new CollapseResponseProcessor.Factory()
100+
)
56101
);
57102
}
103+
104+
@Override
105+
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(Parameters parameters) {
106+
return filterForAllowlistSetting(SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING, parameters.env.settings(), Map.of());
107+
}
108+
109+
private <T extends Processor> Map<String, Processor.Factory<T>> filterForAllowlistSetting(
110+
Setting<List<String>> allowlistSetting,
111+
Settings settings,
112+
Map<String, Processor.Factory<T>> map
113+
) {
114+
if (allowlistSetting.exists(settings) == false) {
115+
return Map.copyOf(map);
116+
}
117+
final Set<String> allowlist = Set.copyOf(allowlistSetting.get(settings));
118+
// Assert that no unknown processors are defined in the allowlist
119+
final Set<String> unknownAllowlistProcessors = allowlist.stream()
120+
.filter(p -> map.containsKey(p) == false)
121+
.collect(Collectors.toUnmodifiableSet());
122+
if (unknownAllowlistProcessors.isEmpty() == false) {
123+
throw new IllegalArgumentException(
124+
"Processor(s) " + unknownAllowlistProcessors + " were defined in [" + allowlistSetting.getKey() + "] but do not exist"
125+
);
126+
}
127+
return map.entrySet()
128+
.stream()
129+
.filter(e -> allowlist.contains(e.getKey()))
130+
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
131+
}
58132
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.pipeline.common;
10+
11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.env.TestEnvironment;
13+
import org.opensearch.plugins.SearchPipelinePlugin;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.Set;
20+
import java.util.function.BiFunction;
21+
22+
public class SearchPipelineCommonModulePluginTests extends OpenSearchTestCase {
23+
24+
public void testRequestProcessorAllowlist() throws IOException {
25+
final String key = SearchPipelineCommonModulePlugin.REQUEST_PROCESSORS_ALLOWLIST_SETTING.getKey();
26+
runAllowlistTest(key, List.of(), SearchPipelineCommonModulePlugin::getRequestProcessors);
27+
runAllowlistTest(key, List.of("filter_query"), SearchPipelineCommonModulePlugin::getRequestProcessors);
28+
runAllowlistTest(key, List.of("script"), SearchPipelineCommonModulePlugin::getRequestProcessors);
29+
runAllowlistTest(key, List.of("oversample", "script"), SearchPipelineCommonModulePlugin::getRequestProcessors);
30+
runAllowlistTest(key, List.of("filter_query", "script", "oversample"), SearchPipelineCommonModulePlugin::getRequestProcessors);
31+
32+
final IllegalArgumentException e = expectThrows(
33+
IllegalArgumentException.class,
34+
() -> runAllowlistTest(key, List.of("foo"), SearchPipelineCommonModulePlugin::getRequestProcessors)
35+
);
36+
assertTrue(e.getMessage(), e.getMessage().contains("foo"));
37+
}
38+
39+
public void testResponseProcessorAllowlist() throws IOException {
40+
final String key = SearchPipelineCommonModulePlugin.RESPONSE_PROCESSORS_ALLOWLIST_SETTING.getKey();
41+
runAllowlistTest(key, List.of(), SearchPipelineCommonModulePlugin::getResponseProcessors);
42+
runAllowlistTest(key, List.of("rename_field"), SearchPipelineCommonModulePlugin::getResponseProcessors);
43+
runAllowlistTest(key, List.of("truncate_hits"), SearchPipelineCommonModulePlugin::getResponseProcessors);
44+
runAllowlistTest(key, List.of("collapse", "truncate_hits"), SearchPipelineCommonModulePlugin::getResponseProcessors);
45+
runAllowlistTest(
46+
key,
47+
List.of("rename_field", "truncate_hits", "collapse"),
48+
SearchPipelineCommonModulePlugin::getResponseProcessors
49+
);
50+
51+
final IllegalArgumentException e = expectThrows(
52+
IllegalArgumentException.class,
53+
() -> runAllowlistTest(key, List.of("foo"), SearchPipelineCommonModulePlugin::getResponseProcessors)
54+
);
55+
assertTrue(e.getMessage(), e.getMessage().contains("foo"));
56+
}
57+
58+
public void testSearchPhaseResultsProcessorAllowlist() throws IOException {
59+
final String key = SearchPipelineCommonModulePlugin.SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING.getKey();
60+
runAllowlistTest(key, List.of(), SearchPipelineCommonModulePlugin::getSearchPhaseResultsProcessors);
61+
62+
final IllegalArgumentException e = expectThrows(
63+
IllegalArgumentException.class,
64+
() -> runAllowlistTest(key, List.of("foo"), SearchPipelineCommonModulePlugin::getSearchPhaseResultsProcessors)
65+
);
66+
assertTrue(e.getMessage(), e.getMessage().contains("foo"));
67+
}
68+
69+
private void runAllowlistTest(
70+
String settingKey,
71+
List<String> allowlist,
72+
BiFunction<SearchPipelineCommonModulePlugin, SearchPipelinePlugin.Parameters, Map<String, ?>> function
73+
) throws IOException {
74+
final Settings settings = Settings.builder().putList(settingKey, allowlist).build();
75+
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
76+
assertEquals(Set.copyOf(allowlist), function.apply(plugin, createParameters(settings)).keySet());
77+
}
78+
}
79+
80+
public void testAllowlistNotSpecified() throws IOException {
81+
final Settings settings = Settings.EMPTY;
82+
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
83+
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
84+
assertEquals(
85+
Set.of("rename_field", "truncate_hits", "collapse"),
86+
plugin.getResponseProcessors(createParameters(settings)).keySet()
87+
);
88+
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
89+
}
90+
}
91+
92+
private static SearchPipelinePlugin.Parameters createParameters(Settings settings) {
93+
return new SearchPipelinePlugin.Parameters(
94+
TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()),
95+
null,
96+
null,
97+
null,
98+
() -> 0L,
99+
(a, b) -> null,
100+
null,
101+
null,
102+
$ -> {},
103+
null
104+
);
105+
}
106+
}

0 commit comments

Comments
 (0)