Skip to content

Commit df4bf1c

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Perform buildAggregation concurrently and support Composite Aggregations
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent d4e1ab1 commit df4bf1c

File tree

10 files changed

+135
-47
lines changed

10 files changed

+135
-47
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
118118
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
119119
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
120120
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
121+
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
121122

122123
### Dependencies
123124
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Collection;
2727
import java.util.List;
2828

29+
import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
2930
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
3031
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3132
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
@@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception {
5051
assertAcked(
5152
prepareCreate(
5253
"idx",
53-
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
54+
Settings.builder()
55+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
56+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
57+
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
5458
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
5559
);
5660
waitForRelocation(ClusterHealthStatus.GREEN);
5761

58-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
59-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
60-
refresh("idx");
61-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
62-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
63-
refresh("idx");
64-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
65-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
66-
refresh("idx");
67-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
68-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
69-
refresh("idx");
62+
indexRandom(
63+
true,
64+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
65+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
66+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
67+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
68+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
69+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
70+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
71+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
72+
);
7073

7174
waitForRelocation(ClusterHealthStatus.GREEN);
7275
refresh();

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import org.opensearch.search.query.ReduceableSearchResult;
1616

1717
import java.io.IOException;
18-
import java.util.ArrayList;
1918
import java.util.Collection;
2019
import java.util.List;
20+
import java.util.Objects;
2121

2222
/**
2323
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
@@ -56,17 +56,9 @@ public String getCollectorReason() {
5656

5757
@Override
5858
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
59-
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
60-
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
59+
final List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
60+
assert internals.stream().noneMatch(Objects::isNull);
6161
context.aggregations().resetBucketMultiConsumer();
62-
for (Aggregator aggregator : aggregators) {
63-
try {
64-
// post collection is called in ContextIndexSearcher after search on leaves are completed
65-
internals.add(aggregator.buildTopLevel());
66-
} catch (IOException e) {
67-
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
68-
}
69-
}
7062

7163
final InternalAggregations internalAggregations = InternalAggregations.from(internals);
7264
return buildAggregationResult(internalAggregations);

server/src/main/java/org/opensearch/search/aggregations/Aggregator.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.search.aggregations;
3434

3535
import org.opensearch.OpenSearchParseException;
36+
import org.opensearch.common.SetOnce;
3637
import org.opensearch.common.annotation.PublicApi;
3738
import org.opensearch.common.lease.Releasable;
3839
import org.opensearch.core.ParseField;
@@ -61,6 +62,8 @@
6162
@PublicApi(since = "1.0.0")
6263
public abstract class Aggregator extends BucketCollector implements Releasable {
6364

65+
private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();
66+
6467
/**
6568
* Parses the aggregation request and creates the appropriate aggregator factory for it.
6669
*
@@ -83,6 +86,13 @@ public interface Parser {
8386
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
8487
}
8588

89+
/**
90+
* Returns the InternalAggregation stored during post collection
91+
*/
92+
public InternalAggregation getPostCollectionAggregation() {
93+
return internalAggregation.get();
94+
}
95+
8696
/**
8797
* Return the name of this aggregator.
8898
*/
@@ -184,14 +194,12 @@ public interface BucketComparator {
184194
public abstract InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException;
185195

186196
/**
187-
* Build the result of this aggregation if it is at the "top level"
188-
* of the aggregation tree. If, instead, it is a sub-aggregation of
189-
* another aggregation then the aggregation that contains it will call
190-
* {@link #buildAggregations(long[])}.
197+
* Build the result of this aggregation if it is at the "top level" of the aggregation tree and save it. This should get called during post collection. If, instead, it is a sub-aggregation of another aggregation then the aggregation that contains it will call {@link #buildAggregations(long[])}.
191198
*/
192199
public final InternalAggregation buildTopLevel() throws IOException {
193200
assert parent() == null;
194-
return buildAggregations(new long[] { 0 })[0];
201+
this.internalAggregation.set(buildAggregations(new long[] { 0 })[0]);
202+
return internalAggregation.get();
195203
}
196204

197205
/**

server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java

+67
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.annotation.PublicApi;
1414
import org.opensearch.common.lucene.MinimumScoreCollector;
1515
import org.opensearch.search.internal.SearchContext;
16+
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
1617
import org.opensearch.search.profile.query.InternalProfileCollector;
1718

1819
import java.io.IOException;
@@ -22,6 +23,7 @@
2223
import java.util.Deque;
2324
import java.util.LinkedList;
2425
import java.util.List;
26+
import java.util.Objects;
2527
import java.util.Queue;
2628

2729
/**
@@ -72,6 +74,15 @@ public void processPostCollection(Collector collectorTree) throws IOException {
7274
}
7375
} else if (currentCollector instanceof BucketCollector) {
7476
((BucketCollector) currentCollector).postCollection();
77+
78+
// Perform build aggregation during post collection
79+
if (currentCollector instanceof Aggregator) {
80+
((Aggregator) currentCollector).buildTopLevel();
81+
} else if (currentCollector instanceof MultiBucketCollector) {
82+
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
83+
collectors.offer(innerCollector);
84+
}
85+
}
7586
}
7687
}
7788
}
@@ -106,4 +117,60 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
106117
}
107118
return aggregators;
108119
}
120+
121+
/**
122+
* Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
123+
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
124+
* during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
125+
* returning response to coordinator
126+
* @param collectors collection of aggregation collectors to reduce
127+
* @return list of unwrapped {@link InternalAggregation}
128+
*/
129+
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
130+
List<InternalAggregation> internalAggregations = new ArrayList<>();
131+
132+
final Deque<Collector> allCollectors = new LinkedList<>(collectors);
133+
while (!allCollectors.isEmpty()) {
134+
final Collector currentCollector = allCollectors.pop();
135+
if (currentCollector instanceof AggregatorBase) {
136+
internalAggregations.add(((AggregatorBase) currentCollector).getPostCollectionAggregation());
137+
} else if (currentCollector instanceof ProfilingAggregator) {
138+
internalAggregations.add(((ProfilingAggregator) currentCollector).getPostCollectionAggregation());
139+
} else if (currentCollector instanceof InternalProfileCollector) {
140+
if (((InternalProfileCollector) currentCollector).getCollector() instanceof AggregatorBase) {
141+
internalAggregations.add(
142+
((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getPostCollectionAggregation()
143+
);
144+
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
145+
allCollectors.addAll(
146+
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
147+
);
148+
}
149+
} else if (currentCollector instanceof MultiBucketCollector) {
150+
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
151+
}
152+
}
153+
154+
// Check that internalAggregations does not contain any null objects as that means postCollection was not called for a given
155+
// collector. This can happen as collect will not get called whenever there are no leaves on a shard. Since we build the
156+
// InternalAggregation in postCollection that will not get called in such cases either. Therefore we need to manually call it again
157+
// here to build empty InternalAggregation objects for this collector tree.
158+
if (internalAggregations.stream().anyMatch(Objects::isNull)) {
159+
allCollectors.addAll(collectors);
160+
while (!allCollectors.isEmpty()) {
161+
final Collector currentCollector = allCollectors.pop();
162+
if (currentCollector instanceof Aggregator) {
163+
((Aggregator) currentCollector).buildTopLevel();
164+
} else if (currentCollector instanceof MultiBucketCollector) {
165+
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
166+
allCollectors.offer(innerCollector);
167+
}
168+
}
169+
}
170+
// Iterate through collector tree again to get InternalAggregations object
171+
return toInternalAggregations(collectors);
172+
} else {
173+
return internalAggregations;
174+
}
175+
}
109176
}

server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java

+8-10
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,8 @@ public static class MultiBucketConsumer implements IntConsumer {
131131
private final int limit;
132132
private final CircuitBreaker breaker;
133133

134-
// aggregations execute in a single thread for both sequential
135-
// and concurrent search, so no atomic here
136-
private int count;
137-
138-
// will be updated by multiple threads in concurrent search
139-
// hence making it as LongAdder
134+
// count and callCount will both be updated by multiple threads in concurrent segment search hence using LongAdder
135+
private final LongAdder count;
140136
private final LongAdder callCount;
141137
private volatile boolean circuitBreakerTripped;
142138
private final int availProcessors;
@@ -145,6 +141,7 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
145141
this.limit = limit;
146142
this.breaker = breaker;
147143
callCount = new LongAdder();
144+
count = new LongAdder();
148145
availProcessors = Runtime.getRuntime().availableProcessors();
149146
}
150147

@@ -158,6 +155,7 @@ protected MultiBucketConsumer(
158155
) {
159156
this.limit = limit;
160157
this.breaker = breaker;
158+
this.count = new LongAdder();
161159
this.callCount = callCount;
162160
this.circuitBreakerTripped = circuitBreakerTripped;
163161
this.availProcessors = availProcessors;
@@ -166,8 +164,8 @@ protected MultiBucketConsumer(
166164
@Override
167165
public void accept(int value) {
168166
if (value != 0) {
169-
count += value;
170-
if (count > limit) {
167+
count.add(value);
168+
if (count.intValue() > limit) {
171169
throw new TooManyBucketsException(
172170
"Trying to create too many buckets. Must be less than or equal to: ["
173171
+ limit
@@ -205,11 +203,11 @@ public void accept(int value) {
205203
}
206204

207205
public void reset() {
208-
this.count = 0;
206+
this.count.reset();
209207
}
210208

211209
public int getCount() {
212-
return count;
210+
return count.intValue();
213211
}
214212

215213
public int getLimit() {

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.search.internal.SearchContext;
4141

4242
import java.io.IOException;
43+
import java.util.Arrays;
4344
import java.util.Map;
4445

4546
/**
@@ -80,7 +81,7 @@ protected Aggregator createInternal(
8081

8182
@Override
8283
protected boolean supportsConcurrentSegmentSearch() {
83-
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
84-
return false;
84+
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
85+
return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
8586
}
8687
}

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
9494
private final long valueCount;
9595
private final String fieldName;
9696
private Weight weight;
97-
private final GlobalOrdLookupFunction lookupGlobalOrd;
9897
protected final CollectionStrategy collectionStrategy;
9998
protected int segmentsWithSingleValuedOrds = 0;
10099
protected int segmentsWithMultiValuedOrds = 0;
@@ -129,11 +128,10 @@ public GlobalOrdinalsStringTermsAggregator(
129128
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
130129
this.valuesSource = valuesSource;
131130
final IndexReader reader = context.searcher().getIndexReader();
132-
final SortedSetDocValues values = reader.leaves().size() > 0
131+
final SortedSetDocValues values = !reader.leaves().isEmpty()
133132
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
134133
: DocValues.emptySortedSet();
135134
this.valueCount = values.getValueCount();
136-
this.lookupGlobalOrd = values::lookupOrd;
137135
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
138136
if (remapGlobalOrds) {
139137
this.collectionStrategy = new RemapGlobalOrds(cardinality);
@@ -885,7 +883,12 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
885883
}
886884

887885
StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
888-
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
886+
// Recreate DocValues as needed for concurrent segment search
887+
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
888+
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
889+
: DocValues.emptySortedSet();
890+
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));
891+
889892
StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
890893
result.bucketOrd = temp.bucketOrd;
891894
result.docCountError = 0;
@@ -1001,7 +1004,11 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
10011004
long subsetSize = subsetSize(owningBucketOrd);
10021005
return (spare, globalOrd, bucketOrd, docCount) -> {
10031006
spare.bucketOrd = bucketOrd;
1004-
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
1007+
// Recreate DocValues as needed for concurrent segment search
1008+
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
1009+
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
1010+
: DocValues.emptySortedSet();
1011+
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
10051012
spare.subsetDf = docCount;
10061013
spare.subsetSize = subsetSize;
10071014
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);

server/src/main/java/org/opensearch/search/internal/SearchContext.java

+6
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
113113
// should not be called when there is no aggregation collector
114114
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
115115
}
116+
117+
@Override
118+
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
119+
// should not be called when there is no aggregation collector
120+
throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
121+
}
116122
};
117123

118124
private final List<Releasable> releasables = new CopyOnWriteArrayList<>();

test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,12 @@
211211
"LuceneFixedGap",
212212
"LuceneVarGapFixedInterval",
213213
"LuceneVarGapDocFreqInterval",
214-
"Lucene50" })
214+
"Lucene50",
215+
"Lucene90",
216+
"Lucene94",
217+
"Lucene90",
218+
"Lucene95",
219+
"Lucene99" })
215220
@LuceneTestCase.SuppressReproduceLine
216221
public abstract class OpenSearchTestCase extends LuceneTestCase {
217222

0 commit comments

Comments
 (0)