Skip to content

Commit a99b494

Browse files
authored
Add allowlist setting for ingest-common processors (#14479)
Add a new static setting that lets an operator choose specific ingest processors to enable by name. The behavior is as follows: - If the allowlist setting is not defined, all installed processors are enabled. This is the status quo. - If the allowlist setting is defined as the empty set, then all processors are disabled. - If the allowlist setting contains the names of valid processors, only those processors are enabled. - If the allowlist setting contains a name of a processor that does not exist, then the server will fail to start with an IllegalStateException listing which processors were defined in the allowlist but are not installed. - If the allowlist setting is changed between server restarts then any ingest pipeline using a now-disabled processor will fail. This is the same experience if a pipeline used a processor defined by a plugin but then that plugin were to be uninstalled across restarts. Related to #14439 Signed-off-by: Andrew Ross <andrross@amazon.com>
1 parent 729276f commit a99b494

File tree

3 files changed

+146
-3
lines changed

3 files changed

+146
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1010
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
1111
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))
1212
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
13+
- Add allowlist setting for ingest-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
1314

1415
### Dependencies
1516
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

+36-3
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,20 @@
5858
import java.util.HashMap;
5959
import java.util.List;
6060
import java.util.Map;
61+
import java.util.Set;
62+
import java.util.function.Function;
6163
import java.util.function.Supplier;
64+
import java.util.stream.Collectors;
6265

6366
public class IngestCommonModulePlugin extends Plugin implements ActionPlugin, IngestPlugin {
6467

68+
static final Setting<List<String>> PROCESSORS_ALLOWLIST_SETTING = Setting.listSetting(
69+
"ingest.common.processors.allowed",
70+
List.of(),
71+
Function.identity(),
72+
Setting.Property.NodeScope
73+
);
74+
6575
static final Setting<TimeValue> WATCHDOG_INTERVAL = Setting.timeSetting(
6676
"ingest.grok.watchdog.interval",
6777
TimeValue.timeValueSeconds(1),
@@ -77,7 +87,7 @@ public IngestCommonModulePlugin() {}
7787

7888
@Override
7989
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
80-
Map<String, Processor.Factory> processors = new HashMap<>();
90+
final Map<String, Processor.Factory> processors = new HashMap<>();
8191
processors.put(DateProcessor.TYPE, new DateProcessor.Factory(parameters.scriptService));
8292
processors.put(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService));
8393
processors.put(AppendProcessor.TYPE, new AppendProcessor.Factory(parameters.scriptService));
@@ -110,7 +120,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
110120
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
111121
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
112122
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
113-
return Collections.unmodifiableMap(processors);
123+
return filterForAllowlistSetting(parameters.env.settings(), processors);
114124
}
115125

116126
@Override
@@ -133,7 +143,7 @@ public List<RestHandler> getRestHandlers(
133143

134144
@Override
135145
public List<Setting<?>> getSettings() {
136-
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME);
146+
return Arrays.asList(WATCHDOG_INTERVAL, WATCHDOG_MAX_EXECUTION_TIME, PROCESSORS_ALLOWLIST_SETTING);
137147
}
138148

139149
private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters parameters) {
@@ -147,4 +157,27 @@ private static MatcherWatchdog createGrokThreadWatchdog(Processor.Parameters par
147157
);
148158
}
149159

160+
private Map<String, Processor.Factory> filterForAllowlistSetting(Settings settings, Map<String, Processor.Factory> map) {
161+
if (PROCESSORS_ALLOWLIST_SETTING.exists(settings) == false) {
162+
return Map.copyOf(map);
163+
}
164+
final Set<String> allowlist = Set.copyOf(PROCESSORS_ALLOWLIST_SETTING.get(settings));
165+
// Assert that no unknown processors are defined in the allowlist
166+
final Set<String> unknownAllowlistProcessors = allowlist.stream()
167+
.filter(p -> map.containsKey(p) == false)
168+
.collect(Collectors.toSet());
169+
if (unknownAllowlistProcessors.isEmpty() == false) {
170+
throw new IllegalArgumentException(
171+
"Processor(s) "
172+
+ unknownAllowlistProcessors
173+
+ " were defined in ["
174+
+ PROCESSORS_ALLOWLIST_SETTING.getKey()
175+
+ "] but do not exist"
176+
);
177+
}
178+
return map.entrySet()
179+
.stream()
180+
.filter(e -> allowlist.contains(e.getKey()))
181+
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
182+
}
150183
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.common.settings.Settings;
12+
import org.opensearch.env.TestEnvironment;
13+
import org.opensearch.ingest.Processor;
14+
import org.opensearch.test.OpenSearchTestCase;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.Set;
19+
20+
public class IngestCommonModulePluginTests extends OpenSearchTestCase {
21+
22+
public void testAllowlist() throws IOException {
23+
runAllowlistTest(List.of());
24+
runAllowlistTest(List.of("date"));
25+
runAllowlistTest(List.of("set"));
26+
runAllowlistTest(List.of("copy", "date"));
27+
runAllowlistTest(List.of("date", "set", "copy"));
28+
}
29+
30+
private void runAllowlistTest(List<String> allowlist) throws IOException {
31+
final Settings settings = Settings.builder()
32+
.putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), allowlist)
33+
.build();
34+
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
35+
assertEquals(Set.copyOf(allowlist), plugin.getProcessors(createParameters(settings)).keySet());
36+
}
37+
}
38+
39+
public void testAllowlistNotSpecified() throws IOException {
40+
final Settings.Builder builder = Settings.builder();
41+
builder.remove(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey());
42+
final Settings settings = builder.build();
43+
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
44+
final Set<String> expected = Set.of(
45+
"append",
46+
"urldecode",
47+
"sort",
48+
"fail",
49+
"trim",
50+
"set",
51+
"fingerprint",
52+
"pipeline",
53+
"json",
54+
"join",
55+
"kv",
56+
"bytes",
57+
"date",
58+
"drop",
59+
"community_id",
60+
"lowercase",
61+
"convert",
62+
"copy",
63+
"gsub",
64+
"dot_expander",
65+
"rename",
66+
"remove_by_pattern",
67+
"html_strip",
68+
"remove",
69+
"csv",
70+
"grok",
71+
"date_index_name",
72+
"foreach",
73+
"script",
74+
"dissect",
75+
"uppercase",
76+
"split"
77+
);
78+
assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet());
79+
}
80+
}
81+
82+
public void testAllowlistHasNonexistentProcessors() throws IOException {
83+
final Settings settings = Settings.builder()
84+
.putList(IngestCommonModulePlugin.PROCESSORS_ALLOWLIST_SETTING.getKey(), List.of("threeve"))
85+
.build();
86+
try (IngestCommonModulePlugin plugin = new IngestCommonModulePlugin()) {
87+
IllegalArgumentException e = expectThrows(
88+
IllegalArgumentException.class,
89+
() -> plugin.getProcessors(createParameters(settings))
90+
);
91+
assertTrue(e.getMessage(), e.getMessage().contains("threeve"));
92+
}
93+
}
94+
95+
private static Processor.Parameters createParameters(Settings settings) {
96+
return new Processor.Parameters(
97+
TestEnvironment.newEnvironment(Settings.builder().put(settings).put("path.home", "").build()),
98+
null,
99+
null,
100+
null,
101+
() -> 0L,
102+
(a, b) -> null,
103+
null,
104+
null,
105+
$ -> {},
106+
null
107+
);
108+
}
109+
}

0 commit comments

Comments
 (0)