Commit 36d14e4

jed326 (Jay Deng) authored and Jay Deng committed
Perform buildAggregation concurrently and support Composite Aggregations (opensearch-project#12697)
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 42f00ba commit 36d14e4

14 files changed, +173 -41 lines

CHANGELOG.md (+1)

@@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ## [Unreleased 2.x]
 ### Added
 - Add explicit dependency to validatePom and generatePom tasks ([#12909](https://github.com/opensearch-project/OpenSearch/pull/12909))
+- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
 
 ### Dependencies
 - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/internalClusterTest/java/org/opensearch/search/aggregations/AggregationsIntegrationIT.java (+23)

@@ -38,6 +38,8 @@
 import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.action.search.SearchPhaseExecutionException;
 import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.support.WriteRequest;
+import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.search.aggregations.bucket.terms.IncludeExclude;
@@ -56,6 +58,8 @@
 import java.util.List;
 
 import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
+import static org.opensearch.search.aggregations.AggregationBuilders.global;
+import static org.opensearch.search.aggregations.AggregationBuilders.stats;
 import static org.opensearch.search.aggregations.AggregationBuilders.terms;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
@@ -164,4 +168,23 @@ private void runLargeStringAggregationTest(AggregationBuilder aggregation) {
         }
         assertTrue("Exception should have been thrown", exceptionThrown);
     }
+
+    public void testAggsOnEmptyShards() {
+        // Create index with 5 shards but only 1 doc
+        assertAcked(
+            prepareCreate(
+                "idx",
+                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+            ).setMapping("score", "type=integer")
+        );
+        client().prepareIndex("idx").setId("1").setSource("score", "5").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+
+        // Validate global agg does not throw an exception
+        assertSearchResponse(
+            client().prepareSearch("idx").addAggregation(global("global").subAggregation(stats("value_stats").field("score"))).get()
+        );
+
+        // Validate non-global agg does not throw an exception
+        assertSearchResponse(client().prepareSearch("idx").addAggregation(stats("value_stats").field("score")).get());
+    }
 }

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java (+16 -13)

@@ -26,6 +26,7 @@
 import java.util.Collection;
 import java.util.List;
 
+import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
 import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
@@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception {
         assertAcked(
             prepareCreate(
                 "idx",
-                Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                    .put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
             ).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
         );
         waitForRelocation(ClusterHealthStatus.GREEN);
 
-        client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
-        client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
-        refresh("idx");
-        client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
-        client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
-        refresh("idx");
-        client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
-        client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
-        refresh("idx");
-        client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
-        client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
-        refresh("idx");
+        indexRandom(
+            true,
+            client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
+            client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
+        );
 
         waitForRelocation(ClusterHealthStatus.GREEN);
         refresh();

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java (+3 -11)

@@ -15,9 +15,9 @@
 import org.opensearch.search.query.ReduceableSearchResult;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
@@ -56,17 +56,9 @@ public String getCollectorReason() {
 
     @Override
     public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
-        final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
-        final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
+        final List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
+        assert internals.stream().noneMatch(Objects::isNull);
         context.aggregations().resetBucketMultiConsumer();
-        for (Aggregator aggregator : aggregators) {
-            try {
-                // post collection is called in ContextIndexSearcher after search on leaves are completed
-                internals.add(aggregator.buildTopLevel());
-            } catch (IOException e) {
-                throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
-            }
-        }
 
         // PipelineTreeSource is serialized to the coordinators on older OpenSearch versions for bwc but is deprecated in latest release
         // To handle that we need to add it in the InternalAggregations object sent in QuerySearchResult.

server/src/main/java/org/opensearch/search/aggregations/Aggregator.java (+16 -4)

@@ -33,6 +33,7 @@
 package org.opensearch.search.aggregations;
 
 import org.opensearch.OpenSearchParseException;
+import org.opensearch.common.SetOnce;
 import org.opensearch.common.annotation.PublicApi;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.core.ParseField;
@@ -61,6 +62,8 @@
 @PublicApi(since = "1.0.0")
 public abstract class Aggregator extends BucketCollector implements Releasable {
 
+    private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();
+
     /**
      * Parses the aggregation request and creates the appropriate aggregator factory for it.
      *
@@ -83,6 +86,13 @@ public interface Parser {
         AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
     }
 
+    /**
+     * Returns the InternalAggregation stored during post collection
+     */
+    public InternalAggregation getPostCollectionAggregation() {
+        return internalAggregation.get();
+    }
+
     /**
      * Return the name of this aggregator.
      */
@@ -185,13 +195,15 @@ public interface BucketComparator {
 
     /**
      * Build the result of this aggregation if it is at the "top level"
-     * of the aggregation tree. If, instead, it is a sub-aggregation of
-     * another aggregation then the aggregation that contains it will call
-     * {@link #buildAggregations(long[])}.
+     * of the aggregation tree and save it. This should get called
+     * during post collection. If, instead, it is a sub-aggregation
+     * of another aggregation then the aggregation that contains
+     * it will call {@link #buildAggregations(long[])}.
      */
     public final InternalAggregation buildTopLevel() throws IOException {
         assert parent() == null;
-        return buildAggregations(new long[] { 0 })[0];
+        this.internalAggregation.set(buildAggregations(new long[] { 0 })[0]);
+        return internalAggregation.get();
     }
 
     /**
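
The new `SetOnce` field is the handoff point between phases: `buildTopLevel()` now runs during post collection (on a slice thread under concurrent segment search) and stores its result, and `getPostCollectionAggregation()` reads that cached value later in the reduce phase. A minimal standalone sketch of the write-once semantics, using a hypothetical `SetOnceHolder` in place of `org.opensearch.common.SetOnce`:

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for org.opensearch.common.SetOnce: the value may be
// written exactly once; a second set() indicates a programming error.
final class SetOnceHolder<T> {
    private final AtomicReference<T> ref = new AtomicReference<>();

    void set(T value) {
        if (!ref.compareAndSet(null, value)) {
            throw new IllegalStateException("The object cannot be set twice");
        }
    }

    // Returns null until set() has been called, mirroring
    // getPostCollectionAggregation() before post collection has run.
    T get() {
        return ref.get();
    }
}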

server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java (+36)

@@ -72,6 +72,15 @@ public void processPostCollection(Collector collectorTree) throws IOException {
                 }
             } else if (currentCollector instanceof BucketCollector) {
                 ((BucketCollector) currentCollector).postCollection();
+
+                // Perform build aggregation during post collection
+                if (currentCollector instanceof Aggregator) {
+                    ((Aggregator) currentCollector).buildTopLevel();
+                } else if (currentCollector instanceof MultiBucketCollector) {
+                    for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
+                        collectors.offer(innerCollector);
+                    }
+                }
             }
         }
     }
@@ -106,4 +115,31 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
         }
         return aggregators;
    }
+
+    /**
+     * Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
+     * input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
+     * during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
+     * returning response to coordinator
+     * @param collectors collection of aggregation collectors to reduce
+     * @return list of unwrapped {@link InternalAggregation}
+     */
+    public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
+        List<InternalAggregation> internalAggregations = new ArrayList<>();
+
+        final Deque<Collector> allCollectors = new LinkedList<>(collectors);
+        while (!allCollectors.isEmpty()) {
+            Collector currentCollector = allCollectors.pop();
+            if (currentCollector instanceof InternalProfileCollector) {
+                currentCollector = ((InternalProfileCollector) currentCollector).getCollector();
+            }
+
+            if (currentCollector instanceof Aggregator) {
+                internalAggregations.add(((Aggregator) currentCollector).getPostCollectionAggregation());
+            } else if (currentCollector instanceof MultiBucketCollector) {
+                allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
+            }
+        }
+        return internalAggregations;
+    }
 }
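
`toInternalAggregations` follows the same traversal shape as the existing `toAggregators`: pop collectors off a deque, strip the profiling wrapper, record leaf aggregators (whose results were already built during post collection), and re-queue the children of multi-bucket collectors. A self-contained sketch of that flattening, with hypothetical `Leaf`/`Wrapper`/`Multi` record types standing in for `Aggregator`, `InternalProfileCollector`, and `MultiBucketCollector`:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class UnwrapDemo {
    sealed interface Node permits Leaf, Wrapper, Multi {}
    record Leaf(String built) implements Node {}          // an Aggregator holding its post-collection result
    record Wrapper(Node inner) implements Node {}         // an InternalProfileCollector around another collector
    record Multi(List<Node> children) implements Node {}  // a MultiBucketCollector over several collectors

    static List<String> unwrap(List<Node> roots) {
        List<String> out = new ArrayList<>();
        Deque<Node> stack = new ArrayDeque<>(roots);
        while (!stack.isEmpty()) {
            Node current = stack.pop();
            if (current instanceof Wrapper w) {
                current = w.inner();                      // strip the profiling wrapper
            }
            if (current instanceof Leaf leaf) {
                out.add(leaf.built());                    // already built during post collection
            } else if (current instanceof Multi m) {
                m.children().forEach(stack::push);        // flatten nested collectors
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Node tree = new Wrapper(new Multi(List.of(new Leaf("terms"), new Leaf("stats"))));
        System.out.println(unwrap(List.of(tree))); // [stats, terms] (traversal order)
    }
}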

server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManager.java (+15)

@@ -12,8 +12,10 @@
 import org.apache.lucene.search.CollectorManager;
 import org.opensearch.search.internal.SearchContext;
 import org.opensearch.search.profile.query.CollectorResult;
+import org.opensearch.search.query.ReduceableSearchResult;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 
@@ -42,6 +44,19 @@ public Collector newCollector() throws IOException {
         }
     }
 
+    @Override
+    public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
+        // If there are no leaves then in concurrent search case postCollection, and subsequently buildAggregation, will not be called in
+        // search path. Since we build the InternalAggregation in postCollection that will not get created in such cases either. Therefore
+        // we need to manually processPostCollection here to build empty InternalAggregation objects for this collector tree.
+        if (context.searcher().getLeafContexts().isEmpty()) {
+            for (Collector c : collectors) {
+                context.bucketCollectorProcessor().processPostCollection(c);
+            }
+        }
+        return super.reduce(collectors);
+    }
+
     @Override
     protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
         // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
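
The guard covers shards with no segments at all, such as the four empty shards exercised by the new `testAggsOnEmptyShards` test. A small plain-Lucene sketch (not part of this commit) showing that a freshly committed empty index exposes zero leaf contexts, which is why per-leaf collection, and with it `postCollection()`, never runs:

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.ByteBuffersDirectory;

public class EmptyLeavesDemo {
    public static void main(String[] args) throws Exception {
        ByteBuffersDirectory dir = new ByteBuffersDirectory();
        // Create and commit an index without adding any documents.
        new IndexWriter(dir, new IndexWriterConfig()).close();
        IndexSearcher searcher = new IndexSearcher(DirectoryReader.open(dir));
        // No segments were written, so there are no leaves to collect over.
        System.out.println(searcher.getIndexReader().leaves().isEmpty()); // true
    }
}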

server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java (+3 -5)

@@ -131,12 +131,10 @@ public static class MultiBucketConsumer implements IntConsumer {
         private final int limit;
         private final CircuitBreaker breaker;
 
-        // aggregations execute in a single thread for both sequential
-        // and concurrent search, so no atomic here
+        // count is currently only updated in final reduce phase which is executed in single thread for both concurrent and non-concurrent
+        // search
         private int count;
-
-        // will be updated by multiple threads in concurrent search
-        // hence making it as LongAdder
+        // will be updated by multiple threads in concurrent search hence making it as LongAdder
         private final LongAdder callCount;
         private volatile boolean circuitBreakerTripped;
         private final int availProcessors;
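
The two comments encode the threading model: `callCount` is incremented from many slice threads during concurrent collection, while `count` is only touched in the single-threaded final reduce, so only the former needs `LongAdder`. A tiny sketch of why, with parallel increments standing in for slice threads (a bare `int` would lose updates under this contention):

import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

public class LongAdderDemo {
    public static void main(String[] args) {
        LongAdder callCount = new LongAdder();
        // Simulate many slice threads bumping the counter concurrently.
        IntStream.range(0, 1_000_000).parallel().forEach(i -> callCount.increment());
        System.out.println(callCount.sum()); // always 1000000
    }
}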

server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManager.java (+15)

@@ -12,8 +12,10 @@
 import org.apache.lucene.search.CollectorManager;
 import org.opensearch.search.internal.SearchContext;
 import org.opensearch.search.profile.query.CollectorResult;
+import org.opensearch.search.query.ReduceableSearchResult;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 
@@ -42,6 +44,19 @@ public Collector newCollector() throws IOException {
         }
     }
 
+    @Override
+    public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
+        // If there are no leaves then in concurrent search case postCollection, and subsequently buildAggregation, will not be called in
+        // search path. Since we build the InternalAggregation in postCollection that will not get created in such cases either. Therefore
+        // we need to manually processPostCollection here to build empty InternalAggregation objects for this collector tree.
+        if (context.searcher().getLeafContexts().isEmpty()) {
+            for (Collector c : collectors) {
+                context.bucketCollectorProcessor().processPostCollection(c);
+            }
+        }
+        return super.reduce(collectors);
+    }
+
     @Override
     protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
         // Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java (+3 -2)

@@ -40,6 +40,7 @@
 import org.opensearch.search.internal.SearchContext;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
 
 /**
@@ -80,7 +81,7 @@ protected Aggregator createInternal(
 
     @Override
     protected boolean supportsConcurrentSegmentSearch() {
-        // See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
-        return false;
+        // Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
+        return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
     }
 }
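
Rather than the previous blanket `return false`, composite aggregations now opt out of concurrent segment search only when a value source is scripted. A reduced sketch of the gate, using a hypothetical `SourceConfig` record in place of `CompositeValuesSourceConfig`:

import java.util.Arrays;

public class ConcurrentSearchGateDemo {
    // Hypothetical mirror of CompositeValuesSourceConfig#hasScript().
    record SourceConfig(String field, boolean hasScript) {}

    static boolean supportsConcurrentSegmentSearch(SourceConfig[] sources) {
        // Concurrent segment search stays enabled only when no source relies on a script.
        return Arrays.stream(sources).noneMatch(SourceConfig::hasScript);
    }

    public static void main(String[] args) {
        SourceConfig[] plain = { new SourceConfig("type", false), new SourceConfig("num", false) };
        SourceConfig[] scripted = { new SourceConfig("type", false), new SourceConfig("calc", true) };
        System.out.println(supportsConcurrentSegmentSearch(plain));    // true
        System.out.println(supportsConcurrentSegmentSearch(scripted)); // false
    }
}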

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java (+24 -5)

@@ -45,6 +45,7 @@
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.PriorityQueue;
+import org.opensearch.common.SetOnce;
 import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.util.LongArray;
@@ -94,8 +95,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
     private final long valueCount;
     private final String fieldName;
     private Weight weight;
-    private final GlobalOrdLookupFunction lookupGlobalOrd;
     protected final CollectionStrategy collectionStrategy;
+    private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
     protected int segmentsWithSingleValuedOrds = 0;
     protected int segmentsWithMultiValuedOrds = 0;
 
@@ -129,11 +130,10 @@ public GlobalOrdinalsStringTermsAggregator(
         this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
         this.valuesSource = valuesSource;
         final IndexReader reader = context.searcher().getIndexReader();
-        final SortedSetDocValues values = reader.leaves().size() > 0
+        final SortedSetDocValues values = !reader.leaves().isEmpty()
            ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
            : DocValues.emptySortedSet();
         this.valueCount = values.getValueCount();
-        this.lookupGlobalOrd = values::lookupOrd;
         this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
         if (remapGlobalOrds) {
             this.collectionStrategy = new RemapGlobalOrds(cardinality);
@@ -885,7 +885,10 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
         }
 
         StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
-            BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
+            // Recreate DocValues as needed for concurrent segment search
+            SortedSetDocValues values = getDocValues();
+            BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));
+
             StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
             result.bucketOrd = temp.bucketOrd;
             result.docCountError = 0;
@@ -1001,7 +1004,9 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
             long subsetSize = subsetSize(owningBucketOrd);
             return (spare, globalOrd, bucketOrd, docCount) -> {
                 spare.bucketOrd = bucketOrd;
-                oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
+                // Recreate DocValues as needed for concurrent segment search
+                SortedSetDocValues values = getDocValues();
+                oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
                 spare.subsetDf = docCount;
                 spare.subsetSize = subsetSize;
                 spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
@@ -1086,4 +1091,18 @@ private void oversizedCopy(BytesRef from, BytesRef to) {
      * Predicate used for {@link #acceptedGlobalOrdinals} if there is no filter.
      */
     private static final LongPredicate ALWAYS_TRUE = l -> true;
+
+    /**
+     * If DocValues have not been initialized yet for reduce phase, create and set them.
+     */
+    private SortedSetDocValues getDocValues() throws IOException {
+        if (dvs.get() == null) {
+            dvs.set(
+                !context.searcher().getIndexReader().leaves().isEmpty()
+                    ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
+                    : DocValues.emptySortedSet()
+            );
+        }
+        return dvs.get();
+    }
 }
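
The removed `lookupGlobalOrd` captured doc values created at construction time, which cannot be shared with the reduce phase under concurrent segment search; the replacement calls `SortedSetDocValues.lookupOrd` on values that `getDocValues()` lazily recreates and caches write-once. A plain-Lucene sketch (not from this commit) of what `lookupOrd` does, mapping an ordinal back to its term bytes:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.BytesRef;

public class LookupOrdDemo {
    public static void main(String[] args) throws Exception {
        ByteBuffersDirectory dir = new ByteBuffersDirectory();
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            for (String term : new String[] { "type2", "type1" }) {
                Document doc = new Document();
                doc.add(new SortedSetDocValuesField("type", new BytesRef(term)));
                writer.addDocument(doc);
            }
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            SortedSetDocValues values = DocValues.getSortedSet(reader.leaves().get(0).reader(), "type");
            // Ordinals are assigned in sorted term order: ord 0 -> "type1", ord 1 -> "type2".
            System.out.println(values.lookupOrd(0).utf8ToString()); // prints type1
        }
    }
}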
