Skip to content

Commit d62f639

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Support scripting for composite aggs in concurrent segment search
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 67a2e4c commit d62f639

File tree

6 files changed

+247
-5
lines changed

6 files changed

+247
-5
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
1313
- Add ThreadContextPermission for markAsSystemContext and allow core to perform the method ([#15016](https://github.com/opensearch-project/OpenSearch/pull/15016))
1414
- Add ThreadContextPermission for stashAndMergeHeaders and stashWithOrigin ([#15039](https://github.com/opensearch-project/OpenSearch/pull/15039))
15+
- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
1516

1617
### Dependencies
1718
- Bump `org.apache.commons:commons-lang3` from 3.14.0 to 3.15.0 ([#14861](https://github.com/opensearch-project/OpenSearch/pull/14861))

modules/lang-painless/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import com.github.jengelman.gradle.plugins.shadow.ShadowBasePlugin
3333

3434
apply plugin: 'opensearch.validate-rest-spec'
3535
apply plugin: 'opensearch.yaml-rest-test'
36+
apply plugin: 'opensearch.internal-cluster-test'
3637

3738
opensearchplugin {
3839
description 'An easy, safe and fast scripting language for OpenSearch'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.painless;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
12+
13+
import org.opensearch.action.search.SearchRequest;
14+
import org.opensearch.action.search.SearchResponse;
15+
import org.opensearch.action.support.WriteRequest;
16+
import org.opensearch.cluster.metadata.IndexMetadata;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.xcontent.XContentFactory;
19+
import org.opensearch.core.xcontent.MediaTypeRegistry;
20+
import org.opensearch.core.xcontent.XContentBuilder;
21+
import org.opensearch.index.query.TermsQueryBuilder;
22+
import org.opensearch.plugins.Plugin;
23+
import org.opensearch.script.Script;
24+
import org.opensearch.script.ScriptType;
25+
import org.opensearch.search.aggregations.AggregationBuilder;
26+
import org.opensearch.search.aggregations.AggregationBuilders;
27+
import org.opensearch.search.aggregations.bucket.composite.InternalComposite;
28+
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
29+
import org.opensearch.search.aggregations.bucket.terms.Terms;
30+
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
31+
import org.opensearch.search.builder.SearchSourceBuilder;
32+
import org.opensearch.test.OpenSearchIntegTestCase;
33+
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
34+
35+
import java.util.Arrays;
36+
import java.util.Collection;
37+
import java.util.Collections;
38+
import java.util.List;
39+
import java.util.Objects;
40+
41+
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
42+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
43+
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
44+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
45+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
46+
47+
@OpenSearchIntegTestCase.SuiteScopeTestCase
48+
public class SimplePainlessIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
49+
50+
public SimplePainlessIT(Settings nodeSettings) {
51+
super(nodeSettings);
52+
}
53+
54+
@ParametersFactory
55+
public static Collection<Object[]> parameters() {
56+
return Arrays.asList(
57+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
58+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }
59+
);
60+
}
61+
62+
@Override
63+
protected Collection<Class<? extends Plugin>> nodePlugins() {
64+
return List.of(PainlessModulePlugin.class);
65+
}
66+
67+
@Override
68+
protected Settings nodeSettings(int nodeOrdinal) {
69+
return Settings.builder()
70+
.put(super.nodeSettings(nodeOrdinal))
71+
.put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), "4")
72+
.build();
73+
}
74+
75+
@Override
76+
public void setupSuiteScopeCluster() throws Exception {
77+
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
78+
.startObject()
79+
.field("dynamic", "false")
80+
.startObject("_meta")
81+
.field("schema_version", 5)
82+
.endObject()
83+
.startObject("properties")
84+
.startObject("entity")
85+
.field("type", "nested")
86+
.endObject()
87+
.endObject()
88+
.endObject();
89+
90+
assertAcked(
91+
prepareCreate("test").setMapping(xContentBuilder)
92+
.setSettings(
93+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
94+
)
95+
);
96+
97+
assertAcked(
98+
prepareCreate("test-df").setSettings(
99+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
100+
)
101+
);
102+
103+
client().prepareIndex("test")
104+
.setId("a")
105+
.setSource(
106+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.2.3.4\"},{\"name\":\"keyword-field\",\"value\":\"field-1\"}]}",
107+
MediaTypeRegistry.JSON
108+
)
109+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
110+
.get();
111+
client().prepareIndex("test")
112+
.setId("b")
113+
.setSource(
114+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
115+
MediaTypeRegistry.JSON
116+
)
117+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
118+
.get();
119+
client().prepareIndex("test")
120+
.setId("c")
121+
.setSource(
122+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.6.3.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
123+
MediaTypeRegistry.JSON
124+
)
125+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
126+
.get();
127+
client().prepareIndex("test")
128+
.setId("d")
129+
.setSource(
130+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"2.6.4.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
131+
MediaTypeRegistry.JSON
132+
)
133+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
134+
.get();
135+
ensureSearchable("test");
136+
137+
client().prepareIndex("test-df")
138+
.setId("a")
139+
.setSource("{\"field\":\"value1\"}", MediaTypeRegistry.JSON)
140+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
141+
.get();
142+
client().prepareIndex("test-df")
143+
.setId("b")
144+
.setSource("{\"field\":\"value2\"}", MediaTypeRegistry.JSON)
145+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
146+
.get();
147+
client().prepareIndex("test-df")
148+
.setId("c")
149+
.setSource("{\"field\":\"value3\"}", MediaTypeRegistry.JSON)
150+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
151+
.get();
152+
client().prepareIndex("test-df")
153+
.setId("d")
154+
.setSource("{\"field\":\"value1\"}", MediaTypeRegistry.JSON)
155+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
156+
.get();
157+
ensureSearchable("test");
158+
}
159+
160+
public void testTermsValuesSource() throws Exception {
161+
AggregationBuilder agg = AggregationBuilders.composite(
162+
"multi_buckets",
163+
Collections.singletonList(
164+
new TermsValuesSourceBuilder("keyword-field").script(
165+
new Script(
166+
ScriptType.INLINE,
167+
"painless",
168+
"String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;",
169+
Collections.emptyMap()
170+
)
171+
)
172+
)
173+
);
174+
SearchResponse response = client().prepareSearch("test").setQuery(matchAllQuery()).addAggregation(agg).get();
175+
176+
assertSearchResponse(response);
177+
assertEquals(2, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().size());
178+
assertEquals(
179+
"field-1",
180+
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getKey().get("keyword-field")
181+
);
182+
assertEquals(1, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getDocCount());
183+
assertEquals(
184+
"field-2",
185+
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getKey().get("keyword-field")
186+
);
187+
assertEquals(3, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getDocCount());
188+
}
189+
190+
public void testSimpleDerivedFieldsQuery() {
191+
assumeFalse(
192+
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
193+
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
194+
);
195+
SearchRequest searchRequest = new SearchRequest("test-df").source(
196+
SearchSourceBuilder.searchSource()
197+
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
198+
.fetchField("result")
199+
.query(new TermsQueryBuilder("result", "value1"))
200+
);
201+
SearchResponse response = client().search(searchRequest).actionGet();
202+
assertSearchResponse(response);
203+
assertEquals(2, Objects.requireNonNull(response.getHits().getTotalHits()).value);
204+
}
205+
206+
public void testSimpleDerivedFieldsAgg() {
207+
assumeFalse(
208+
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
209+
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
210+
);
211+
SearchRequest searchRequest = new SearchRequest("test-df").source(
212+
SearchSourceBuilder.searchSource()
213+
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
214+
.fetchField("result")
215+
.aggregation(new TermsAggregationBuilder("derived-agg").field("result"))
216+
);
217+
SearchResponse response = client().search(searchRequest).actionGet();
218+
assertSearchResponse(response);
219+
Terms aggResponse = response.getAggregations().get("derived-agg");
220+
assertEquals(3, aggResponse.getBuckets().size());
221+
Terms.Bucket bucket = aggResponse.getBuckets().get(0);
222+
assertEquals("value1", bucket.getKey());
223+
assertEquals(2, bucket.getDocCount());
224+
bucket = aggResponse.getBuckets().get(1);
225+
assertEquals("value2", bucket.getKey());
226+
assertEquals(1, bucket.getDocCount());
227+
bucket = aggResponse.getBuckets().get(2);
228+
assertEquals("value3", bucket.getKey());
229+
assertEquals(1, bucket.getDocCount());
230+
}
231+
}

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.opensearch.search.internal.SearchContext;
4141

4242
import java.io.IOException;
43-
import java.util.Arrays;
4443
import java.util.Map;
4544

4645
/**
@@ -81,7 +80,6 @@ protected Aggregator createInternal(
8180

8281
@Override
8382
protected boolean supportsConcurrentSegmentSearch() {
84-
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
85-
return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
83+
return true;
8684
}
8785
}

server/src/main/java/org/opensearch/search/lookup/SearchLookup.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,25 @@ public final SearchLookup forkAndTrackFieldReferences(String field) {
153153
return new SearchLookup(this, newFieldChain);
154154
}
155155

156+
/**
157+
* SourceLookup is not thread safe, so we create a new instance for each leaf to support concurrent segment search
158+
*/
156159
public LeafSearchLookup getLeafSearchLookup(LeafReaderContext context) {
157-
return new LeafSearchLookup(context, docMap.getLeafDocLookup(context), sourceLookup, fieldsLookup.getLeafFieldsLookup(context));
160+
return new LeafSearchLookup(
161+
context,
162+
docMap.getLeafDocLookup(context),
163+
new SourceLookup(),
164+
fieldsLookup.getLeafFieldsLookup(context)
165+
);
158166
}
159167

160168
public DocLookup doc() {
161169
return docMap;
162170
}
163171

172+
/**
173+
* Returned SourceLookup will be unrelated to any created LeafSearchLookups. Instead, use {@link LeafSearchLookup#source()} to access the related {@link SearchLookup}.
174+
*/
164175
public SourceLookup source() {
165176
return sourceLookup;
166177
}

server/src/main/java/org/opensearch/search/lookup/SourceLookup.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import static java.util.Collections.emptyMap;
5858

5959
/**
60-
* Orchestrator class for source lookups
60+
* Orchestrator class for source lookups. Not thread safe.
6161
*
6262
* @opensearch.api
6363
*/

0 commit comments

Comments
 (0)