Skip to content

Commit b3607a6

Browse files
sandeshkr419Mayank Sharma
authored and
Mayank Sharma
committed
[Star Tree] [Search] Keyword & Numeric Terms Aggregation (opensearch-project#17165)
--------- Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
1 parent f1286fa commit b3607a6

File tree

12 files changed

+986
-47
lines changed

12 files changed

+986
-47
lines changed

CHANGELOG-3.0.md

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
2323
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
2424
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder ([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
25+
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))
2526

2627
### Dependencies
2728
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))

server/src/main/java/org/opensearch/search/aggregations/bucket/BucketsAggregator.java

+6
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
136136
*/
137137
public final void collectStarTreeBucket(StarTreeBucketCollector collector, long docCount, long bucketOrd, int entryBit)
138138
throws IOException {
139+
if (bucketOrd < 0) {
140+
bucketOrd = -1 - bucketOrd;
141+
} else {
142+
grow(bucketOrd + 1);
143+
}
144+
139145
if (docCounts.increment(bucketOrd, docCount) == docCount) {
140146
multiBucketConsumer.accept(0);
141147
}

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

+8-42
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,14 @@
3333

3434
import org.apache.lucene.index.LeafReaderContext;
3535
import org.apache.lucene.index.SortedNumericDocValues;
36-
import org.apache.lucene.search.DocIdSetIterator;
3736
import org.apache.lucene.search.ScoreMode;
3837
import org.apache.lucene.util.CollectionUtil;
39-
import org.apache.lucene.util.FixedBitSet;
4038
import org.opensearch.common.Nullable;
4139
import org.opensearch.common.Rounding;
4240
import org.opensearch.common.lease.Releasables;
4341
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
4442
import org.opensearch.index.compositeindex.datacube.DateDimension;
45-
import org.opensearch.index.compositeindex.datacube.MetricStat;
4643
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
47-
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
4844
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
4945
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
5046
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
@@ -192,9 +188,9 @@ public ScoreMode scoreMode() {
192188
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
193189
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context.getQueryShardContext());
194190
if (supportedStarTree != null) {
195-
if (preComputeWithStarTree(ctx, supportedStarTree) == true) {
196-
return true;
197-
}
191+
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
192+
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
193+
return true;
198194
}
199195
return filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
200196
}
@@ -268,6 +264,10 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
268264
) throws IOException {
269265
assert parentCollector == null;
270266
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
267+
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
268+
.getDimensionValuesIterator(starTreeDateDimension);
269+
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);
270+
271271
return new StarTreeBucketCollector(
272272
starTreeValues,
273273
StarTreeTraversalUtil.getStarTreeResult(
@@ -287,17 +287,6 @@ public void setSubCollectors() throws IOException {
287287
}
288288
}
289289

290-
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
291-
.getDimensionValuesIterator(starTreeDateDimension);
292-
293-
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
294-
starTree.getField(),
295-
"_doc_count",
296-
MetricStat.DOC_COUNT.getTypeName()
297-
);
298-
SortedNumericStarTreeValuesIterator docCountsIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
299-
.getMetricValuesIterator(metricName);
300-
301290
@Override
302291
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
303292
if (!valuesIterator.advanceExact(starTreeEntry)) {
@@ -311,15 +300,8 @@ public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws
311300

312301
if (docCountsIterator.advanceExact(starTreeEntry)) {
313302
long metricValue = docCountsIterator.nextValue();
314-
315303
long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
316-
if (bucketOrd < 0) {
317-
bucketOrd = -1 - bucketOrd;
318-
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
319-
} else {
320-
grow(bucketOrd + 1);
321-
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
322-
}
304+
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
323305
}
324306
}
325307
}
@@ -393,20 +375,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
393375
return 1.0;
394376
}
395377
}
396-
397-
private boolean preComputeWithStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
398-
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, starTree, null);
399-
FixedBitSet matchingDocsBitSet = starTreeBucketCollector.getMatchingDocsBitSet();
400-
401-
int numBits = matchingDocsBitSet.length();
402-
403-
if (numBits > 0) {
404-
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
405-
? matchingDocsBitSet.nextSetBit(bit + 1)
406-
: DocIdSetIterator.NO_MORE_DOCS) {
407-
starTreeBucketCollector.collectStarTreeEntry(bit, 0);
408-
}
409-
}
410-
return true;
411-
}
412378
}

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

