Skip to content

Commit 2f3f541

Browse files
author
Jay Deng
committed
Parallelize build agg
1 parent 1f5df54 commit 2f3f541

File tree

11 files changed

+159
-29
lines changed

11 files changed

+159
-29
lines changed

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1212

13+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
1314
import org.opensearch.action.search.SearchResponse;
1415
import org.opensearch.cluster.health.ClusterHealthStatus;
1516
import org.opensearch.cluster.metadata.IndexMetadata;
@@ -26,6 +27,7 @@
2627
import java.util.Collection;
2728
import java.util.List;
2829

30+
import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
2931
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
3032
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3133
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
@@ -50,26 +52,31 @@ public void setupSuiteScopeCluster() throws Exception {
5052
assertAcked(
5153
prepareCreate(
5254
"idx",
53-
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
55+
Settings.builder()
56+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
57+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
58+
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
5459
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
5560
);
5661
waitForRelocation(ClusterHealthStatus.GREEN);
5762

58-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
59-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
60-
refresh("idx");
61-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
62-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
63-
refresh("idx");
64-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
65-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
66-
refresh("idx");
67-
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
68-
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
69-
refresh("idx");
63+
indexRandom(
64+
true,
65+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
66+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
67+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
68+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
69+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
70+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
71+
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
72+
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
73+
);
7074

7175
waitForRelocation(ClusterHealthStatus.GREEN);
7276
refresh();
77+
78+
IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx").get();
79+
System.out.println("Segments: " + segmentResponse.getIndices().get("idx").getShards().get(0).getShards()[0].getSegments().size());
7380
}
7481

7582
public void testCompositeAggWithNoSubAgg() {

server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java

+34-10
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import org.opensearch.search.query.ReduceableSearchResult;
1616

1717
import java.io.IOException;
18-
import java.util.ArrayList;
1918
import java.util.Collection;
2019
import java.util.List;
20+
import java.util.Objects;
2121

2222
/**
2323
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
@@ -54,19 +54,43 @@ public String getCollectorReason() {
5454

5555
public abstract String getCollectorName();
5656

57+
// @Override
58+
// public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
59+
// final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
60+
// final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
61+
// context.aggregations().resetBucketMultiConsumer();
62+
// for (Aggregator aggregator : aggregators) {
63+
// try {
64+
// // post collection is called in ContextIndexSearcher after search on leaves are completed
65+
// internals.add(aggregator.buildTopLevel());
66+
// } catch (IOException e) {
67+
// throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
68+
// }
69+
// }
70+
//
71+
// final InternalAggregations internalAggregations = InternalAggregations.from(internals);
72+
// return buildAggregationResult(internalAggregations);
73+
// }
74+
5775
@Override
5876
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
59-
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
60-
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
61-
context.aggregations().resetBucketMultiConsumer();
62-
for (Aggregator aggregator : aggregators) {
63-
try {
64-
// post collection is called in ContextIndexSearcher after search on leaves are completed
65-
internals.add(aggregator.buildTopLevel());
66-
} catch (IOException e) {
67-
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
77+
List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
78+
79+
// collect does not get called when there are no leaves on a shard. Since the InternalAggregation is built in postCollection,
80+
// and postCollection is not called in such cases either, we need to build it manually here so that empty InternalAggregation
81+
// objects exist for this collector tree.
82+
if (internals.stream().allMatch(Objects::isNull)) {
83+
List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
84+
for (Aggregator a : aggregators) {
85+
// c could be a MultiBucketCollector
86+
if (a instanceof AggregatorBase) {
87+
((AggregatorBase) a).buildAndSetInternalAggregation();
88+
}
6889
}
90+
internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
6991
}
92+
assert !internals.stream().allMatch(Objects::isNull);
93+
context.aggregations().resetBucketMultiConsumer(); // Not sure if this is thread safe
7094

7195
final InternalAggregations internalAggregations = InternalAggregations.from(internals);
7296
return buildAggregationResult(internalAggregations);

server/src/main/java/org/opensearch/search/aggregations/Aggregator.java

+11
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
package org.opensearch.search.aggregations;
3434

3535
import org.opensearch.OpenSearchParseException;
36+
import org.opensearch.common.SetOnce;
3637
import org.opensearch.common.annotation.PublicApi;
3738
import org.opensearch.common.lease.Releasable;
3839
import org.opensearch.core.ParseField;
@@ -61,6 +62,8 @@
6162
@PublicApi(since = "1.0.0")
6263
public abstract class Aggregator extends BucketCollector implements Releasable {
6364

65+
private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();
66+
6467
/**
6568
* Parses the aggregation request and creates the appropriate aggregator factory for it.
6669
*
@@ -83,6 +86,14 @@ public interface Parser {
8386
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
8487
}
8588

89+
public void setInternalAggregation(InternalAggregation internalAggregation) {
90+
this.internalAggregation.set(internalAggregation);
91+
}
92+
93+
public InternalAggregation getInternalAggregation() {
94+
return internalAggregation.get();
95+
}
96+
8697
/**
8798
* Return the name of this aggregator.
8899
*/

server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.lucene.index.LeafReaderContext;
3535
import org.apache.lucene.search.MatchAllDocsQuery;
3636
import org.apache.lucene.search.ScoreMode;
37+
import org.opensearch.common.SetOnce;
3738
import org.opensearch.core.common.breaker.CircuitBreaker;
3839
import org.opensearch.core.common.breaker.CircuitBreakingException;
3940
import org.opensearch.core.indices.breaker.CircuitBreakerService;
@@ -72,6 +73,8 @@ public abstract class AggregatorBase extends Aggregator {
7273
private final CircuitBreakerService breakerService;
7374
private long requestBytesUsed;
7475

76+
private final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();
77+
7578
/**
7679
* Constructs a new Aggregator.
7780
*
@@ -279,6 +282,13 @@ public void postCollection() throws IOException {
279282
collectableSubAggregators.postCollection();
280283
}
281284

285+
public void buildAndSetInternalAggregation() throws IOException {
286+
// Only call buildTopLevel for top level aggregators. This will subsequently build aggregations for child aggs.
287+
if (parent == null) {
288+
internalAggregation.set(buildTopLevel());
289+
}
290+
}
291+
282292
/** Called upon release of the aggregator. */
283293
@Override
284294
public void close() {
@@ -305,6 +315,10 @@ protected final InternalAggregations buildEmptySubAggregations() {
305315
return InternalAggregations.from(aggs);
306316
}
307317

318+
public InternalAggregation getInternalAggregation() {
319+
return internalAggregation.get();
320+
}
321+
308322
@Override
309323
public String toString() {
310324
return name;

server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java

+43
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.common.annotation.PublicApi;
1414
import org.opensearch.common.lucene.MinimumScoreCollector;
1515
import org.opensearch.search.internal.SearchContext;
16+
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
1617
import org.opensearch.search.profile.query.InternalProfileCollector;
1718

1819
import java.io.IOException;
@@ -63,6 +64,7 @@ public void processPostCollection(Collector collectorTree) throws IOException {
6364
while (!collectors.isEmpty()) {
6465
Collector currentCollector = collectors.poll();
6566
if (currentCollector instanceof InternalProfileCollector) {
67+
// Profile collector should be the top level one so we should be able to call buildAggregation on it here
6668
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
6769
} else if (currentCollector instanceof MinimumScoreCollector) {
6870
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
@@ -72,6 +74,19 @@ public void processPostCollection(Collector collectorTree) throws IOException {
7274
}
7375
} else if (currentCollector instanceof BucketCollector) {
7476
((BucketCollector) currentCollector).postCollection();
77+
78+
// Call buildTopLevel here -- need to set the InternalAggregation in Aggregator because profiler extends that
79+
if (currentCollector instanceof AggregatorBase) {
80+
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
81+
} else if (currentCollector instanceof ProfilingAggregator) {
82+
((ProfilingAggregator) currentCollector).setInternalAggregation(
83+
((ProfilingAggregator) currentCollector).buildTopLevel()
84+
);
85+
} else if (currentCollector instanceof MultiBucketCollector) {
86+
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
87+
collectors.offer(innerCollector);
88+
}
89+
}
7590
}
7691
}
7792
}
@@ -106,4 +121,32 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
106121
}
107122
return aggregators;
108123
}
124+
125+
// The problem is here -- the InternalProfileCollector branch tests for Aggregator but casts to AggregatorBase; need to check for Aggregator class
126+
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
127+
List<InternalAggregation> internalAggregations = new ArrayList<>();
128+
129+
final Deque<Collector> allCollectors = new LinkedList<>(collectors);
130+
while (!allCollectors.isEmpty()) {
131+
final Collector currentCollector = allCollectors.pop();
132+
if (currentCollector instanceof AggregatorBase) {
133+
internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation());
134+
} else if (currentCollector instanceof ProfilingAggregator) {
135+
internalAggregations.add(((ProfilingAggregator) currentCollector).getInternalAggregation());
136+
} else if (currentCollector instanceof InternalProfileCollector) {
137+
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
138+
internalAggregations.add(
139+
((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation()
140+
);
141+
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
142+
allCollectors.addAll(
143+
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
144+
);
145+
}
146+
} else if (currentCollector instanceof MultiBucketCollector) {
147+
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
148+
}
149+
}
150+
return internalAggregations;
151+
}
109152
}

