Skip to content

Commit 609b373

Browse files
gaobinlongPeter Alfonsi
authored and
Peter Alfonsi
committed
Add remove_by_pattern ingest processor (opensearch-project#11920)
* Add remove_by_pattern ingest processor
* Modify change log
* Remove some duplicated checks
* Add more yml test case
* Fix typo

---------

Signed-off-by: Gao Binlong <gbinlong@amazon.com>
1 parent 3992f91 commit 609b373

File tree

7 files changed

+555
-0
lines changed

7 files changed

+555
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
133133
- Introduce new feature flag "WRITEABLE_REMOTE_INDEX" to gate the writeable remote index functionality ([#11717](https://github.com/opensearch-project/OpenSearch/pull/11170))
134134
- [Tiered caching] Integrating ehcache as a disk cache option ([#11874](https://github.com/opensearch-project/OpenSearch/pull/11874))
135135
- Bump OpenTelemetry from 1.32.0 to 1.34.1 ([#11891](https://github.com/opensearch-project/OpenSearch/pull/11891))
136+
- Add remove_by_pattern ingest processor ([#11920](https://github.com/opensearch-project/OpenSearch/pull/11920))
136137
- Support index level allocation filtering for searchable snapshot index ([#11563](https://github.com/opensearch-project/OpenSearch/pull/11563))
137138
- Add `org.opensearch.rest.MethodHandlers` and `RestController#getAllHandlers` ([#11876](https://github.com/opensearch-project/OpenSearch/pull/11876))
138139
- New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465))

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
107107
processors.put(HtmlStripProcessor.TYPE, new HtmlStripProcessor.Factory());
108108
processors.put(CsvProcessor.TYPE, new CsvProcessor.Factory());
109109
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
110+
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
110111
return Collections.unmodifiableMap(processors);
111112
}
112113

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.common.Nullable;
12+
import org.opensearch.common.ValidationException;
13+
import org.opensearch.common.regex.Regex;
14+
import org.opensearch.core.common.Strings;
15+
import org.opensearch.ingest.AbstractProcessor;
16+
import org.opensearch.ingest.ConfigurationUtils;
17+
import org.opensearch.ingest.IngestDocument;
18+
import org.opensearch.ingest.Processor;
19+
20+
import java.util.ArrayList;
21+
import java.util.HashSet;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Set;
25+
import java.util.stream.Collectors;
26+
27+
import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
28+
29+
/**
30+
* Processor that removes existing fields by field patterns or excluding field patterns.
31+
*/
32+
public final class RemoveByPatternProcessor extends AbstractProcessor {
33+
34+
public static final String TYPE = "remove_by_pattern";
35+
private final List<String> fieldPatterns;
36+
private final List<String> excludeFieldPatterns;
37+
38+
RemoveByPatternProcessor(
39+
String tag,
40+
String description,
41+
@Nullable List<String> fieldPatterns,
42+
@Nullable List<String> excludeFieldPatterns
43+
) {
44+
super(tag, description);
45+
if (fieldPatterns != null && excludeFieldPatterns != null || fieldPatterns == null && excludeFieldPatterns == null) {
46+
throw new IllegalArgumentException("either fieldPatterns and excludeFieldPatterns must be set");
47+
}
48+
if (fieldPatterns == null) {
49+
this.fieldPatterns = null;
50+
this.excludeFieldPatterns = new ArrayList<>(excludeFieldPatterns);
51+
} else {
52+
this.fieldPatterns = new ArrayList<>(fieldPatterns);
53+
this.excludeFieldPatterns = null;
54+
}
55+
}
56+
57+
public List<String> getFieldPatterns() {
58+
return fieldPatterns;
59+
}
60+
61+
public List<String> getExcludeFieldPatterns() {
62+
return excludeFieldPatterns;
63+
}
64+
65+
@Override
66+
public IngestDocument execute(IngestDocument document) {
67+
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
68+
Set<String> metadataFields = document.getMetadata()
69+
.keySet()
70+
.stream()
71+
.map(IngestDocument.Metadata::getFieldName)
72+
.collect(Collectors.toSet());
73+
74+
if (fieldPatterns != null && !fieldPatterns.isEmpty()) {
75+
existingFields.forEach(field -> {
76+
// ignore metadata fields such as _index, _id, etc.
77+
if (!metadataFields.contains(field)) {
78+
final boolean matched = fieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
79+
if (matched) {
80+
document.removeField(field);
81+
}
82+
}
83+
});
84+
}
85+
86+
if (excludeFieldPatterns != null && !excludeFieldPatterns.isEmpty()) {
87+
existingFields.forEach(field -> {
88+
// ignore metadata fields such as _index, _id, etc.
89+
if (!metadataFields.contains(field)) {
90+
final boolean matched = excludeFieldPatterns.stream().anyMatch(pattern -> Regex.simpleMatch(pattern, field));
91+
if (!matched) {
92+
document.removeField(field);
93+
}
94+
}
95+
});
96+
}
97+
98+
return document;
99+
}
100+
101+
@Override
102+
public String getType() {
103+
return TYPE;
104+
}
105+
106+
public static final class Factory implements Processor.Factory {
107+
108+
public Factory() {}
109+
110+
@Override
111+
public RemoveByPatternProcessor create(
112+
Map<String, Processor.Factory> registry,
113+
String processorTag,
114+
String description,
115+
Map<String, Object> config
116+
) throws Exception {
117+
final List<String> fieldPatterns = new ArrayList<>();
118+
final List<String> excludeFieldPatterns = new ArrayList<>();
119+
final Object fieldPattern = ConfigurationUtils.readOptionalObject(config, "field_pattern");
120+
final Object excludeFieldPattern = ConfigurationUtils.readOptionalObject(config, "exclude_field_pattern");
121+
122+
if (fieldPattern == null && excludeFieldPattern == null || fieldPattern != null && excludeFieldPattern != null) {
123+
throw newConfigurationException(
124+
TYPE,
125+
processorTag,
126+
"field_pattern",
127+
"either field_pattern or exclude_field_pattern must be set"
128+
);
129+
}
130+
131+
if (fieldPattern != null) {
132+
if (fieldPattern instanceof List) {
133+
@SuppressWarnings("unchecked")
134+
List<String> fieldPatternList = (List<String>) fieldPattern;
135+
fieldPatterns.addAll(fieldPatternList);
136+
} else {
137+
fieldPatterns.add((String) fieldPattern);
138+
}
139+
validateFieldPatterns(processorTag, fieldPatterns, "field_pattern");
140+
return new RemoveByPatternProcessor(processorTag, description, fieldPatterns, null);
141+
} else {
142+
if (excludeFieldPattern instanceof List) {
143+
@SuppressWarnings("unchecked")
144+
List<String> excludeFieldPatternList = (List<String>) excludeFieldPattern;
145+
excludeFieldPatterns.addAll(excludeFieldPatternList);
146+
} else {
147+
excludeFieldPatterns.add((String) excludeFieldPattern);
148+
}
149+
validateFieldPatterns(processorTag, excludeFieldPatterns, "exclude_field_pattern");
150+
return new RemoveByPatternProcessor(processorTag, description, null, excludeFieldPatterns);
151+
}
152+
}
153+
154+
private void validateFieldPatterns(String processorTag, List<String> patterns, String patternKey) {
155+
List<String> validationErrors = new ArrayList<>();
156+
for (String fieldPattern : patterns) {
157+
if (fieldPattern.contains("#")) {
158+
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a '#'");
159+
}
160+
if (fieldPattern.contains(":")) {
161+
validationErrors.add(patternKey + " [" + fieldPattern + "] must not contain a ':'");
162+
}
163+
if (fieldPattern.startsWith("_")) {
164+
validationErrors.add(patternKey + " [" + fieldPattern + "] must not start with '_'");
165+
}
166+
if (Strings.validFileNameExcludingAstrix(fieldPattern) == false) {
167+
validationErrors.add(
168+
patternKey + " [" + fieldPattern + "] must not contain the following characters " + Strings.INVALID_FILENAME_CHARS
169+
);
170+
}
171+
}
172+
173+
if (validationErrors.size() > 0) {
174+
ValidationException validationException = new ValidationException();
175+
validationException.addValidationErrors(validationErrors);
176+
throw newConfigurationException(TYPE, processorTag, patternKey, validationException.getMessage());
177+
}
178+
}
179+
}
180+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.OpenSearchException;
12+
import org.opensearch.OpenSearchParseException;
13+
import org.opensearch.test.OpenSearchTestCase;
14+
import org.junit.Before;
15+
16+
import java.util.Arrays;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
21+
import static org.hamcrest.CoreMatchers.equalTo;
22+
23+
public class RemoveByPatternProcessorFactoryTests extends OpenSearchTestCase {
24+
25+
private RemoveByPatternProcessor.Factory factory;
26+
27+
@Before
28+
public void init() {
29+
factory = new RemoveByPatternProcessor.Factory();
30+
}
31+
32+
public void testCreateFieldPatterns() throws Exception {
33+
Map<String, Object> config = new HashMap<>();
34+
config.put("field_pattern", "field1*");
35+
String processorTag = randomAlphaOfLength(10);
36+
RemoveByPatternProcessor removeByPatternProcessor = factory.create(null, processorTag, null, config);
37+
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
38+
assertThat(removeByPatternProcessor.getFieldPatterns().get(0), equalTo("field1*"));
39+
40+
Map<String, Object> config2 = new HashMap<>();
41+
config2.put("field_pattern", List.of("field1*", "field2*"));
42+
removeByPatternProcessor = factory.create(null, processorTag, null, config2);
43+
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
44+
assertThat(removeByPatternProcessor.getFieldPatterns().get(0), equalTo("field1*"));
45+
assertThat(removeByPatternProcessor.getFieldPatterns().get(1), equalTo("field2*"));
46+
47+
Map<String, Object> config3 = new HashMap<>();
48+
List<String> patterns = Arrays.asList("foo*", "*", " ", ",", "#", ":", "_");
49+
config3.put("field_pattern", patterns);
50+
Exception exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config3));
51+
assertThat(
52+
exception.getMessage(),
53+
equalTo(
54+
"[field_pattern] Validation Failed: "
55+
+ "1: field_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
56+
+ "2: field_pattern [,] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
57+
+ "3: field_pattern [#] must not contain a '#';"
58+
+ "4: field_pattern [:] must not contain a ':';"
59+
+ "5: field_pattern [_] must not start with '_';"
60+
)
61+
);
62+
}
63+
64+
public void testCreateExcludeFieldPatterns() throws Exception {
65+
Map<String, Object> config = new HashMap<>();
66+
config.put("exclude_field_pattern", "field1*");
67+
String processorTag = randomAlphaOfLength(10);
68+
RemoveByPatternProcessor removeByPatternProcessor = factory.create(null, processorTag, null, config);
69+
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
70+
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(0), equalTo("field1*"));
71+
72+
Map<String, Object> config2 = new HashMap<>();
73+
config2.put("exclude_field_pattern", List.of("field1*", "field2*"));
74+
removeByPatternProcessor = factory.create(null, processorTag, null, config2);
75+
assertThat(removeByPatternProcessor.getTag(), equalTo(processorTag));
76+
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(0), equalTo("field1*"));
77+
assertThat(removeByPatternProcessor.getExcludeFieldPatterns().get(1), equalTo("field2*"));
78+
79+
Map<String, Object> config3 = new HashMap<>();
80+
List<String> patterns = Arrays.asList("foo*", "*", " ", ",", "#", ":", "_");
81+
config3.put("exclude_field_pattern", patterns);
82+
Exception exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config3));
83+
assertThat(
84+
exception.getMessage(),
85+
equalTo(
86+
"[exclude_field_pattern] Validation Failed: "
87+
+ "1: exclude_field_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
88+
+ "2: exclude_field_pattern [,] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];"
89+
+ "3: exclude_field_pattern [#] must not contain a '#';"
90+
+ "4: exclude_field_pattern [:] must not contain a ':';"
91+
+ "5: exclude_field_pattern [_] must not start with '_';"
92+
)
93+
);
94+
}
95+
96+
public void testCreatePatternsFailed() throws Exception {
97+
Map<String, Object> config = new HashMap<>();
98+
config.put("field_pattern", List.of("foo*"));
99+
config.put("exclude_field_pattern", List.of("bar*"));
100+
String processorTag = randomAlphaOfLength(10);
101+
OpenSearchException exception = expectThrows(
102+
OpenSearchParseException.class,
103+
() -> factory.create(null, processorTag, null, config)
104+
);
105+
assertThat(exception.getMessage(), equalTo("[field_pattern] either field_pattern or exclude_field_pattern must be set"));
106+
107+
Map<String, Object> config2 = new HashMap<>();
108+
config2.put("field_pattern", null);
109+
config2.put("exclude_field_pattern", null);
110+
111+
exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config2));
112+
assertThat(exception.getMessage(), equalTo("[field_pattern] either field_pattern or exclude_field_pattern must be set"));
113+
}
114+
}

0 commit comments

Comments
 (0)