Skip to content

Commit ecf87ba

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Perform buildAggregation concurrently and support Composite Aggregations
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 1f5df54 commit ecf87ba

File tree

11 files changed

+151
-37
lines changed

11 files changed

+151
-37
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
117117
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
118118
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
119119
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
120+
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
120121

121122
### Dependencies
122123
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1212

13+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
1314
import org.opensearch.action.search.SearchResponse;
1415
import org.opensearch.cluster.health.ClusterHealthStatus;
1516
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -26,6 +27,7 @@
2627
import java.util.Collection;
2728
import java.util.List;
2829

30+
import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
2931
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
3032
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3133
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
@@ -50,26 +52,31 @@ public void setupSuiteScopeCluster() throws Exception {
5052
assertAcked(
5153
prepareCreate(
5254
"idx",
53-
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
55+
Settings.builder()
56+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
57+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
58+
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
5459
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
5560
);
5661
waitForRelocation(ClusterHealthStatus.GREEN);
5762

58-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
59-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
60-
refresh("idx");
61-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
62-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
63-
refresh("idx");
64-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
65-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
66-
refresh("idx");
67-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
68-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
69-
refresh("idx");
63+
indexRandom(
64+
true,
65+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
66+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
67+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
68+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
69+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
70+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
71+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
72+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
73+
);
7074

7175
waitForRelocation(ClusterHealthStatus.GREEN);
7276
refresh();
77+
78+
IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx").get();
79+
System.out.println("Segments: " + segmentResponse.getIndices().get("idx").getShards().get(0).getShards()[0].getSegments().size());
7380
}
7481

7582
public void testCompositeAggWithNoSubAgg() {

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java

+3-11
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import org.opensearch.search.query.ReduceableSearchResult;
1616

1717
import java.io.IOException;
18-
import java.util.ArrayList;
1918
import java.util.Collection;
2019
import java.util.List;
20+
import java.util.Objects;
2121

2222
/**
2323
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
@@ -56,17 +56,9 @@ public String getCollectorReason() {
5656

5757
@Override
5858
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
59-
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
60-
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
59+
List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
60+
assert !internals.stream().anyMatch(Objects::isNull);
6161
context.aggregations().resetBucketMultiConsumer();
62-
for (Aggregator aggregator : aggregators) {
63-
try {
64-
// post collection is called in ContextIndexSearcher after search on leaves are completed
65-
internals.add(aggregator.buildTopLevel());
66-
} catch (IOException e) {
67-
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
68-
}
69-
}
7062

7163
final InternalAggregations internalAggregations = InternalAggregations.from(internals);
7264
return buildAggregationResult(internalAggregations);

server/src/main/java/org/opensearch/search/aggregations/Aggregator.java

+11
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.search.aggregations;
3434

3535
import org.opensearch.OpenSearchParseException;
36+
import org.opensearch.common.SetOnce;
3637
import org.opensearch.common.annotation.PublicApi;
3738
import org.opensearch.common.lease.Releasable;
3839
import org.opensearch.core.ParseField;
@@ -61,6 +62,8 @@
6162
@PublicApi(since = "1.0.0")
6263
public abstract class Aggregator extends BucketCollector implements Releasable {
6364

65+
final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();
66+
6467
/**
6568
* Parses the aggregation request and creates the appropriate aggregator factory for it.
6669
*
@@ -83,6 +86,14 @@ public interface Parser {
8386
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
8487
}
8588

89+
public void setInternalAggregation(InternalAggregation internalAggregation) {
90+
this.internalAggregation.set(internalAggregation);
91+
}
92+
93+
public InternalAggregation getInternalAggregation() {
94+
return internalAggregation.get();
95+
}
96+
8697
/**
8798
* Return the name of this aggregator.
8899
*/

server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java

+7
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,13 @@ public void postCollection() throws IOException {
279279
collectableSubAggregators.postCollection();
280280
}
281281

282+
public void buildAndSetInternalAggregation() throws IOException {
283+
// Only call buildTopLevel for top level aggregators. This will subsequently build aggregations for child aggs.
284+
if (parent == null) {
285+
setInternalAggregation(buildTopLevel());
286+
}
287+
}
288+
282289
/** Called upon release of the aggregator. */
283290
@Override
284291
public void close() {

server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java

+76
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.annotation.PublicApi;
1414
import org.opensearch.common.lucene.MinimumScoreCollector;
1515
import org.opensearch.search.internal.SearchContext;
16+
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
1617
import org.opensearch.search.profile.query.InternalProfileCollector;
1718

1819
import java.io.IOException;
@@ -22,6 +23,7 @@
2223
import java.util.Deque;
2324
import java.util.LinkedList;
2425
import java.util.List;
26+
import java.util.Objects;
2527
import java.util.Queue;
2628

2729
/**
@@ -63,6 +65,7 @@ public void processPostCollection(Collector collectorTree) throws IOException {
6365
while (!collectors.isEmpty()) {
6466
Collector currentCollector = collectors.poll();
6567
if (currentCollector instanceof InternalProfileCollector) {
68+
// Profile collector should be the top level one so we should be able to call buildAggregation on it here
6669
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
6770
} else if (currentCollector instanceof MinimumScoreCollector) {
6871
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
@@ -72,6 +75,19 @@ public void processPostCollection(Collector collectorTree) throws IOException {
7275
}
7376
} else if (currentCollector instanceof BucketCollector) {
7477
((BucketCollector) currentCollector).postCollection();
78+
79+
// Perform build aggregation during post collection
80+
if (currentCollector instanceof AggregatorBase) {
81+
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
82+
} else if (currentCollector instanceof ProfilingAggregator) {
83+
((ProfilingAggregator) currentCollector).setInternalAggregation(
84+
((ProfilingAggregator) currentCollector).buildTopLevel()
85+
);
86+
} else if (currentCollector instanceof MultiBucketCollector) {
87+
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
88+
collectors.offer(innerCollector);
89+
}
90+
}
7591
}
7692
}
7793
}
@@ -106,4 +122,64 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
106122
}
107123
return aggregators;
108124
}
125+
126+
/**
127+
* Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
128+
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
129+
* during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
130+
* returning response to coordinator
131+
* @param collectors collection of aggregation collectors to reduce
132+
* @return list of unwrapped {@link InternalAggregation}
133+
*/
134+
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
135+
List<InternalAggregation> internalAggregations = new ArrayList<>();
136+
137+
final Deque<Collector> allCollectors = new LinkedList<>(collectors);
138+
while (!allCollectors.isEmpty()) {
139+
final Collector currentCollector = allCollectors.pop();
140+
if (currentCollector instanceof AggregatorBase) {
141+
internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation());
142+
} else if (currentCollector instanceof ProfilingAggregator) {
143+
internalAggregations.add(((ProfilingAggregator) currentCollector).getInternalAggregation());
144+
} else if (currentCollector instanceof InternalProfileCollector) {
145+
if (((InternalProfileCollector) currentCollector).getCollector() instanceof AggregatorBase) {
146+
internalAggregations.add(
147+
((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation()
148+
);
149+
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
150+
allCollectors.addAll(
151+
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
152+
);
153+
}
154+
} else if (currentCollector instanceof MultiBucketCollector) {
155+
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
156+
}
157+
}
158+
159+
// Check that internalAggregations does not contain any null objects as that means postCollection was not called for a given
160+
// collector. This can happen as collect will not get called whenever there are no leaves on a shard. Since we build the
161+
// InternalAggregation in postCollection that will not get called in such cases either. Therefore we need to manually call it again
162+
// here to build empty InternalAggregation objects for this collector tree.
163+
if (internalAggregations.stream().anyMatch(Objects::isNull)) {
164+
allCollectors.addAll(collectors);
165+
while (!allCollectors.isEmpty()) {
166+
final Collector currentCollector = allCollectors.pop();
167+
if (currentCollector instanceof AggregatorBase) {
168+
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
169+
} else if (currentCollector instanceof ProfilingAggregator) {
170+
((ProfilingAggregator) currentCollector).setInternalAggregation(
171+
((ProfilingAggregator) currentCollector).buildTopLevel()
172+
);
173+
} else if (currentCollector instanceof MultiBucketCollector) {
174+
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
175+
allCollectors.offer(innerCollector);
176+
}
177+
}
178+
}
179+
// Iterate through collector tree again to get InternalAggregations object
180+
return toInternalAggregations(collectors);
181+
} else {
182+
return internalAggregations;
183+
}
184+
}
109185
}

server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public static class MultiBucketConsumer implements IntConsumer {
133133

134134
// aggregations execute in a single thread for both sequential
135135
// and concurrent search, so no atomic here
136-
private int count;
136+
private final LongAdder count;
137137

138138
// will be updated by multiple threads in concurrent search
139139
// hence making it as LongAdder
@@ -145,6 +145,7 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
145145
this.limit = limit;
146146
this.breaker = breaker;
147147
callCount = new LongAdder();
148+
count = new LongAdder();
148149
availProcessors = Runtime.getRuntime().availableProcessors();
149150
}
150151

@@ -158,6 +159,7 @@ protected MultiBucketConsumer(
158159
) {
159160
this.limit = limit;
160161
this.breaker = breaker;
162+
this.count = new LongAdder();
161163
this.callCount = callCount;
162164
this.circuitBreakerTripped = circuitBreakerTripped;
163165
this.availProcessors = availProcessors;
@@ -166,8 +168,9 @@ protected MultiBucketConsumer(
166168
@Override
167169
public void accept(int value) {
168170
if (value != 0) {
169-
count += value;
170-
if (count > limit) {
171+
count.add(value);
172+
;
173+
if (count.intValue() > limit) {
171174
throw new TooManyBucketsException(
172175
"Trying to create too many buckets. Must be less than or equal to: ["
173176
+ limit
@@ -205,11 +208,11 @@ public void accept(int value) {
205208
}
206209

207210
public void reset() {
208-
this.count = 0;
211+
this.count.reset();
209212
}
210213

211214
public int getCount() {
212-
return count;
215+
return count.intValue();
213216
}
214217

215218
public int getLimit() {

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ protected Aggregator createInternal(
8080

8181
@Override
8282
protected boolean supportsConcurrentSegmentSearch() {
83-
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
84-
return false;
83+
return true;
8584
}
8685
}

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
9494
private final long valueCount;
9595
private final String fieldName;
9696
private Weight weight;
97-
private final GlobalOrdLookupFunction lookupGlobalOrd;
9897
protected final CollectionStrategy collectionStrategy;
9998
protected int segmentsWithSingleValuedOrds = 0;
10099
protected int segmentsWithMultiValuedOrds = 0;
@@ -129,11 +128,10 @@ public GlobalOrdinalsStringTermsAggregator(
129128
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
130129
this.valuesSource = valuesSource;
131130
final IndexReader reader = context.searcher().getIndexReader();
132-
final SortedSetDocValues values = reader.leaves().size() > 0
131+
final SortedSetDocValues values = !reader.leaves().isEmpty()
133132
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
134133
: DocValues.emptySortedSet();
135134
this.valueCount = values.getValueCount();
136-
this.lookupGlobalOrd = values::lookupOrd;
137135
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
138136
if (remapGlobalOrds) {
139137
this.collectionStrategy = new RemapGlobalOrds(cardinality);
@@ -885,7 +883,12 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
885883
}
886884

887885
StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
888-
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
886+
// Recreate DocValues as needed for concurrent segment search
887+
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
888+
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
889+
: DocValues.emptySortedSet();
890+
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));
891+
889892
StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
890893
result.bucketOrd = temp.bucketOrd;
891894
result.docCountError = 0;
@@ -1001,7 +1004,11 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
10011004
long subsetSize = subsetSize(owningBucketOrd);
10021005
return (spare, globalOrd, bucketOrd, docCount) -> {
10031006
spare.bucketOrd = bucketOrd;
1004-
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
1007+
// Recreate DocValues as needed for concurrent segment search
1008+
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
1009+
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
1010+
: DocValues.emptySortedSet();
1011+
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
10051012
spare.subsetDf = docCount;
10061013
spare.subsetSize = subsetSize;
10071014
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);

server/src/main/java/org/opensearch/search/internal/SearchContext.java

+6
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
113113
// should not be called when there is no aggregation collector
114114
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
115115
}
116+
117+
@Override
118+
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
119+
// should not be called when there is no aggregation collector
120+
throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
121+
}
116122
};
117123

118124
private final List<Releasable> releasables = new CopyOnWriteArrayList<>();

0 commit comments

Comments
 (0)