Skip to content

Commit b5234a5

Browse files
sandeshkr419Sandesh Kumar
and
Sandesh Kumar
authored
[Star Tree] [Search] Resolve Date histogram with metric aggregation using star-tree (#16674)
--------- Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com> Co-authored-by: Sandesh Kumar <kusandes@amazon.com>
1 parent e6fc600 commit b5234a5

19 files changed

+1144
-78
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3838
- Added new Setting property UnmodifiableOnRestore to prevent updating settings on restore snapshot ([#16957](https://github.com/opensearch-project/OpenSearch/pull/16957))
3939
- Introduce Template query ([#16818](https://github.com/opensearch-project/OpenSearch/pull/16818))
4040
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
41+
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))
4142

4243
### Dependencies
4344
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))

server/src/main/java/org/opensearch/common/Rounding.java

+9
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ public void writeTo(StreamOutput out) throws IOException {
270270

271271
public abstract byte id();
272272

273+
public DateTimeUnit unit() {
274+
return null;
275+
}
276+
273277
/**
274278
* A strategy for rounding milliseconds since epoch.
275279
*
@@ -517,6 +521,11 @@ public byte id() {
517521
return ID;
518522
}
519523

524+
@Override
525+
public DateTimeUnit unit() {
526+
return unit;
527+
}
528+
520529
private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
521530
switch (unit) {
522531
case SECOND_OF_MINUTE:

server/src/main/java/org/opensearch/index/compositeindex/datacube/DateDimension.java

+18
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,24 @@ public int compare(DateTimeUnitRounding unit1, DateTimeUnitRounding unit2) {
169169
}
170170
}
171171

172+
/**
173+
* Returns the closest valid calendar interval to be used for the search interval
174+
*/
175+
public DateTimeUnitRounding findClosestValidInterval(DateTimeUnitRounding searchInterval) {
176+
DateTimeUnitComparator comparator = new DateTimeUnitComparator();
177+
DateTimeUnitRounding closestValidInterval = null;
178+
179+
// Find the largest interval that is less than or equal to search interval
180+
for (DateTimeUnitRounding interval : sortedCalendarIntervals) {
181+
if (comparator.compare(interval, searchInterval) <= 0) {
182+
closestValidInterval = interval;
183+
} else {
184+
break;
185+
}
186+
}
187+
return closestValidInterval;
188+
}
189+
172190
/**
173191
* Returns a sorted list of dateTimeUnits based on the DateTimeUnitComparator
174192
*/

server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeQueryHelper.java

+105-47
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717
import org.opensearch.common.lucene.Lucene;
1818
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
1919
import org.opensearch.index.codec.composite.CompositeIndexReader;
20+
import org.opensearch.index.compositeindex.datacube.DateDimension;
2021
import org.opensearch.index.compositeindex.datacube.Dimension;
2122
import org.opensearch.index.compositeindex.datacube.Metric;
2223
import org.opensearch.index.compositeindex.datacube.MetricStat;
2324
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
25+
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitAdapter;
26+
import org.opensearch.index.compositeindex.datacube.startree.utils.date.DateTimeUnitRounding;
2427
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
2528
import org.opensearch.index.mapper.CompositeDataCubeFieldType;
2629
import org.opensearch.index.query.MatchAllQueryBuilder;
2730
import org.opensearch.index.query.QueryBuilder;
2831
import org.opensearch.index.query.TermQueryBuilder;
2932
import org.opensearch.search.aggregations.AggregatorFactory;
3033
import org.opensearch.search.aggregations.LeafBucketCollector;
31-
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
34+
import org.opensearch.search.aggregations.StarTreeBucketCollector;
35+
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
3236
import org.opensearch.search.aggregations.metrics.MetricAggregatorFactory;
3337
import org.opensearch.search.aggregations.support.ValuesSource;
3438
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -37,9 +41,10 @@
3741
import org.opensearch.search.startree.StarTreeQueryContext;
3842

3943
import java.io.IOException;
40-
import java.util.HashMap;
4144
import java.util.List;
4245
import java.util.Map;
46+
import java.util.Set;
47+
import java.util.function.BiConsumer;
4348
import java.util.function.Consumer;
4449
import java.util.stream.Collectors;
4550

@@ -74,10 +79,16 @@ public static StarTreeQueryContext getStarTreeQueryContext(SearchContext context
7479
);
7580

7681
for (AggregatorFactory aggregatorFactory : context.aggregations().factories().getFactories()) {
77-
MetricStat metricStat = validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory);
78-
if (metricStat == null) {
79-
return null;
82+
// first check for aggregation is a metric aggregation
83+
if (validateStarTreeMetricSupport(compositeMappedFieldType, aggregatorFactory)) {
84+
continue;
85+
}
86+
87+
// if not a metric aggregation, check for applicable date histogram shape
88+
if (validateDateHistogramSupport(compositeMappedFieldType, aggregatorFactory)) {
89+
continue;
8090
}
91+
return null;
8192
}
8293

8394
// need to cache star tree values only for multiple aggregations
@@ -99,64 +110,85 @@ private static StarTreeQueryContext tryCreateStarTreeQueryContext(
99110
Map<String, Long> queryMap;
100111
if (queryBuilder == null || queryBuilder instanceof MatchAllQueryBuilder) {
101112
queryMap = null;
102-
} else if (queryBuilder instanceof TermQueryBuilder) {
113+
} else if (queryBuilder instanceof TermQueryBuilder termQueryBuilder) {
103114
// TODO: Add support for keyword fields
104-
if (compositeFieldType.getDimensions().stream().anyMatch(d -> d.getDocValuesType() != DocValuesType.SORTED_NUMERIC)) {
105-
// return null for non-numeric fields
106-
return null;
107-
}
108-
109-
List<String> supportedDimensions = compositeFieldType.getDimensions()
115+
Dimension matchedDimension = compositeFieldType.getDimensions()
110116
.stream()
111-
.map(Dimension::getField)
112-
.collect(Collectors.toList());
113-
queryMap = getStarTreePredicates(queryBuilder, supportedDimensions);
114-
if (queryMap == null) {
117+
.filter(d -> (d.getField().equals(termQueryBuilder.fieldName()) && d.getDocValuesType() == DocValuesType.SORTED_NUMERIC))
118+
.findFirst()
119+
.orElse(null);
120+
if (matchedDimension == null) {
115121
return null;
116122
}
123+
queryMap = Map.of(termQueryBuilder.fieldName(), Long.parseLong(termQueryBuilder.value().toString()));
117124
} else {
118125
return null;
119126
}
120127
return new StarTreeQueryContext(compositeIndexFieldInfo, queryMap, cacheStarTreeValuesSize);
121128
}
122129

123-
/**
124-
* Parse query body to star-tree predicates
125-
* @param queryBuilder to match star-tree supported query shape
126-
* @return predicates to match
127-
*/
128-
private static Map<String, Long> getStarTreePredicates(QueryBuilder queryBuilder, List<String> supportedDimensions) {
129-
TermQueryBuilder tq = (TermQueryBuilder) queryBuilder;
130-
String field = tq.fieldName();
131-
if (!supportedDimensions.contains(field)) {
132-
return null;
133-
}
134-
long inputQueryVal = Long.parseLong(tq.value().toString());
135-
136-
// Create a map with the field and the value
137-
Map<String, Long> predicateMap = new HashMap<>();
138-
predicateMap.put(field, inputQueryVal);
139-
return predicateMap;
140-
}
141-
142-
private static MetricStat validateStarTreeMetricSupport(
130+
private static boolean validateStarTreeMetricSupport(
143131
CompositeDataCubeFieldType compositeIndexFieldInfo,
144132
AggregatorFactory aggregatorFactory
145133
) {
146-
if (aggregatorFactory instanceof MetricAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 0) {
134+
if (aggregatorFactory instanceof MetricAggregatorFactory metricAggregatorFactory
135+
&& metricAggregatorFactory.getSubFactories().getFactories().length == 0) {
147136
String field;
148137
Map<String, List<MetricStat>> supportedMetrics = compositeIndexFieldInfo.getMetrics()
149138
.stream()
150139
.collect(Collectors.toMap(Metric::getField, Metric::getMetrics));
151140

152-
MetricStat metricStat = ((MetricAggregatorFactory) aggregatorFactory).getMetricStat();
153-
field = ((MetricAggregatorFactory) aggregatorFactory).getField();
141+
MetricStat metricStat = metricAggregatorFactory.getMetricStat();
142+
field = metricAggregatorFactory.getField();
143+
144+
return supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat);
145+
}
146+
return false;
147+
}
148+
149+
private static boolean validateDateHistogramSupport(
150+
CompositeDataCubeFieldType compositeIndexFieldInfo,
151+
AggregatorFactory aggregatorFactory
152+
) {
153+
if (!(aggregatorFactory instanceof DateHistogramAggregatorFactory dateHistogramAggregatorFactory)
154+
|| aggregatorFactory.getSubFactories().getFactories().length < 1) {
155+
return false;
156+
}
157+
158+
// Find the DateDimension in the dimensions list
159+
DateDimension starTreeDateDimension = null;
160+
for (Dimension dimension : compositeIndexFieldInfo.getDimensions()) {
161+
if (dimension instanceof DateDimension) {
162+
starTreeDateDimension = (DateDimension) dimension;
163+
break;
164+
}
165+
}
166+
167+
// If no DateDimension is found, validation fails
168+
if (starTreeDateDimension == null) {
169+
return false;
170+
}
171+
172+
// Ensure the rounding is not null
173+
if (dateHistogramAggregatorFactory.getRounding() == null) {
174+
return false;
175+
}
176+
177+
// Find the closest valid interval in the DateTimeUnitRounding class associated with star tree
178+
DateTimeUnitRounding rounding = starTreeDateDimension.findClosestValidInterval(
179+
new DateTimeUnitAdapter(dateHistogramAggregatorFactory.getRounding())
180+
);
181+
if (rounding == null) {
182+
return false;
183+
}
154184

155-
if (field != null && supportedMetrics.containsKey(field) && supportedMetrics.get(field).contains(metricStat)) {
156-
return metricStat;
185+
// Validate all sub-factories
186+
for (AggregatorFactory subFactory : aggregatorFactory.getSubFactories().getFactories()) {
187+
if (!validateStarTreeMetricSupport(compositeIndexFieldInfo, subFactory)) {
188+
return false;
157189
}
158190
}
159-
return null;
191+
return true;
160192
}
161193

162194
public static CompositeIndexFieldInfo getSupportedStarTree(SearchContext context) {
@@ -222,11 +254,37 @@ public static LeafBucketCollector getStarTreeLeafCollector(
222254
// Call the final consumer after processing all entries
223255
finalConsumer.run();
224256

225-
// Return a LeafBucketCollector that terminates collection
226-
return new LeafBucketCollectorBase(sub, valuesSource.doubleValues(ctx)) {
257+
// Terminate after pre-computing aggregation
258+
throw new CollectionTerminatedException();
259+
}
260+
261+
public static StarTreeBucketCollector getStarTreeBucketMetricCollector(
262+
CompositeIndexFieldInfo starTree,
263+
String metric,
264+
ValuesSource.Numeric valuesSource,
265+
StarTreeBucketCollector parentCollector,
266+
Consumer<Long> growArrays,
267+
BiConsumer<Long, Long> updateBucket
268+
) throws IOException {
269+
assert parentCollector != null;
270+
return new StarTreeBucketCollector(parentCollector) {
271+
String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
272+
starTree.getField(),
273+
((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName(),
274+
metric
275+
);
276+
SortedNumericStarTreeValuesIterator metricValuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
277+
.getMetricValuesIterator(metricName);
278+
227279
@Override
228-
public void collect(int doc, long bucket) {
229-
throw new CollectionTerminatedException();
280+
public void collectStarTreeEntry(int starTreeEntryBit, long bucket) throws IOException {
281+
growArrays.accept(bucket);
282+
// Advance the valuesIterator to the current bit
283+
if (!metricValuesIterator.advanceExact(starTreeEntryBit)) {
284+
return; // Skip if no entries for this document
285+
}
286+
long metricValue = metricValuesIterator.nextValue();
287+
updateBucket.accept(bucket, metricValue);
230288
}
231289
};
232290
}
@@ -240,7 +298,7 @@ public static FixedBitSet getStarTreeFilteredValues(SearchContext context, LeafR
240298
throws IOException {
241299
FixedBitSet result = context.getStarTreeQueryContext().getStarTreeValues(ctx);
242300
if (result == null) {
243-
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap());
301+
result = StarTreeFilter.getStarTreeResult(starTreeValues, context.getStarTreeQueryContext().getQueryMap(), Set.of());
244302
context.getStarTreeQueryContext().setStarTreeValues(ctx, result);
245303
}
246304
return result;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations;
10+
11+
import org.apache.lucene.util.FixedBitSet;
12+
import org.opensearch.common.annotation.ExperimentalApi;
13+
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
14+
15+
import java.io.IOException;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
/**
20+
* Collector for star tree aggregation
21+
* This abstract class exposes utilities to help avoid traversing star-tree multiple times and
22+
* collect relevant metrics across nested aggregations in a single traversal
23+
* @opensearch.internal
24+
*/
25+
@ExperimentalApi
26+
public abstract class StarTreeBucketCollector {
27+
28+
protected final StarTreeValues starTreeValues;
29+
protected final FixedBitSet matchingDocsBitSet;
30+
protected final List<StarTreeBucketCollector> subCollectors = new ArrayList<>();
31+
32+
public StarTreeBucketCollector(StarTreeValues starTreeValues, FixedBitSet matchingDocsBitSet) throws IOException {
33+
this.starTreeValues = starTreeValues;
34+
this.matchingDocsBitSet = matchingDocsBitSet;
35+
this.setSubCollectors();
36+
}
37+
38+
public StarTreeBucketCollector(StarTreeBucketCollector parent) throws IOException {
39+
this.starTreeValues = parent.getStarTreeValues();
40+
this.matchingDocsBitSet = parent.getMatchingDocsBitSet();
41+
this.setSubCollectors();
42+
}
43+
44+
/**
45+
* Sets the sub-collectors to track nested aggregators
46+
*/
47+
public void setSubCollectors() throws IOException {};
48+
49+
/**
50+
* Returns a list of sub-collectors to track nested aggregators
51+
*/
52+
public List<StarTreeBucketCollector> getSubCollectors() {
53+
return subCollectors;
54+
}
55+
56+
/**
57+
* Returns the tree values to iterate
58+
*/
59+
public StarTreeValues getStarTreeValues() {
60+
return starTreeValues;
61+
}
62+
63+
/**
64+
* Returns the matching docs bitset to iterate upon the star-tree values based on search query
65+
*/
66+
public FixedBitSet getMatchingDocsBitSet() {
67+
return matchingDocsBitSet;
68+
}
69+
70+
/**
71+
* Collects the star tree entry and bucket ordinal to update
72+
* The method implementation should identify the metrics to collect from that star-tree entry to the specified bucket
73+
*/
74+
public abstract void collectStarTreeEntry(int starTreeEntry, long bucket) throws IOException;
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.search.aggregations;
10+
11+
import org.apache.lucene.index.LeafReaderContext;
12+
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* This interface is used to pre-compute the star tree bucket collector for each segment/leaf.
18+
* It is utilized by parent aggregation to retrieve a StarTreeBucketCollector which can be used to
19+
* pre-compute the associated aggregation along with its parent pre-computation using star-tree
20+
*
21+
* @opensearch.internal
22+
*/
23+
public interface StarTreePreComputeCollector {
24+
/**
25+
* Get the star tree bucket collector for the specified segment/leaf
26+
*/
27+
StarTreeBucketCollector getStarTreeBucketCollector(
28+
LeafReaderContext ctx,
29+
CompositeIndexFieldInfo starTree,
30+
StarTreeBucketCollector parentCollector
31+
) throws IOException;
32+
}

0 commit comments

Comments
 (0)