8
8
9
9
package org .opensearch .search .pipeline .common ;
10
10
11
- import org .opensearch .action .admin .indices .delete .DeleteIndexRequest ;
12
11
import org .opensearch .action .admin .indices .refresh .RefreshRequest ;
13
12
import org .opensearch .action .admin .indices .refresh .RefreshResponse ;
13
+ import org .opensearch .action .admin .indices .settings .put .UpdateSettingsRequest ;
14
14
import org .opensearch .action .index .IndexRequest ;
15
15
import org .opensearch .action .index .IndexResponse ;
16
16
import org .opensearch .action .search .DeleteSearchPipelineRequest ;
17
+ import org .opensearch .action .search .GetSearchPipelineRequest ;
18
+ import org .opensearch .action .search .GetSearchPipelineResponse ;
17
19
import org .opensearch .action .search .PutSearchPipelineRequest ;
18
20
import org .opensearch .action .search .SearchRequest ;
19
21
import org .opensearch .action .search .SearchResponse ;
20
22
import org .opensearch .action .support .master .AcknowledgedResponse ;
23
+ import org .opensearch .common .settings .Settings ;
21
24
import org .opensearch .core .common .bytes .BytesArray ;
25
+ import org .opensearch .core .common .bytes .BytesReference ;
22
26
import org .opensearch .core .rest .RestStatus ;
23
27
import org .opensearch .core .xcontent .MediaTypeRegistry ;
24
28
import org .opensearch .index .query .MatchAllQueryBuilder ;
29
+ import org .opensearch .ingest .PipelineConfiguration ;
25
30
import org .opensearch .plugins .Plugin ;
26
31
import org .opensearch .search .builder .SearchSourceBuilder ;
27
32
import org .opensearch .test .OpenSearchIntegTestCase ;
33
+ import org .junit .After ;
34
+ import org .junit .Before ;
28
35
29
36
import java .util .Collection ;
37
+ import java .util .HashMap ;
30
38
import java .util .List ;
31
39
import java .util .Map ;
32
40
33
41
@ OpenSearchIntegTestCase .SuiteScopeTestCase
34
42
public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {
35
43
44
+ private static final String TEST_INDEX = "myindex" ;
45
+ private static final String PIPELINE_NAME = "test_pipeline" ;
46
+
36
47
@ Override
37
48
protected Collection <Class <? extends Plugin >> nodePlugins () {
38
49
return List .of (SearchPipelineCommonModulePlugin .class );
39
50
}
40
51
52
+ @ Before
53
+ public void setup () throws Exception {
54
+ createIndex (TEST_INDEX );
55
+
56
+ IndexRequest doc1 = new IndexRequest (TEST_INDEX ).id ("doc1" ).source (Map .of ("field" , "value" ));
57
+ IndexRequest doc2 = new IndexRequest (TEST_INDEX ).id ("doc2" ).source (Map .of ("field" , "something else" ));
58
+
59
+ IndexResponse ir = client ().index (doc1 ).actionGet ();
60
+ assertSame (RestStatus .CREATED , ir .status ());
61
+ ir = client ().index (doc2 ).actionGet ();
62
+ assertSame (RestStatus .CREATED , ir .status ());
63
+
64
+ RefreshResponse refRsp = client ().admin ().indices ().refresh (new RefreshRequest (TEST_INDEX )).actionGet ();
65
+ assertSame (RestStatus .OK , refRsp .getStatus ());
66
+ }
67
+
68
+ @ After
69
+ public void cleanup () throws Exception {
70
+ internalCluster ().wipeIndices (TEST_INDEX );
71
+ }
72
+
41
73
public void testFilterQuery () {
42
74
// Create a pipeline with a filter_query processor.
43
- String pipelineName = "foo" ;
75
+ createPipeline ();
76
+
77
+ // Search without the pipeline. Should see both documents.
78
+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
79
+ SearchResponse rsp = client ().search (req ).actionGet ();
80
+ assertEquals (2 , rsp .getHits ().getTotalHits ().value );
81
+
82
+ // Search with the pipeline. Should only see document with "field":"value".
83
+ req .pipeline (PIPELINE_NAME );
84
+ rsp = client ().search (req ).actionGet ();
85
+ assertEquals (1 , rsp .getHits ().getTotalHits ().value );
86
+
87
+ // Clean up.
88
+ deletePipeline ();
89
+ }
90
+
91
+ public void testSearchWithTemporaryPipeline () throws Exception {
92
+
93
+ // Search without the pipeline. Should see both documents.
94
+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
95
+ SearchResponse rsp = client ().search (req ).actionGet ();
96
+ assertEquals (2 , rsp .getHits ().getTotalHits ().value );
97
+
98
+ // Search with temporary pipeline
99
+ Map <String , Object > pipelineSourceMap = new HashMap <>();
100
+ Map <String , Object > requestProcessorConfig = new HashMap <>();
101
+
102
+ Map <String , Object > filterQuery = new HashMap <>();
103
+ filterQuery .put ("query" , Map .of ("term" , Map .of ("field" , "value" )));
104
+ requestProcessorConfig .put ("filter_query" , filterQuery );
105
+ pipelineSourceMap .put ("request_processors" , List .of (requestProcessorConfig ));
106
+
107
+ req = new SearchRequest (TEST_INDEX ).source (
108
+ new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()).searchPipelineSource (pipelineSourceMap )
109
+ );
110
+
111
+ SearchResponse rspWithTempPipeline = client ().search (req ).actionGet ();
112
+ assertEquals (1 , rspWithTempPipeline .getHits ().getTotalHits ().value );
113
+ }
114
+
115
+ public void testSearchWithDefaultPipeline () throws Exception {
116
+ // Create pipeline
117
+ createPipeline ();
118
+
119
+ // Search without the pipeline. Should see both documents.
120
+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
121
+ SearchResponse rsp = client ().search (req ).actionGet ();
122
+ assertEquals (2 , rsp .getHits ().getTotalHits ().value );
123
+
124
+ // Set pipeline as default for the index
125
+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest (TEST_INDEX );
126
+ updateSettingsRequest .settings (Settings .builder ().put ("index.search.default_pipeline" , PIPELINE_NAME ));
127
+ AcknowledgedResponse updateSettingsResponse = client ().admin ().indices ().updateSettings (updateSettingsRequest ).actionGet ();
128
+ assertTrue (updateSettingsResponse .isAcknowledged ());
129
+
130
+ // Search with the default pipeline. Should only see document with "field":"value".
131
+ rsp = client ().search (req ).actionGet ();
132
+ assertEquals (1 , rsp .getHits ().getTotalHits ().value );
133
+
134
+ // Clean up: Remove default pipeline setting
135
+ updateSettingsRequest = new UpdateSettingsRequest (TEST_INDEX );
136
+ updateSettingsRequest .settings (Settings .builder ().putNull ("index.search.default_pipeline" ));
137
+ updateSettingsResponse = client ().admin ().indices ().updateSettings (updateSettingsRequest ).actionGet ();
138
+ assertTrue (updateSettingsResponse .isAcknowledged ());
139
+
140
+ // Clean up.
141
+ deletePipeline ();
142
+ }
143
+
144
+ public void testUpdateSearchPipeline () throws Exception {
145
+ // Create initial pipeline
146
+ createPipeline ();
147
+
148
+ // Verify initial pipeline
149
+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
150
+ req .pipeline (PIPELINE_NAME );
151
+ SearchResponse initialRsp = client ().search (req ).actionGet ();
152
+ assertEquals (1 , initialRsp .getHits ().getTotalHits ().value );
153
+
154
+ BytesReference pipelineConfig = new BytesArray (
155
+ "{"
156
+ + "\" description\" : \" Updated pipeline\" ,"
157
+ + "\" request_processors\" : ["
158
+ + "{"
159
+ + "\" filter_query\" : {"
160
+ + "\" query\" : {"
161
+ + "\" term\" : {"
162
+ + "\" field\" : \" something else\" "
163
+ + "}"
164
+ + "}"
165
+ + "}"
166
+ + "}"
167
+ + "]"
168
+ + "}"
169
+ );
170
+
171
+ PipelineConfiguration pipeline = new PipelineConfiguration (PIPELINE_NAME , pipelineConfig , MediaTypeRegistry .JSON );
172
+
173
+ // Update pipeline
174
+ PutSearchPipelineRequest updateRequest = new PutSearchPipelineRequest (pipeline .getId (), pipelineConfig , MediaTypeRegistry .JSON );
175
+ AcknowledgedResponse ackRsp = client ().admin ().cluster ().putSearchPipeline (updateRequest ).actionGet ();
176
+ assertTrue (ackRsp .isAcknowledged ());
177
+
178
+ // Verify pipeline description
179
+ GetSearchPipelineResponse getPipelineResponse = client ().admin ()
180
+ .cluster ()
181
+ .getSearchPipeline (new GetSearchPipelineRequest (PIPELINE_NAME ))
182
+ .actionGet ();
183
+ assertEquals (PIPELINE_NAME , getPipelineResponse .pipelines ().get (0 ).getId ());
184
+ assertEquals (pipeline .getConfigAsMap (), getPipelineResponse .pipelines ().get (0 ).getConfigAsMap ());
185
+ // Clean up.
186
+ deletePipeline ();
187
+ }
188
+
189
+ private void createPipeline () {
44
190
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest (
45
- pipelineName ,
191
+ PIPELINE_NAME ,
46
192
new BytesArray (
47
193
"{"
48
194
+ "\" request_processors\" : ["
@@ -62,35 +208,13 @@ public void testFilterQuery() {
62
208
);
63
209
AcknowledgedResponse ackRsp = client ().admin ().cluster ().putSearchPipeline (putSearchPipelineRequest ).actionGet ();
64
210
assertTrue (ackRsp .isAcknowledged ());
211
+ }
65
212
66
- // Index some documents.
67
- String indexName = "myindex" ;
68
- IndexRequest doc1 = new IndexRequest (indexName ).id ("doc1" ).source (Map .of ("field" , "value" ));
69
- IndexRequest doc2 = new IndexRequest (indexName ).id ("doc2" ).source (Map .of ("field" , "something else" ));
70
-
71
- IndexResponse ir = client ().index (doc1 ).actionGet ();
72
- assertSame (RestStatus .CREATED , ir .status ());
73
- ir = client ().index (doc2 ).actionGet ();
74
- assertSame (RestStatus .CREATED , ir .status ());
75
-
76
- // Refresh so the documents are visible to search.
77
- RefreshResponse refRsp = client ().admin ().indices ().refresh (new RefreshRequest (indexName )).actionGet ();
78
- assertSame (RestStatus .OK , refRsp .getStatus ());
79
-
80
- // Search without the pipeline. Should see both documents.
81
- SearchRequest req = new SearchRequest (indexName ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
82
- SearchResponse rsp = client ().search (req ).actionGet ();
83
- assertEquals (2 , rsp .getHits ().getTotalHits ().value );
84
-
85
- // Search with the pipeline. Should only see document with "field":"value".
86
- req .pipeline (pipelineName );
87
- rsp = client ().search (req ).actionGet ();
88
- assertEquals (1 , rsp .getHits ().getTotalHits ().value );
89
-
90
- // Clean up.
91
- ackRsp = client ().admin ().cluster ().deleteSearchPipeline (new DeleteSearchPipelineRequest (pipelineName )).actionGet ();
92
- assertTrue (ackRsp .isAcknowledged ());
93
- ackRsp = client ().admin ().indices ().delete (new DeleteIndexRequest (indexName )).actionGet ();
213
+ private void deletePipeline () {
214
+ AcknowledgedResponse ackRsp = client ().admin ()
215
+ .cluster ()
216
+ .deleteSearchPipeline (new DeleteSearchPipelineRequest (PIPELINE_NAME ))
217
+ .actionGet ();
94
218
assertTrue (ackRsp .isAcknowledged ());
95
219
}
96
220
}
0 commit comments