Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Star Tree] [Search] Keyword & Numeric Terms Aggregation #17165

Merged
merged 11 commits into from
Mar 4, 2025
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))
- Add filter function for AbstractQueryBuilder, BoolQueryBuilder, ConstantScoreQueryBuilder([#17409](https://github.com/opensearch-project/OpenSearch/pull/17409))
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
*/
public final void collectStarTreeBucket(StarTreeBucketCollector collector, long docCount, long bucketOrd, int entryBit)
throws IOException {
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
} else {
grow(bucketOrd + 1);
}

if (docCounts.increment(bucketOrd, docCount) == docCount) {
multiBucketConsumer.accept(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,14 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
Expand Down Expand Up @@ -192,9 +188,9 @@ public ScoreMode scoreMode() {
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
CompositeIndexFieldInfo supportedStarTree = getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
if (preComputeWithStarTree(ctx, supportedStarTree) == true) {
return true;
}
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
return true;
}
return filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
}
Expand Down Expand Up @@ -268,6 +264,10 @@ public StarTreeBucketCollector getStarTreeBucketCollector(
) throws IOException {
assert parentCollector == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator(starTreeDateDimension);
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);

return new StarTreeBucketCollector(
starTreeValues,
StarTreeTraversalUtil.getStarTreeResult(
Expand All @@ -287,17 +287,6 @@ public void setSubCollectors() throws IOException {
}
}

SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator(starTreeDateDimension);

String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
"_doc_count",
MetricStat.DOC_COUNT.getTypeName()
);
SortedNumericStarTreeValuesIterator docCountsIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
if (!valuesIterator.advanceExact(starTreeEntry)) {
Expand All @@ -311,15 +300,8 @@ public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();

long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
}
}
Expand Down Expand Up @@ -393,20 +375,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
return 1.0;
}
}

private boolean preComputeWithStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo starTree) throws IOException {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, starTree, null);
FixedBitSet matchingDocsBitSet = starTreeBucketCollector.getMatchingDocsBitSet();

int numBits = matchingDocsBitSet.length();

if (numBits > 0) {
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
? matchingDocsBitSet.nextSetBit(bit + 1)
: DocIdSetIterator.NO_MORE_DOCS) {
starTreeBucketCollector.collectStarTreeEntry(bit, 0);
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
Expand All @@ -63,14 +67,20 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.StarTreeTraversalUtil;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand All @@ -85,18 +95,19 @@
*
* @opensearch.internal
*/
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator implements StarTreePreComputeCollector {
protected final ResultStrategy<?, ?, ?> resultStrategy;
protected final ValuesSource.Bytes.WithOrdinals valuesSource;

private final LongPredicate acceptedGlobalOrdinals;
private final long valueCount;
private final String fieldName;
protected final String fieldName;
private Weight weight;
protected final CollectionStrategy collectionStrategy;
private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
LongUnaryOperator globalOperator;

/**
* Lookup global ordinals
Expand Down Expand Up @@ -219,6 +230,9 @@
@Override
protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
if (tryStarTreePrecompute(ctx) == true) {
return true;
}
if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
&& subAggregators.length == 0) {
Expand All @@ -231,6 +245,17 @@
return false;
}

protected boolean tryStarTreePrecompute(LeafReaderContext ctx) throws IOException {
CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
globalOperator = valuesSource.globalOrdinalsMapping(ctx);
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
return true;
}
return false;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
Expand Down Expand Up @@ -307,6 +332,56 @@
});
}

public StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parent
) throws IOException {
assert parent == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
SortedSetStarTreeValuesIterator valuesIterator = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator(
fieldName
);
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);

return new StarTreeBucketCollector(
starTreeValues,
StarTreeTraversalUtil.getStarTreeResult(
starTreeValues,
StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
fieldName,
List.of(DimensionFilter.MATCH_ALL_DEFAULT)
),
context
)
) {
@Override
public void setSubCollectors() throws IOException {
for (Aggregator aggregator : subAggregators) {
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
}
}

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
if (valuesIterator.advanceExact(starTreeEntry) == false) {
return;

Check warning on line 369 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java#L369

Added line #L369 was not covered by tests
}
for (int i = 0, count = valuesIterator.docValueCount(); i < count; i++) {
long dimensionValue = valuesIterator.value();
long ord = globalOperator.applyAsLong(dimensionValue);

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();
long bucketOrd = collectionStrategy.globalOrdToBucketOrd(0, ord);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
}
}
};
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
Expand Down Expand Up @@ -444,7 +519,7 @@
(ord, docCount) -> incrementBucketDocCount(mapping.applyAsLong(ord), docCount)
);
}
return false;
return tryStarTreePrecompute(ctx);

Check warning on line 522 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java#L522

Added line #L522 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.LongArray;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.fielddata.FieldData;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand All @@ -52,6 +56,8 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
Expand All @@ -60,6 +66,9 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.StarTreeTraversalUtil;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.math.BigInteger;
Expand All @@ -79,11 +88,12 @@
*
* @opensearch.internal
*/
public class NumericTermsAggregator extends TermsAggregator {
public class NumericTermsAggregator extends TermsAggregator implements StarTreePreComputeCollector {
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource.Numeric valuesSource;
private final LongKeyedBucketOrds bucketOrds;
private final LongFilter longFilter;
private final String fieldName;

public NumericTermsAggregator(
String name,
Expand All @@ -105,6 +115,9 @@
this.valuesSource = valuesSource;
this.longFilter = longFilter;
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
this.fieldName = (this.valuesSource instanceof ValuesSource.Numeric.FieldData)
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
: null;
}

@Override
Expand Down Expand Up @@ -146,6 +159,73 @@
});
}

protected boolean tryPrecomputeAggregationForLeaf(LeafReaderContext ctx) throws IOException {
CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
StarTreeQueryHelper.preComputeBucketsWithStarTree(starTreeBucketCollector);
return true;
}
return false;
}

public StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parent
) throws IOException {
assert parent == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator(fieldName);
SortedNumericStarTreeValuesIterator docCountsIterator = StarTreeQueryHelper.getDocCountsIterator(starTreeValues, starTree);

return new StarTreeBucketCollector(
starTreeValues,
StarTreeTraversalUtil.getStarTreeResult(
starTreeValues,
StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
fieldName,
List.of(DimensionFilter.MATCH_ALL_DEFAULT)
),
context
)
) {
@Override
public void setSubCollectors() throws IOException {
for (Aggregator aggregator : subAggregators) {
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
}
}

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
if (valuesIterator.advanceExact(starTreeEntry) == false) {
return;

Check warning on line 205 in server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java#L205

Added line #L205 was not covered by tests
}
long dimensionValue = valuesIterator.nextValue();
// Only numeric & floating points are supported as of now in star-tree
// TODO: Add support for isBigInteger() when it gets supported in star-tree
if (valuesSource.isFloatingPoint()) {
double doubleValue = ((NumberFieldMapper.NumberFieldType) context.mapperService().fieldType(fieldName)).toDoubleValue(
dimensionValue
);
dimensionValue = NumericUtils.doubleToSortableLong(doubleValue);
}

for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();
long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
}
}
};
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
Expand Down
Loading
Loading