Skip to content

Commit fd19066

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Support scripting for composite aggs in concurrent segment search
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 67a2e4c commit fd19066

File tree

4 files changed

+164
-4
lines changed

4 files changed

+164
-4
lines changed

modules/lang-painless/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import com.github.jengelman.gradle.plugins.shadow.ShadowBasePlugin
3333

3434
apply plugin: 'opensearch.validate-rest-spec'
3535
apply plugin: 'opensearch.yaml-rest-test'
36+
apply plugin: 'opensearch.internal-cluster-test'
3637

3738
opensearchplugin {
3839
description 'An easy, safe and fast scripting language for OpenSearch'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.painless;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
12+
13+
import org.opensearch.action.search.SearchResponse;
14+
import org.opensearch.action.support.WriteRequest;
15+
import org.opensearch.cluster.metadata.IndexMetadata;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.common.xcontent.XContentFactory;
18+
import org.opensearch.core.xcontent.MediaTypeRegistry;
19+
import org.opensearch.core.xcontent.XContentBuilder;
20+
import org.opensearch.plugins.Plugin;
21+
import org.opensearch.script.Script;
22+
import org.opensearch.script.ScriptType;
23+
import org.opensearch.search.aggregations.AggregationBuilder;
24+
import org.opensearch.search.aggregations.AggregationBuilders;
25+
import org.opensearch.search.aggregations.bucket.composite.InternalComposite;
26+
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
27+
import org.opensearch.test.OpenSearchIntegTestCase;
28+
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
29+
30+
import java.util.Arrays;
31+
import java.util.Collection;
32+
import java.util.Collections;
33+
import java.util.List;
34+
35+
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
36+
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
37+
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
38+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
39+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
40+
41+
@OpenSearchIntegTestCase.SuiteScopeTestCase
42+
public class SimplePainlessIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
43+
44+
public SimplePainlessIT(Settings nodeSettings) {
45+
super(nodeSettings);
46+
}
47+
48+
@ParametersFactory
49+
public static Collection<Object[]> parameters() {
50+
return Arrays.asList(
51+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
52+
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }
53+
);
54+
}
55+
56+
@Override
57+
protected Collection<Class<? extends Plugin>> nodePlugins() {
58+
return List.of(PainlessModulePlugin.class);
59+
}
60+
61+
@Override
62+
protected Settings nodeSettings(int nodeOrdinal) {
63+
return Settings.builder()
64+
.put(super.nodeSettings(nodeOrdinal))
65+
.put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), "4")
66+
.build();
67+
}
68+
69+
@Override
70+
public void setupSuiteScopeCluster() throws Exception {
71+
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
72+
.startObject()
73+
.field("dynamic", "false")
74+
.startObject("_meta")
75+
.field("schema_version", 5)
76+
.endObject()
77+
.startObject("properties")
78+
.startObject("entity")
79+
.field("type", "nested")
80+
.endObject()
81+
.endObject()
82+
.endObject();
83+
84+
assertAcked(
85+
prepareCreate("test").setMapping(xContentBuilder)
86+
.setSettings(
87+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
88+
)
89+
);
90+
91+
client().prepareIndex("test")
92+
.setId("a")
93+
.setSource(
94+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.2.3.4\"},{\"name\":\"keyword-field\",\"value\":\"field-1\"}]}",
95+
MediaTypeRegistry.JSON
96+
)
97+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
98+
.get();
99+
client().prepareIndex("test")
100+
.setId("b")
101+
.setSource(
102+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
103+
MediaTypeRegistry.JSON
104+
)
105+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
106+
.get();
107+
client().prepareIndex("test")
108+
.setId("c")
109+
.setSource(
110+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.6.3.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
111+
MediaTypeRegistry.JSON
112+
)
113+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
114+
.get();
115+
client().prepareIndex("test")
116+
.setId("d")
117+
.setSource(
118+
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"2.6.4.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
119+
MediaTypeRegistry.JSON
120+
)
121+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
122+
.get();
123+
ensureSearchable("test");
124+
}
125+
126+
public void testTermsValuesSource() throws Exception {
127+
AggregationBuilder agg = AggregationBuilders.composite(
128+
"multi_buckets",
129+
Collections.singletonList(
130+
new TermsValuesSourceBuilder("keyword-field").script(
131+
new Script(
132+
ScriptType.INLINE,
133+
"painless",
134+
"String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;",
135+
Collections.emptyMap()
136+
)
137+
)
138+
)
139+
);
140+
SearchResponse response = client().prepareSearch("test").setQuery(matchAllQuery()).addAggregation(agg).get();
141+
142+
assertSearchResponse(response);
143+
assertEquals(2, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().size());
144+
assertEquals(
145+
"field-1",
146+
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getKey().get("keyword-field")
147+
);
148+
assertEquals(1, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getDocCount());
149+
assertEquals(
150+
"field-2",
151+
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getKey().get("keyword-field")
152+
);
153+
assertEquals(3, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getDocCount());
154+
}
155+
}

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.opensearch.search.internal.SearchContext;
4141

4242
import java.io.IOException;
43-
import java.util.Arrays;
4443
import java.util.Map;
4544

4645
/**
@@ -81,7 +80,6 @@ protected Aggregator createInternal(
8180

8281
@Override
8382
protected boolean supportsConcurrentSegmentSearch() {
84-
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
85-
return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
83+
return true;
8684
}
8785
}

server/src/main/java/org/opensearch/search/lookup/SearchLookup.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,14 @@ public final SearchLookup forkAndTrackFieldReferences(String field) {
153153
return new SearchLookup(this, newFieldChain);
154154
}
155155

156+
// SourceLookup is not thread safe so we create a new instance for each leaf to support concurrent segment search
156157
public LeafSearchLookup getLeafSearchLookup(LeafReaderContext context) {
157-
return new LeafSearchLookup(context, docMap.getLeafDocLookup(context), sourceLookup, fieldsLookup.getLeafFieldsLookup(context));
158+
return new LeafSearchLookup(
159+
context,
160+
docMap.getLeafDocLookup(context),
161+
new SourceLookup(),
162+
fieldsLookup.getLeafFieldsLookup(context)
163+
);
158164
}
159165

160166
public DocLookup doc() {

0 commit comments

Comments
 (0)