8
8
9
9
package org .opensearch .search .pipeline .common ;
10
10
11
+ import org .opensearch .common .settings .Setting ;
12
+ import org .opensearch .common .settings .Settings ;
11
13
import org .opensearch .plugins .Plugin ;
12
14
import org .opensearch .plugins .SearchPipelinePlugin ;
13
15
import org .opensearch .search .pipeline .Processor ;
16
+ import org .opensearch .search .pipeline .SearchPhaseResultsProcessor ;
14
17
import org .opensearch .search .pipeline .SearchRequestProcessor ;
15
18
import org .opensearch .search .pipeline .SearchResponseProcessor ;
16
19
20
+ import java .util .List ;
17
21
import java .util .Map ;
22
+ import java .util .Set ;
23
+ import java .util .function .Function ;
24
+ import java .util .stream .Collectors ;
18
25
19
26
/**
20
27
* Plugin providing common search request/response processors for use in search pipelines.
21
28
*/
22
29
public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPipelinePlugin {
23
30
31
+ static final Setting <List <String >> REQUEST_PROCESSORS_ALLOWLIST_SETTING = Setting .listSetting (
32
+ "search.pipeline.common.request.processors.allowed" ,
33
+ List .of (),
34
+ Function .identity (),
35
+ Setting .Property .NodeScope
36
+ );
37
+
38
+ static final Setting <List <String >> RESPONSE_PROCESSORS_ALLOWLIST_SETTING = Setting .listSetting (
39
+ "search.pipeline.common.response.processors.allowed" ,
40
+ List .of (),
41
+ Function .identity (),
42
+ Setting .Property .NodeScope
43
+ );
44
+
45
+ static final Setting <List <String >> SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING = Setting .listSetting (
46
+ "search.pipeline.common.search.phase.results.processors.allowed" ,
47
+ List .of (),
48
+ Function .identity (),
49
+ Setting .Property .NodeScope
50
+ );
51
+
24
52
/**
25
53
* No constructor needed, but build complains if we don't have a constructor with JavaDoc.
26
54
*/
27
55
public SearchPipelineCommonModulePlugin () {}
28
56
57
+ @ Override
58
+ public List <Setting <?>> getSettings () {
59
+ return List .of (
60
+ REQUEST_PROCESSORS_ALLOWLIST_SETTING ,
61
+ RESPONSE_PROCESSORS_ALLOWLIST_SETTING ,
62
+ SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING
63
+ );
64
+ }
65
+
29
66
/**
30
67
* Returns a map of processor factories.
31
68
*
@@ -34,25 +71,62 @@ public SearchPipelineCommonModulePlugin() {}
34
71
*/
35
72
@ Override
36
73
public Map <String , Processor .Factory <SearchRequestProcessor >> getRequestProcessors (Parameters parameters ) {
37
- return Map .of (
38
- FilterQueryRequestProcessor .TYPE ,
39
- new FilterQueryRequestProcessor .Factory (parameters .namedXContentRegistry ),
40
- ScriptRequestProcessor .TYPE ,
41
- new ScriptRequestProcessor .Factory (parameters .scriptService ),
42
- OversampleRequestProcessor .TYPE ,
43
- new OversampleRequestProcessor .Factory ()
74
+ return filterForAllowlistSetting (
75
+ REQUEST_PROCESSORS_ALLOWLIST_SETTING ,
76
+ parameters .env .settings (),
77
+ Map .of (
78
+ FilterQueryRequestProcessor .TYPE ,
79
+ new FilterQueryRequestProcessor .Factory (parameters .namedXContentRegistry ),
80
+ ScriptRequestProcessor .TYPE ,
81
+ new ScriptRequestProcessor .Factory (parameters .scriptService ),
82
+ OversampleRequestProcessor .TYPE ,
83
+ new OversampleRequestProcessor .Factory ()
84
+ )
44
85
);
45
86
}
46
87
47
88
@ Override
48
89
public Map <String , Processor .Factory <SearchResponseProcessor >> getResponseProcessors (Parameters parameters ) {
49
- return Map .of (
50
- RenameFieldResponseProcessor .TYPE ,
51
- new RenameFieldResponseProcessor .Factory (),
52
- TruncateHitsResponseProcessor .TYPE ,
53
- new TruncateHitsResponseProcessor .Factory (),
54
- CollapseResponseProcessor .TYPE ,
55
- new CollapseResponseProcessor .Factory ()
90
+ return filterForAllowlistSetting (
91
+ RESPONSE_PROCESSORS_ALLOWLIST_SETTING ,
92
+ parameters .env .settings (),
93
+ Map .of (
94
+ RenameFieldResponseProcessor .TYPE ,
95
+ new RenameFieldResponseProcessor .Factory (),
96
+ TruncateHitsResponseProcessor .TYPE ,
97
+ new TruncateHitsResponseProcessor .Factory (),
98
+ CollapseResponseProcessor .TYPE ,
99
+ new CollapseResponseProcessor .Factory ()
100
+ )
56
101
);
57
102
}
103
+
104
+ @ Override
105
+ public Map <String , Processor .Factory <SearchPhaseResultsProcessor >> getSearchPhaseResultsProcessors (Parameters parameters ) {
106
+ return filterForAllowlistSetting (SEARCH_PHASE_RESULTS_PROCESSORS_ALLOWLIST_SETTING , parameters .env .settings (), Map .of ());
107
+ }
108
+
109
+ private <T extends Processor > Map <String , Processor .Factory <T >> filterForAllowlistSetting (
110
+ Setting <List <String >> allowlistSetting ,
111
+ Settings settings ,
112
+ Map <String , Processor .Factory <T >> map
113
+ ) {
114
+ if (allowlistSetting .exists (settings ) == false ) {
115
+ return Map .copyOf (map );
116
+ }
117
+ final Set <String > allowlist = Set .copyOf (allowlistSetting .get (settings ));
118
+ // Assert that no unknown processors are defined in the allowlist
119
+ final Set <String > unknownAllowlistProcessors = allowlist .stream ()
120
+ .filter (p -> map .containsKey (p ) == false )
121
+ .collect (Collectors .toUnmodifiableSet ());
122
+ if (unknownAllowlistProcessors .isEmpty () == false ) {
123
+ throw new IllegalArgumentException (
124
+ "Processor(s) " + unknownAllowlistProcessors + " were defined in [" + allowlistSetting .getKey () + "] but do not exist"
125
+ );
126
+ }
127
+ return map .entrySet ()
128
+ .stream ()
129
+ .filter (e -> allowlist .contains (e .getKey ()))
130
+ .collect (Collectors .toUnmodifiableMap (Map .Entry ::getKey , Map .Entry ::getValue ));
131
+ }
58
132
}
0 commit comments