Skip to content

Commit 943eaca

Browse files
sarthakaggarwal97 authored and kaushalmahi12 committed
Star Tree Implementation [OnHeap] (opensearch-project#14512)
--------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
1 parent 60e45e9 commit 943eaca

32 files changed

+3586
-37
lines changed

server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.lucene.index.MergeState;
1515
import org.apache.lucene.index.SegmentWriteState;
1616
import org.opensearch.common.annotation.ExperimentalApi;
17+
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
1718
import org.opensearch.index.mapper.CompositeMappedFieldType;
1819
import org.opensearch.index.mapper.MapperService;
1920
import org.opensearch.index.mapper.StarTreeMapper;
@@ -98,7 +99,9 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
9899
if (compositeFieldSet.isEmpty()) {
99100
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
100101
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
101-
// TODO : Call StarTree builder
102+
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService)) {
103+
starTreesBuilder.build();
104+
}
102105
}
103106
}
104107
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.compositeindex.datacube.startree;
10+
11+
import org.opensearch.common.annotation.ExperimentalApi;
12+
13+
import java.util.Arrays;
14+
15+
/**
16+
* Star tree document
17+
*
18+
* @opensearch.experimental
19+
*/
20+
@ExperimentalApi
21+
public class StarTreeDocument {
22+
public final Long[] dimensions;
23+
public final Object[] metrics;
24+
25+
public StarTreeDocument(Long[] dimensions, Object[] metrics) {
26+
this.dimensions = dimensions;
27+
this.metrics = metrics;
28+
}
29+
30+
@Override
31+
public String toString() {
32+
return Arrays.toString(dimensions) + " | " + Arrays.toString(metrics);
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.index.compositeindex.datacube.startree.aggregators;
9+
10+
import org.opensearch.index.compositeindex.datacube.MetricStat;
11+
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
12+
13+
/**
14+
* Count value aggregator for star tree
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public class CountValueAggregator implements ValueAggregator<Long> {
19+
public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
20+
public static final long DEFAULT_INITIAL_VALUE = 1L;
21+
22+
@Override
23+
public MetricStat getAggregationType() {
24+
return MetricStat.COUNT;
25+
}
26+
27+
@Override
28+
public StarTreeNumericType getAggregatedValueType() {
29+
return VALUE_AGGREGATOR_TYPE;
30+
}
31+
32+
@Override
33+
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
34+
return DEFAULT_INITIAL_VALUE;
35+
}
36+
37+
@Override
38+
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
39+
return value + 1;
40+
}
41+
42+
@Override
43+
public Long mergeAggregatedValues(Long value, Long aggregatedValue) {
44+
return value + aggregatedValue;
45+
}
46+
47+
@Override
48+
public Long getInitialAggregatedValue(Long value) {
49+
return value;
50+
}
51+
52+
@Override
53+
public int getMaxAggregatedValueByteSize() {
54+
return Long.BYTES;
55+
}
56+
57+
@Override
58+
public Long toLongValue(Long value) {
59+
return value;
60+
}
61+
62+
@Override
63+
public Long toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
64+
return value;
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.index.compositeindex.datacube.startree.aggregators;
9+
10+
import org.opensearch.index.compositeindex.datacube.MetricStat;
11+
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
12+
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
13+
import org.opensearch.index.fielddata.IndexNumericFieldData;
14+
15+
import java.util.Comparator;
16+
import java.util.Objects;
17+
18+
/**
19+
* Builds aggregation function and doc values field pair to support various aggregations
20+
*
21+
* @opensearch.experimental
22+
*/
23+
public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {
24+
25+
public static final String DELIMITER = "_";
26+
private final String metric;
27+
private final String starFieldName;
28+
private final MetricStat metricStat;
29+
private final String field;
30+
private final ValueAggregator valueAggregators;
31+
private final StarTreeNumericType starTreeNumericType;
32+
private final SequentialDocValuesIterator metricStatReader;
33+
34+
/**
35+
* Constructor for MetricAggregatorInfo
36+
*/
37+
public MetricAggregatorInfo(
38+
MetricStat metricStat,
39+
String field,
40+
String starFieldName,
41+
IndexNumericFieldData.NumericType numericType,
42+
SequentialDocValuesIterator metricStatReader
43+
) {
44+
this.metricStat = metricStat;
45+
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat);
46+
this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType);
47+
this.metricStatReader = metricStatReader;
48+
this.field = field;
49+
this.starFieldName = starFieldName;
50+
this.metric = toFieldName();
51+
}
52+
53+
/**
54+
* @return metric type
55+
*/
56+
public MetricStat getMetricStat() {
57+
return metricStat;
58+
}
59+
60+
/**
61+
* @return field Name
62+
*/
63+
public String getField() {
64+
return field;
65+
}
66+
67+
/**
68+
* @return the metric stat name
69+
*/
70+
public String getMetric() {
71+
return metric;
72+
}
73+
74+
/**
75+
* @return aggregator for the field value
76+
*/
77+
public ValueAggregator getValueAggregators() {
78+
return valueAggregators;
79+
}
80+
81+
/**
82+
* @return star tree aggregated value type
83+
*/
84+
public StarTreeNumericType getAggregatedValueType() {
85+
return starTreeNumericType;
86+
}
87+
88+
/**
89+
* @return metric value reader iterator
90+
*/
91+
public SequentialDocValuesIterator getMetricStatReader() {
92+
return metricStatReader;
93+
}
94+
95+
/**
96+
* @return field name with metric type and field
97+
*/
98+
public String toFieldName() {
99+
return starFieldName + DELIMITER + field + DELIMITER + metricStat.getTypeName();
100+
}
101+
102+
@Override
103+
public int hashCode() {
104+
return Objects.hashCode(toFieldName());
105+
}
106+
107+
@Override
108+
public boolean equals(Object obj) {
109+
if (this == obj) {
110+
return true;
111+
}
112+
if (obj instanceof MetricAggregatorInfo) {
113+
MetricAggregatorInfo anotherPair = (MetricAggregatorInfo) obj;
114+
return metricStat == anotherPair.metricStat && field.equals(anotherPair.field);
115+
}
116+
return false;
117+
}
118+
119+
@Override
120+
public String toString() {
121+
return toFieldName();
122+
}
123+
124+
@Override
125+
public int compareTo(MetricAggregatorInfo other) {
126+
return Comparator.comparing((MetricAggregatorInfo o) -> o.field)
127+
.thenComparing((MetricAggregatorInfo o) -> o.metricStat)
128+
.compare(this, other);
129+
}
130+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.index.compositeindex.datacube.startree.aggregators;
9+
10+
import org.apache.lucene.util.NumericUtils;
11+
import org.opensearch.index.compositeindex.datacube.MetricStat;
12+
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
13+
import org.opensearch.search.aggregations.metrics.CompensatedSum;
14+
15+
/**
16+
* Sum value aggregator for star tree
17+
*
18+
* @opensearch.experimental
19+
*/
20+
public class SumValueAggregator implements ValueAggregator<Double> {
21+
22+
public static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.DOUBLE;
23+
private double sum = 0;
24+
private double compensation = 0;
25+
private CompensatedSum kahanSummation = new CompensatedSum(0, 0);
26+
27+
@Override
28+
public MetricStat getAggregationType() {
29+
return MetricStat.SUM;
30+
}
31+
32+
@Override
33+
public StarTreeNumericType getAggregatedValueType() {
34+
return VALUE_AGGREGATOR_TYPE;
35+
}
36+
37+
@Override
38+
public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
39+
kahanSummation.reset(0, 0);
40+
kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
41+
compensation = kahanSummation.delta();
42+
sum = kahanSummation.value();
43+
return kahanSummation.value();
44+
}
45+
46+
@Override
47+
public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocValue, StarTreeNumericType starTreeNumericType) {
48+
assert kahanSummation.value() == value;
49+
kahanSummation.reset(sum, compensation);
50+
kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
51+
compensation = kahanSummation.delta();
52+
sum = kahanSummation.value();
53+
return kahanSummation.value();
54+
}
55+
56+
@Override
57+
public Double mergeAggregatedValues(Double value, Double aggregatedValue) {
58+
assert kahanSummation.value() == aggregatedValue;
59+
kahanSummation.reset(sum, compensation);
60+
kahanSummation.add(value);
61+
compensation = kahanSummation.delta();
62+
sum = kahanSummation.value();
63+
return kahanSummation.value();
64+
}
65+
66+
@Override
67+
public Double getInitialAggregatedValue(Double value) {
68+
kahanSummation.reset(0, 0);
69+
kahanSummation.add(value);
70+
compensation = kahanSummation.delta();
71+
sum = kahanSummation.value();
72+
return kahanSummation.value();
73+
}
74+
75+
@Override
76+
public int getMaxAggregatedValueByteSize() {
77+
return Double.BYTES;
78+
}
79+
80+
@Override
81+
public Long toLongValue(Double value) {
82+
try {
83+
return NumericUtils.doubleToSortableLong(value);
84+
} catch (Exception e) {
85+
throw new IllegalStateException("Cannot convert " + value + " to sortable long", e);
86+
}
87+
}
88+
89+
@Override
90+
public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
91+
try {
92+
return type.getDoubleValue(value);
93+
} catch (Exception e) {
94+
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
95+
}
96+
}
97+
}

0 commit comments

Comments
 (0)