Skip to content

Commit de59264

Browse files
Fix auto date histogram rounding assertion bug (#17023)
* Add comment explanations for auto date histo increaseRoundingIfNeeded. Signed-off-by: Finn Carroll <carrofin@amazon.com> * Add testFilterRewriteWithTZRoundingRangeAssert() to reproduce auto date histo assertion bug per #16932 Signed-off-by: Finn Carroll <carrofin@amazon.com> * Fix #16932. Ensure optimized path can only increase preparedRounding of agg. Signed-off-by: Finn Carroll <carrofin@amazon.com> * Spotless apply Signed-off-by: Finn Carroll <carrofin@amazon.com> * Fast fail filter rewrite opt in date histo aggs for non UTC timezones Signed-off-by: Finn Carroll <carrofin@amazon.com> * Remove redundant UTC check from getInterval(). Signed-off-by: Finn Carroll <carrofin@amazon.com> * Save a call to prepareRounding if roundingIdx is unchanged. Signed-off-by: Finn Carroll <carrofin@amazon.com> * Spotless apply Signed-off-by: Finn Carroll <carrofin@amazon.com> * Changelog Signed-off-by: Finn Carroll <carrofin@amazon.com> * Add ZoneId getter for date histo filter rewrite canOptimize check. Signed-off-by: Finn Carroll <carrofin@amazon.com> * Spotless apply Signed-off-by: Finn Carroll <carrofin@amazon.com> * Disable ff optimization for composite agg in canOptimize. Signed-off-by: Finn Carroll <carrofin@amazon.com> * Spotless apply Signed-off-by: Finn Carroll <carrofin@amazon.com> * Handle utc timezone check Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> * Remove redundant timeZone getter. Signed-off-by: Finn Carroll <carrofin@amazon.com> * Simplify ff prepared rounding check. Signed-off-by: Finn Carroll <carrofin@amazon.com> --------- Signed-off-by: Finn Carroll <carrofin@amazon.com> Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com> Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 32a88eb commit de59264

File tree

9 files changed

+157
-22
lines changed

9 files changed

+157
-22
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
117117
- Fix GRPC AUX_TRANSPORT_PORT and SETTING_GRPC_PORT settings and remove lingering HTTP terminology ([#17037](https://github.com/opensearch-project/OpenSearch/pull/17037))
118118
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
119119
- Use OpenSearch version to deserialize remote custom metadata([#16494](https://github.com/opensearch-project/OpenSearch/pull/16494))
120+
- Fix AutoDateHistogramAggregator rounding assertion failure ([#17023](https://github.com/opensearch-project/OpenSearch/pull/17023))
120121

121122
### Security
122123

server/src/main/java/org/opensearch/common/Rounding.java

+16-11
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,11 @@ public String toString() {
668668
return "Rounding[" + unit + " in " + timeZone + "]";
669669
}
670670

671+
@Override
672+
public boolean isUTC() {
673+
return "Z".equals(timeZone.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
674+
}
675+
671676
private abstract class TimeUnitPreparedRounding extends PreparedRounding {
672677
@Override
673678
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
@@ -1045,6 +1050,11 @@ public String toString() {
10451050
return "Rounding[" + interval + " in " + timeZone + "]";
10461051
}
10471052

1053+
@Override
1054+
public boolean isUTC() {
1055+
return "Z".equals(timeZone.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
1056+
}
1057+
10481058
private long roundKey(long value, long interval) {
10491059
if (value < 0) {
10501060
return (value - interval + 1) / interval;
@@ -1364,6 +1374,11 @@ public boolean equals(Object obj) {
13641374
public String toString() {
13651375
return delegate + " offset by " + offset;
13661376
}
1377+
1378+
@Override
1379+
public boolean isUTC() {
1380+
return delegate.isUTC();
1381+
}
13671382
}
13681383

13691384
public static Rounding read(StreamInput in) throws IOException {
@@ -1391,16 +1406,8 @@ public static OptionalLong getInterval(Rounding rounding) {
13911406

13921407
if (rounding instanceof TimeUnitRounding) {
13931408
interval = (((TimeUnitRounding) rounding).unit).extraLocalOffsetLookup();
1394-
if (!isUTCTimeZone(((TimeUnitRounding) rounding).timeZone)) {
1395-
// Fast filter aggregation cannot be used if it needs time zone rounding
1396-
return OptionalLong.empty();
1397-
}
13981409
} else if (rounding instanceof TimeIntervalRounding) {
13991410
interval = ((TimeIntervalRounding) rounding).interval;
1400-
if (!isUTCTimeZone(((TimeIntervalRounding) rounding).timeZone)) {
1401-
// Fast filter aggregation cannot be used if it needs time zone rounding
1402-
return OptionalLong.empty();
1403-
}
14041411
} else {
14051412
return OptionalLong.empty();
14061413
}
@@ -1412,7 +1419,5 @@ public static OptionalLong getInterval(Rounding rounding) {
14121419
* Helper function for checking if the time zone requested for date histogram
14131420
* aggregation is utc or not
14141421
*/
1415-
private static boolean isUTCTimeZone(final ZoneId zoneId) {
1416-
return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
1417-
}
1422+
public abstract boolean isUTC();
14181423
}

server/src/main/java/org/opensearch/search/DocValueFormat.java

+4
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,10 @@ public DateMathParser getDateMathParser() {
286286
return parser;
287287
}
288288

289+
public ZoneId getZoneId() {
290+
return timeZone;
291+
}
292+
289293
@Override
290294
public String format(long value) {
291295
return formatter.format(resolution.toInstant(value).atZone(timeZone));

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

+8
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,14 @@ protected boolean canOptimize() {
182182
});
183183
}
184184

185+
/**
186+
* The filter rewrite optimized path does not support bucket intervals which are not fixed.
187+
* For this reason we exclude non UTC timezones.
188+
*/
189+
if (valuesSource.getRounding().isUTC() == false) {
190+
return false;
191+
}
192+
185193
// bucketOrds is used for saving the date histogram results got from the optimization path
186194
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
187195
return true;

server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,15 @@ public abstract class DateHistogramAggregatorBridge extends AggregatorBridge {
3232

3333
int maxRewriteFilters;
3434

35-
protected boolean canOptimize(ValuesSourceConfig config) {
35+
protected boolean canOptimize(ValuesSourceConfig config, Rounding rounding) {
36+
/**
37+
* The filter rewrite optimized path does not support bucket intervals which are not fixed.
38+
* For this reason we exclude non UTC timezones.
39+
*/
40+
if (rounding.isUTC() == false) {
41+
return false;
42+
}
43+
3644
if (config.script() == null && config.missing() == null) {
3745
MappedFieldType fieldType = config.fieldType();
3846
if (fieldType instanceof DateFieldMapper.DateFieldType) {

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

+46-4
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ private AutoDateHistogramAggregator(
149149
Aggregator parent,
150150
Map<String, Object> metadata
151151
) throws IOException {
152-
153152
super(name, factories, aggregationContext, parent, metadata);
154153
this.targetBuckets = targetBuckets;
155154
// TODO: Remove null usage here, by using a different aggregator for create
@@ -162,22 +161,34 @@ private AutoDateHistogramAggregator(
162161
DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() {
163162
@Override
164163
protected boolean canOptimize() {
165-
return canOptimize(valuesSourceConfig);
164+
return canOptimize(valuesSourceConfig, roundingInfos[0].rounding);
166165
}
167166

168167
@Override
169168
protected void prepare() throws IOException {
170169
buildRanges(context);
171170
}
172171

172+
/**
173+
* The filter rewrite optimization uses this method to pre-emptively update the preparedRounding
174+
* when considering the optimized path for a single segment. This is necessary since the optimized path
175+
* skips doc collection entirely which is where the preparedRounding is normally updated.
176+
*
177+
* @param low lower bound of rounding to prepare
178+
* @param high upper bound of rounding to prepare
179+
* @return select a prepared rounding which satisfies the conditions:
180+
* 1. Is at least as large as our previously prepared rounding
181+
* 2. Must span a range of [low, high] with buckets <= targetBuckets
182+
*/
173183
@Override
174184
protected Rounding getRounding(final long low, final long high) {
175185
// max - min / targetBuckets = bestDuration
176186
// find the right innerInterval this bestDuration belongs to
177187
// since we cannot exceed targetBuckets, bestDuration should go up,
178188
// so the right innerInterval should be an upper bound
179189
long bestDuration = (high - low) / targetBuckets;
180-
// reset so this function is idempotent
190+
191+
int prevRoundingIdx = roundingIdx;
181192
roundingIdx = 0;
182193
while (roundingIdx < roundingInfos.length - 1) {
183194
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
@@ -190,7 +201,11 @@ protected Rounding getRounding(final long low, final long high) {
190201
roundingIdx++;
191202
}
192203

193-
preparedRounding = prepareRounding(roundingIdx);
204+
// Ensure preparedRounding never shrinks
205+
if (roundingIdx > prevRoundingIdx) {
206+
preparedRounding = prepareRounding(roundingIdx);
207+
}
208+
194209
return roundingInfos[roundingIdx].rounding;
195210
}
196211

@@ -403,12 +418,39 @@ private void collectValue(int doc, long rounded) throws IOException {
403418
increaseRoundingIfNeeded(rounded);
404419
}
405420

421+
/**
422+
* Examine our current bucket count and the most recently added bucket to determine if an update to
423+
* preparedRounding is required to keep total bucket count in compliance with targetBuckets.
424+
*
425+
* @param rounded the most recently collected value rounded
426+
*/
406427
private void increaseRoundingIfNeeded(long rounded) {
428+
// If we are already using the rounding with the largest interval nothing can be done
407429
if (roundingIdx >= roundingInfos.length - 1) {
408430
return;
409431
}
432+
433+
// Re calculate the max and min values we expect to bucket according to most recently rounded val
410434
min = Math.min(min, rounded);
411435
max = Math.max(max, rounded);
436+
437+
/**
438+
* Quick explanation of the two below conditions:
439+
*
440+
* 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()]
441+
* Represents the total bucket count possible before we will exceed targetBuckets
442+
* even if we use the maximum inner interval of our current rounding. For example, consider the
443+
* DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets).
444+
* targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() would then be the number of
445+
* 1 day buckets possible such that if we re-bucket to 1 week buckets we will have more 1 week buckets
446+
* than our targetBuckets limit. If the current count of buckets exceeds this limit we must update
447+
* our rounding.
448+
*
449+
* 2. [targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()]
450+
* The total duration of ms covered by our current rounding. In the case of MINUTES_OF_HOUR rounding
451+
* getMaximumRoughEstimateDurationMillis is 60000. If our current total range in millis (max - min)
452+
* exceeds this range we must update our rounding.
453+
*/
412454
if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
413455
&& max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) {
414456
return;

server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
143143
DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() {
144144
@Override
145145
protected boolean canOptimize() {
146-
return canOptimize(valuesSourceConfig);
146+
return canOptimize(valuesSourceConfig, rounding);
147147
}
148148

149149
@Override

server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java

+55
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
import org.apache.lucene.document.SortedSetDocValuesField;
3939
import org.apache.lucene.index.DirectoryReader;
4040
import org.apache.lucene.index.IndexReader;
41+
import org.apache.lucene.index.IndexWriterConfig;
4142
import org.apache.lucene.index.IndexableField;
43+
import org.apache.lucene.index.NoMergePolicy;
4244
import org.apache.lucene.search.IndexSearcher;
4345
import org.apache.lucene.search.MatchAllDocsQuery;
4446
import org.apache.lucene.search.MatchNoDocsQuery;
@@ -72,6 +74,7 @@
7274
import java.time.Instant;
7375
import java.time.LocalDate;
7476
import java.time.YearMonth;
77+
import java.time.ZoneId;
7578
import java.time.ZoneOffset;
7679
import java.time.ZonedDateTime;
7780
import java.util.ArrayList;
@@ -912,6 +915,58 @@ public void testWithPipelineReductions() throws IOException {
912915
);
913916
}
914917

918+
// Bugfix: https://github.com/opensearch-project/OpenSearch/issues/16932
919+
public void testFilterRewriteWithTZRoundingRangeAssert() throws IOException {
920+
/*
921+
multiBucketIndexData must overlap with DST to produce a 'LinkedListLookup' prepared rounding.
922+
This lookup rounding style maintains a strict max/min input range and will assert each value is in range.
923+
*/
924+
final List<ZonedDateTime> multiBucketIndexData = Arrays.asList(
925+
ZonedDateTime.of(2023, 10, 10, 0, 0, 0, 0, ZoneOffset.UTC),
926+
ZonedDateTime.of(2023, 11, 11, 0, 0, 0, 0, ZoneOffset.UTC)
927+
);
928+
929+
final List<ZonedDateTime> singleBucketIndexData = Arrays.asList(ZonedDateTime.of(2023, 12, 27, 0, 0, 0, 0, ZoneOffset.UTC));
930+
931+
try (Directory directory = newDirectory()) {
932+
/*
933+
Ensure we produce two segments on one shard such that the documents in seg 1 will be out of range of the
934+
prepared rounding produced by the filter rewrite optimization considering seg 2 for optimized path.
935+
*/
936+
IndexWriterConfig c = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
937+
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, c)) {
938+
indexSampleData(multiBucketIndexData, indexWriter);
939+
indexWriter.flush();
940+
indexSampleData(singleBucketIndexData, indexWriter);
941+
}
942+
943+
try (IndexReader indexReader = DirectoryReader.open(directory)) {
944+
final IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
945+
946+
// Force agg to update rounding when it begins collecting from the second segment.
947+
final AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name");
948+
aggregationBuilder.setNumBuckets(3).field(DATE_FIELD).timeZone(ZoneId.of("America/New_York"));
949+
950+
Map<String, Integer> expectedDocCount = new TreeMap<>();
951+
expectedDocCount.put("2023-10-01T00:00:00.000-04:00", 1);
952+
expectedDocCount.put("2023-11-01T00:00:00.000-04:00", 1);
953+
expectedDocCount.put("2023-12-01T00:00:00.000-05:00", 1);
954+
955+
final InternalAutoDateHistogram histogram = searchAndReduce(
956+
indexSearcher,
957+
DEFAULT_QUERY,
958+
aggregationBuilder,
959+
false,
960+
new DateFieldMapper.DateFieldType(aggregationBuilder.field()),
961+
new NumberFieldMapper.NumberFieldType(INSTANT_FIELD, NumberFieldMapper.NumberType.LONG),
962+
new NumberFieldMapper.NumberFieldType(NUMERIC_FIELD, NumberFieldMapper.NumberType.LONG)
963+
);
964+
965+
assertThat(bucketCountsAsMap(histogram), equalTo(expectedDocCount));
966+
}
967+
}
968+
}
969+
915970
@Override
916971
protected IndexSettings createIndexSettings() {
917972
final Settings nodeSettings = Settings.builder().put("search.max_buckets", 25000).build();

test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java

+17-5
Original file line numberDiff line numberDiff line change
@@ -611,9 +611,19 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
611611
IndexSearcher searcher,
612612
Query query,
613613
AggregationBuilder builder,
614+
boolean shardFanOut,
614615
MappedFieldType... fieldTypes
615616
) throws IOException {
616-
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
617+
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, shardFanOut, fieldTypes);
618+
}
619+
620+
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
621+
IndexSearcher searcher,
622+
Query query,
623+
AggregationBuilder builder,
624+
MappedFieldType... fieldTypes
625+
) throws IOException {
626+
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, randomBoolean(), fieldTypes);
617627
}
618628

619629
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
@@ -623,7 +633,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
623633
AggregationBuilder builder,
624634
MappedFieldType... fieldTypes
625635
) throws IOException {
626-
return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
636+
return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, randomBoolean(), fieldTypes);
627637
}
628638

629639
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
@@ -633,7 +643,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
633643
int maxBucket,
634644
MappedFieldType... fieldTypes
635645
) throws IOException {
636-
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
646+
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
637647
}
638648

639649
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
@@ -642,9 +652,10 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
642652
Query query,
643653
AggregationBuilder builder,
644654
int maxBucket,
655+
boolean shardFanOut,
645656
MappedFieldType... fieldTypes
646657
) throws IOException {
647-
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, fieldTypes);
658+
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, shardFanOut, fieldTypes);
648659
}
649660

650661
/**
@@ -662,6 +673,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
662673
AggregationBuilder builder,
663674
int maxBucket,
664675
boolean hasNested,
676+
boolean shardFanOut,
665677
MappedFieldType... fieldTypes
666678
) throws IOException {
667679
final IndexReaderContext ctx = searcher.getTopReaderContext();
@@ -677,7 +689,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
677689
);
678690
C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);
679691

680-
if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
692+
if (shardFanOut && searcher.getIndexReader().leaves().size() > 0) {
681693
assertThat(ctx, instanceOf(CompositeReaderContext.class));
682694
final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
683695
final int size = compCTX.leaves().size();

0 commit comments

Comments
 (0)