+78-3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
import org.opensearch.common.util.LongHash;
5252
import org.opensearch.core.common.io.stream.StreamOutput;
5353
import org.opensearch.core.xcontent.XContentBuilder;
54+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
55+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
56+
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
57+
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
5458
import org.opensearch.index.mapper.DocCountFieldMapper;
5559
import org.opensearch.search.DocValueFormat;
5660
import org.opensearch.search.aggregations.AggregationExecutionException;
@@ -63,14 +67,20 @@
6367
import org.opensearch.search.aggregations.InternalOrder;
6468
import org.opensearch.search.aggregations.LeafBucketCollector;
6569
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
70+
import org.opensearch.search.aggregations.StarTreeBucketCollector;
71+
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
6672
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
6773
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
6874
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
6975
import org.opensearch.search.aggregations.support.ValuesSource;
7076
import org.opensearch.search.internal.SearchContext;
77+
import org.opensearch.search.startree.StarTreeQueryHelper;
78+
import org.opensearch.search.startree.StarTreeTraversalUtil;
79+
import org.opensearch.search.startree.filter.DimensionFilter;
7180

7281
import java.io.IOException;
7382
import java.util.Arrays;
83+
import java.util.List;
7484
import java.util.Map;
7585
import java.util.function.BiConsumer;
7686
import java.util.function.Function;
@@ -85,18 +95,19 @@
8595
*
8696
* @opensearch.internal
8797
*/
88-
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
98+
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator implements StarTreePreComputeCollector {
8999
protected final ResultStrategy<?, ?, ?> resultStrategy;
90100
protected final ValuesSource.Bytes.WithOrdinals valuesSource;
91101

92102
private final LongPredicate acceptedGlobalOrdinals;
93103
private final long valueCount;
94-
private final String fieldName;
104+
protected final String fieldName;
95105
private Weight weight;
96106
protected final CollectionStrategy collectionStrategy;
97107
private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
98108
protected int segmentsWithSingleValuedOrds = 0;
99109
protected int segmentsWithMultiValuedOrds = 0;
110+
LongUnaryOperator globalOperator;
100111

101112
/**
102113
* Lookup global ordinals
@@ -219,6 +230,9 @@ boolean tryCollectFromTermFrequencies(LeafReaderContext ctx, SortedSetDocValues
219230
@Override
220231
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
221232
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
233+
if (tryStarTreePrecompute(ctx) == true) {
234+
return true;
235+
}
222236
if (collectionStrategy instanceof DenseGlobalOrds
223237
&& this.resultStrategy instanceof StandardTermsResults
224238
&& subAggregators.length == 0) {
@@ -231,6 +245,17 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
231245
return false;
232246
}
233247

248+
protected boolean tryStarTreePrecompute(LeafReaderContext ctx) throws IOException {
249+
CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
250+
if (supportedStarTree != null) {
251+
globalOperator = valuesSource.globalOrdinalsMapping(ctx);
252+
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
253+
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
254+
return true;
255+
}
256+
return false;
257+
}
258+
234259
@Override
235260
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
236261
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
@@ -307,6 +332,56 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
307332
});
308333
}
309334

335+
public StarTreeBucketCollector getStarTreeBucketCollector(
336+
LeafReaderContext ctx,
337+
CompositeIndexFieldInfo starTree,
338+
StarTreeBucketCollector parent
339+
) throws IOException {
340+
assert parent == null;
341+
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
342+
SortedSetStarTreeValuesIterator valuesIterator = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator(
343+
fieldName
344+
);
345+
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);
346+
347+
return new StarTreeBucketCollector(
348+
starTreeValues,
349+
StarTreeTraversalUtil.getStarTreeResult(
350+
starTreeValues,
351+
StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
352+
context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
353+
fieldName,
354+
List.of(DimensionFilter.MATCH_ALL_DEFAULT)
355+
),
356+
context
357+
)
358+
) {
359+
@Override
360+
public void setSubCollectors() throws IOException {
361+
for (Aggregator aggregator : subAggregators) {
362+
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
363+
}
364+
}
365+
366+
@Override
367+
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
368+
if (valuesIterator.advanceExact(starTreeEntry) == false) {
369+
return;
370+
}
371+
for (int i = 0, count = valuesIterator.docValueCount(); i < count; i++) {
372+
long dimensionValue = valuesIterator.value();
373+
long ord = globalOperator.applyAsLong(dimensionValue);
374+
375+
if (docCountsIterator.advanceExact(starTreeEntry)) {
376+
long metricValue = docCountsIterator.nextValue();
377+
long bucketOrd = collectionStrategy.globalOrdToBucketOrd(0, ord);
378+
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
379+
}
380+
}
381+
}
382+
};
383+
}
384+
310385
@Override
311386
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
312387
return resultStrategy.buildAggregations(owningBucketOrds);
@@ -444,7 +519,7 @@ protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws
444519
(ord, docCount) -> incrementBucketDocCount(mapping.applyAsLong(ord), docCount)
445520
);
446521
}
447-
return false;
522+
return tryStarTreePrecompute(ctx);
448523
}
449524

450525
@Override

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

+81-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@
4141
import org.opensearch.common.lease.Releasable;
4242
import org.opensearch.common.lease.Releasables;
4343
import org.opensearch.common.util.LongArray;
44+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
45+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
46+
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
4447
import org.opensearch.index.fielddata.FieldData;
48+
import org.opensearch.index.mapper.NumberFieldMapper;
4549
import org.opensearch.search.DocValueFormat;
4650
import org.opensearch.search.aggregations.Aggregator;
4751
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -52,6 +56,8 @@
5256
import org.opensearch.search.aggregations.InternalOrder;
5357
import org.opensearch.search.aggregations.LeafBucketCollector;
5458
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
59+
import org.opensearch.search.aggregations.StarTreeBucketCollector;
60+
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
5561
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
5662
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
5763
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
@@ -60,6 +66,9 @@
6066
import org.opensearch.search.aggregations.support.ValuesSource;
6167
import org.opensearch.search.internal.ContextIndexSearcher;
6268
import org.opensearch.search.internal.SearchContext;
69+
import org.opensearch.search.startree.StarTreeQueryHelper;
70+
import org.opensearch.search.startree.StarTreeTraversalUtil;
71+
import org.opensearch.search.startree.filter.DimensionFilter;
6372

6473
import java.io.IOException;
6574
import java.math.BigInteger;
@@ -79,11 +88,12 @@
7988
*
8089
* @opensearch.internal
8190
*/
82-
public class NumericTermsAggregator extends TermsAggregator {
91+
public class NumericTermsAggregator extends TermsAggregator implements StarTreePreComputeCollector {
8392
private final ResultStrategy<?, ?> resultStrategy;
8493
private final ValuesSource.Numeric valuesSource;
8594
private final LongKeyedBucketOrds bucketOrds;
8695
private final LongFilter longFilter;
96+
private final String fieldName;
8797

8898
public NumericTermsAggregator(
8999
String name,
@@ -105,6 +115,9 @@ public NumericTermsAggregator(
105115
this.valuesSource = valuesSource;
106116
this.longFilter = longFilter;
107117
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
118+
this.fieldName = (this.valuesSource instanceof ValuesSource.Numeric.FieldData)
119+
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
120+
: null;
108121
}
109122

110123
@Override
@@ -146,6 +159,73 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
146159
});
147160
}
148161

162+
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
163+
CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
164+
if (supportedStarTree != null) {
165+
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
166+
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
167+
return true;
168+
}
169+
return false;
170+
}
171+
172+
public StarTreeBucketCollector getStarTreeBucketCollector(
173+
LeafReaderContext ctx,
174+
CompositeIndexFieldInfo starTree,
175+
StarTreeBucketCollector parent
176+
) throws IOException {
177+
assert parent == null;
178+
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
179+
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
180+
.getDimensionValuesIterator(fieldName);
181+
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);
182+
183+
return new StarTreeBucketCollector(
184+
starTreeValues,
185+
StarTreeTraversalUtil.getStarTreeResult(
186+
starTreeValues,
187+
StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
188+
context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
189+
fieldName,
190+
List.of(DimensionFilter.MATCH_ALL_DEFAULT)
191+
),
192+
context
193+
)
194+
) {
195+
@Override
196+
public void setSubCollectors() throws IOException {
197+
for (Aggregator aggregator : subAggregators) {
198+
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
199+
}
200+
}
201+
202+
@Override
203+
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
204+
if (valuesIterator.advanceExact(starTreeEntry) == false) {
205+
return;
206+
}
207+
long dimensionValue = valuesIterator.nextValue();
208+
// Only numeric & floating points are supported as of now in star-tree
209+
// TODO: Add support for isBigInteger() when it gets supported in star-tree
210+
if (valuesSource.isFloatingPoint()) {
211+
double doubleValue = ((NumberFieldMapper.NumberFieldType) context.mapperService().fieldType(fieldName)).toDoubleValue(
212+
dimensionValue
213+
);
214+
dimensionValue = NumericUtils.doubleToSortableLong(doubleValue);
215+
}
216+
217+
for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {
218+
219+
if (docCountsIterator.advanceExact(starTreeEntry)) {
220+
long metricValue = docCountsIterator.nextValue();
221+
long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
222+
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
223+
}
224+
}
225+
}
226+
};
227+
}
228+
149229
@Override
150230
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
151231
return resultStrategy.buildAggregations(owningBucketOrds);

0 commit comments

Comments
 (0)