Skip to content

Commit 75ad9a8

Browse files
committed
Introduce interefaces for nested aggregations
Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
1 parent ff81173 commit 75ad9a8

File tree

6 files changed

+186
-133
lines changed

6 files changed

+186
-133
lines changed

server/src/main/java/org/opensearch/search/aggregations/StarTreeBucketCollector.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,26 @@
88

99
package org.opensearch.search.aggregations;
1010

11+
import org.apache.lucene.util.FixedBitSet;
12+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
13+
1114
import java.io.IOException;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
18+
public abstract class StarTreeBucketCollector {
19+
20+
protected StarTreeValues starTreeValues;
21+
protected FixedBitSet matchingDocsBitSet;
22+
protected final List<StarTreeBucketCollector> subCollectors = new ArrayList<>();
23+
24+
public List<StarTreeBucketCollector> getSubCollectors() {
25+
return subCollectors;
26+
}
1227

13-
public abstract class StarTreeBucketCollector extends LeafBucketCollector {
28+
public FixedBitSet getMatchingDocsBitSet() {
29+
return matchingDocsBitSet;
30+
}
1431

1532
public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException;
1633
}

server/src/main/java/org/opensearch/search/aggregations/StarTreeLeafBucketCollectorBase.java

-59
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations;
10+
11+
import org.apache.lucene.index.LeafReaderContext;
12+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
13+
14+
import java.io.IOException;
15+
16+
public interface StarTreePreComputeCollector {
17+
StarTreeBucketCollector getStarTreeBucketCollector(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException;
18+
}

server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,15 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
130130
subCollector.collect(doc, bucketOrd);
131131
}
132132

133-
public final void collectStarTreeBucket(StarTreeBucketCollector subCollector, long docCount, long bucketOrd, int entryBit)
133+
public final void collectStarTreeBucket(StarTreeBucketCollector collector, long docCount, long bucketOrd, int entryBit)
134134
throws IOException {
135135
if (docCounts.increment(bucketOrd, docCount) == docCount) {
136136
multiBucketConsumer.accept(0);
137137
}
138-
subCollector.collectStarEntry(entryBit, bucketOrd);
138+
for (StarTreeBucketCollector subCollector : collector.getSubCollectors()) {
139+
subCollector.collectStarEntry(entryBit, bucketOrd);
140+
}
141+
139142
}
140143

141144
/**

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

+71-42
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.opensearch.common.Rounding;
4343
import org.opensearch.common.lease.Releasables;
4444
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
45-
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
4645
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
4746
import org.opensearch.search.DocValueFormat;
4847
import org.opensearch.search.aggregations.Aggregator;
@@ -53,6 +52,7 @@
5352
import org.opensearch.search.aggregations.LeafBucketCollector;
5453
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
5554
import org.opensearch.search.aggregations.StarTreeBucketCollector;
55+
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
5656
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
5757
import org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge;
5858
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
@@ -80,7 +80,7 @@
8080
*
8181
* @opensearch.internal
8282
*/
83-
class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
83+
class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator, StarTreePreComputeCollector {
8484
private final ValuesSource.Numeric valuesSource;
8585
private final DocValueFormat formatter;
8686
private final Rounding rounding;
@@ -182,47 +182,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
182182
SortedNumericDocValues values = valuesSource.longValues(ctx);
183183
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context);
184184
if (supportedStarTree != null) {
185-
StarTreeValues starTreeValues = getStarTreeValues(ctx, supportedStarTree);
186-
assert starTreeValues != null;
187-
188-
FixedBitSet matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month");
189-
190-
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
191-
.getDimensionValuesIterator("@timestamp_month");
192-
193-
SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
194-
.getMetricValuesIterator("startree1__doc_count_doc_count_metric");
195-
196-
int numBits = matchingDocsBitSet.length();
197-
198-
if (numBits > 0) {
199-
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
200-
? matchingDocsBitSet.nextSetBit(bit + 1)
201-
: DocIdSetIterator.NO_MORE_DOCS) {
202-
203-
if (!valuesIterator.advanceExact(bit)) {
204-
continue;
205-
}
206-
207-
for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
208-
long dimensionValue = valuesIterator.nextValue();
209-
210-
if (metricValuesIterator.advanceExact(bit)) {
211-
long metricValue = metricValuesIterator.nextValue();
212-
213-
long bucketOrd = bucketOrds.add(0, dimensionValue);
214-
if (bucketOrd < 0) {
215-
bucketOrd = -1 - bucketOrd;
216-
collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit);
217-
} else {
218-
grow(bucketOrd + 1);
219-
collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit);
220-
}
221-
}
222-
}
223-
}
185+
if (preCompute(ctx, supportedStarTree) == true) {
186+
return LeafBucketCollector.NO_OP_COLLECTOR;
224187
}
225-
throw new CollectionTerminatedException();
226188
}
227189

228190
return new LeafBucketCollectorBase(sub, values) {
@@ -255,6 +217,55 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
255217
};
256218
}
257219

220+
@Override
221+
public StarTreeBucketCollector getStarTreeBucketCollector(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
222+
223+
return new StarTreeBucketCollector() {
224+
225+
public void setSubCollectors() throws IOException {
226+
for (Aggregator aggregator : subAggregators) {
227+
((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree);
228+
}
229+
}
230+
231+
{
232+
this.starTreeValues = getStarTreeValues(ctx, starTree);
233+
this.matchingDocsBitSet = StarTreeFilter.getPredicateValueToFixedBitSetMap(starTreeValues, "@timestamp_month");
234+
this.setSubCollectors();
235+
}
236+
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
237+
.getDimensionValuesIterator("@timestamp_month");
238+
239+
SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
240+
.getMetricValuesIterator("startree1__doc_count_doc_count_metric");
241+
242+
@Override
243+
public void collectStarEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
244+
245+
if (!valuesIterator.advanceExact(starTreeEntry)) {
246+
return;
247+
}
248+
249+
for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
250+
long dimensionValue = valuesIterator.nextValue();
251+
252+
if (metricValuesIterator.advanceExact(starTreeEntry)) {
253+
long metricValue = metricValuesIterator.nextValue();
254+
255+
long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
256+
if (bucketOrd < 0) {
257+
bucketOrd = -1 - bucketOrd;
258+
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
259+
} else {
260+
grow(bucketOrd + 1);
261+
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
262+
}
263+
}
264+
}
265+
}
266+
};
267+
}
268+
258269
@Override
259270
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
260271
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
@@ -322,4 +333,22 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
322333
return 1.0;
323334
}
324335
}
336+
337+
public boolean preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
338+
// TODO: validate query shape - retrun false if cannot be resolved via star-tree
339+
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, starTree);
340+
341+
FixedBitSet matchingDocsBitSet = starTreeBucketCollector.getMatchingDocsBitSet();
342+
343+
int numBits = matchingDocsBitSet.length();
344+
345+
if (numBits > 0) {
346+
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
347+
? matchingDocsBitSet.nextSetBit(bit + 1)
348+
: DocIdSetIterator.NO_MORE_DOCS) {
349+
starTreeBucketCollector.collectStarEntry(bit, 0);
350+
}
351+
}
352+
return true;
353+
}
325354
}

0 commit comments

Comments
 (0)