server/src/main/java/org/opensearch/search/aggregations/GlobalAggCollectorManagerWithSingleCollector.java

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public Collector newCollector() throws IOException {
4242
@Override
4343
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
4444
assert collectors.isEmpty() : "Reduce on GlobalAggregationCollectorManagerWithCollector called with non-empty collectors";
45+
// Something needs to be done here
4546
return super.reduce(List.of(collector));
4647
}
4748

server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java

+2
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ public Collector newCollector() throws IOException {
3939
return collector;
4040
}
4141

42+
// Can we add post collection logic in here?
4243
@Override
4344
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
4445
assert collectors.isEmpty() : "Reduce on NonGlobalAggregationCollectorManagerWithCollector called with non-empty collectors";
46+
4547
return super.reduce(List.of(collector));
4648
}
4749

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,6 @@ protected Aggregator createInternal(
8181
@Override
8282
protected boolean supportsConcurrentSegmentSearch() {
8383
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
84-
return false;
84+
return true;
8585
}
8686
}

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ final class CompositeAggregator extends BucketsAggregator {
107107

108108
private final CompositeValuesSourceConfig[] sourceConfigs;
109109
private final SingleDimensionValuesSource<?>[] sources;
110-
private final CompositeValuesCollectorQueue queue;
110+
private CompositeValuesCollectorQueue queue;
111111

112112
private final List<Entry> entries = new ArrayList<>();
113113
private LeafReaderContext currentLeaf;
@@ -236,6 +236,16 @@ protected void doPreCollection() throws IOException {
236236
@Override
237237
protected void doPostCollection() throws IOException {
238238
finishLeaf();
239+
// Re-create the ValuesSource on the search thread for concurrent search
240+
// for (int i = 0; i < sourceConfigs.length; i++) {
241+
// this.sources[i] = sourceConfigs[i].createValuesSource(
242+
// context.bigArrays(),
243+
// context.searcher().getIndexReader(),
244+
// size,
245+
// this::addRequestCircuitBreakerBytes
246+
// );
247+
// }
248+
// this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
239249
}
240250

241251
@Override

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
9999
protected int segmentsWithSingleValuedOrds = 0;
100100
protected int segmentsWithMultiValuedOrds = 0;
101101

102+
protected final IndexReader reader;
103+
102104
/**
103105
* Lookup global ordinals
104106
*
@@ -128,7 +130,9 @@ public GlobalOrdinalsStringTermsAggregator(
128130
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
129131
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
130132
this.valuesSource = valuesSource;
131-
final IndexReader reader = context.searcher().getIndexReader();
133+
reader = context.searcher().getIndexReader();
134+
// valuesSource is shared across aggregators and the DocValues here are created when the collector is created.
135+
// Need to delay this creation to when it's actually used in the index_search thread.
132136
final SortedSetDocValues values = reader.leaves().size() > 0
133137
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
134138
: DocValues.emptySortedSet();
@@ -885,7 +889,12 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
885889
}
886890

887891
StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
888-
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
892+
// BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
893+
SortedSetDocValues values = reader.leaves().size() > 0
894+
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
895+
: DocValues.emptySortedSet();
896+
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));
897+
889898
StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
890899
result.bucketOrd = temp.bucketOrd;
891900
result.docCountError = 0;
@@ -1001,7 +1010,11 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
10011010
long subsetSize = subsetSize(owningBucketOrd);
10021011
return (spare, globalOrd, bucketOrd, docCount) -> {
10031012
spare.bucketOrd = bucketOrd;
1004-
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
1013+
// oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
1014+
SortedSetDocValues values = reader.leaves().size() > 0
1015+
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
1016+
: DocValues.emptySortedSet();
1017+
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
10051018
spare.subsetDf = docCount;
10061019
spare.subsetSize = subsetSize;
10071020
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);

test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,12 @@
211211
"LuceneFixedGap",
212212
"LuceneVarGapFixedInterval",
213213
"LuceneVarGapDocFreqInterval",
214-
"Lucene50" })
214+
"Lucene50",
215+
"Lucene90",
216+
"Lucene94",
217+
"Lucene90",
218+
"Lucene95",
219+
"Lucene99" })
215220
@LuceneTestCase.SuppressReproduceLine
216221
public abstract class OpenSearchTestCase extends LuceneTestCase {
217222

0 commit comments

Comments
 (0)