
Commit 7aeee3d

handle segment level bounds separately
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 1ddcc00 commit 7aeee3d

File tree

3 files changed, +92 -40 lines changed

server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java (+55 -28)
@@ -31,7 +31,6 @@
 import org.opensearch.index.mapper.DocCountFieldMapper;
 import org.opensearch.index.mapper.MappedFieldType;
 import org.opensearch.index.query.DateRangeIncludingNowQuery;
-import org.opensearch.search.aggregations.bucket.composite.CompositeAggregator;
 import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
 import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;
 import org.opensearch.search.aggregations.bucket.histogram.LongBounds;
@@ -90,11 +89,11 @@ private static Query unwrapIntoConcreteQuery(Query query) {
     }

     /**
-     * Finds the global min and max bounds of the field for the shard from each segment
+     * Finds the global min and max bounds of the field for the shard across all segments
      *
      * @return null if the field is empty or not indexed
      */
-    private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException {
+    private static long[] getShardBounds(final SearchContext context, final String fieldName) throws IOException {
         final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
         long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
         for (LeafReaderContext leaf : leaves) {
@@ -111,6 +110,25 @@ private static long[] getIndexBounds(final SearchContext context, final String f
         return new long[] { min, max };
     }

+    /**
+     * Finds the min and max bounds of the field for the segment
+     *
+     * @return null if the field is empty or not indexed
+     */
+    private static long[] getSegmentBounds(final LeafReaderContext context, final String fieldName) throws IOException {
+        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
+        final PointValues values = context.reader().getPointValues(fieldName);
+        if (values != null) {
+            min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
+            max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
+        }
+
+        if (min == Long.MAX_VALUE || max == Long.MIN_VALUE) {
+            return null;
+        }
+        return new long[] { min, max };
+    }
+
     /**
      * This method also acts as a pre-condition check for the optimization
      *
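Note: the new getSegmentBounds reads the min/max packed values straight off the segment's point index (BKD tree metadata), so no documents are visited. A minimal self-contained sketch of the same pattern against plain Lucene APIs, assuming a long-valued point field; the class and method names below are illustrative, not part of this commit:

import java.io.IOException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.util.NumericUtils;

final class SegmentBoundsSketch {
    // Returns {min, max} for a long-valued point field in one segment,
    // or null when the field is absent or not indexed with points.
    static long[] bounds(LeafReader reader, String field) throws IOException {
        PointValues values = reader.getPointValues(field);
        if (values == null) {
            return null;
        }
        // The packed min/max live in the point index metadata, so this is cheap.
        long min = NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0);
        long max = NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0);
        return new long[] { min, max };
    }
}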
@@ -120,32 +138,21 @@ public static long[] getDateHistoAggBounds(final SearchContext context, final St
         final Query cq = unwrapIntoConcreteQuery(context.query());
         if (cq instanceof PointRangeQuery) {
             final PointRangeQuery prq = (PointRangeQuery) cq;
-            final long[] indexBounds = getIndexBounds(context, fieldName);
+            final long[] indexBounds = getShardBounds(context, fieldName);
             if (indexBounds == null) return null;
             return getBoundsWithRangeQuery(prq, fieldName, indexBounds);
         } else if (cq instanceof MatchAllDocsQuery) {
-            return getIndexBounds(context, fieldName);
+            return getShardBounds(context, fieldName);
         } else if (cq instanceof FieldExistsQuery) {
             // when a range query covers all values of a shard, it will be rewrite field exists query
             if (((FieldExistsQuery) cq).getField().equals(fieldName)) {
-                return getIndexBounds(context, fieldName);
+                return getShardBounds(context, fieldName);
             }
         }

         return null;
     }

-    private static long[] getDateHistoAggBoundsSegLevel(final SearchContext context, final String fieldName) throws IOException {
-        final long[] indexBounds = getIndexBounds(context, fieldName);
-        if (indexBounds == null) return null;
-        final Query cq = unwrapIntoConcreteQuery(context.query());
-        if (cq instanceof PointRangeQuery) {
-            final PointRangeQuery prq = (PointRangeQuery) cq;
-            return getBoundsWithRangeQuery(prq, fieldName, indexBounds);
-        }
-        return indexBounds;
-    }
-
     private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldName, long[] indexBounds) {
         // Ensure that the query and aggregation are on the same field
         if (prq.getField().equals(fieldName)) {
@@ -158,6 +165,7 @@ private static long[] getBoundsWithRangeQuery(PointRangeQuery prq, String fieldN
             }
             return new long[] { lower, upper };
         }
+
         return null;
     }

@@ -230,7 +238,6 @@ public static class FastFilterContext {
         private boolean rewriteable = false;
         private Weight[] filters = null;
         private boolean filtersBuiltAtShardLevel = false;
-        private boolean shouldBuildFiltersAtSegmentLevel = true;

         private AggregationType aggregationType;
         private final SearchContext context;
@@ -278,6 +285,10 @@ interface AggregationType {

         Weight[] buildFastFilter(SearchContext ctx, CheckedBiFunction<SearchContext, String, long[], IOException> getBounds)
             throws IOException;
+
+        default int getSize() {
+            return Integer.MAX_VALUE;
+        }
     }

     /**
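The default getSize() above lets tryFastFilterAggregation ask any aggregation type for its bucket cap without downcasting; only the composite type overrides the unbounded default. A reduced sketch of the pattern (type names illustrative, not the ones in this file):

interface AggregationTypeSketch {
    // Unbounded by default; only types with a natural bucket cap override this.
    default int getSize() {
        return Integer.MAX_VALUE;
    }
}

final class CompositeTypeSketch implements AggregationTypeSketch {
    private final int size;

    CompositeTypeSketch(int size) {
        this.size = size;
    }

    @Override
    public int getSize() {
        return size; // a composite aggregation stops after `size` buckets
    }
}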
@@ -314,8 +325,23 @@ public boolean isRewriteable(Object parent, int subAggLength) {
         public Weight[] buildFastFilter(SearchContext context, CheckedBiFunction<SearchContext, String, long[], IOException> getBounds)
             throws IOException {
             long[] bounds = getBounds.apply(context, fieldType.name());
-            bounds = processHardBounds(bounds);
             logger.debug("Bounds are {} for shard {}", bounds, context.indexShard().shardId());
+            return buildFastFilter(context, bounds);
+        }
+
+        private Weight[] buildFastFilterWithSegBounds(
+            SearchContext context,
+            CheckedBiFunction<LeafReaderContext, String, long[], IOException> getBounds,
+            LeafReaderContext leaf
+        ) throws IOException {
+            long[] bounds = getBounds.apply(leaf, fieldType.name());
+            logger.debug("Bounds are {} for shard {} segment {}", bounds, context.indexShard().shardId(), leaf.ord);
+            return buildFastFilter(context, bounds);
+        }
+
+        private Weight[] buildFastFilter(SearchContext context, long[] bounds) throws IOException {
+            bounds = processHardBounds(bounds);
+            logger.debug("Bounds are {} for shard {} with hard bound", bounds, context.indexShard().shardId());
             if (bounds == null) {
                 return null;
             }
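The split above separates resolving bounds (shard-wide or per-segment, supplied as a function) from the shared buildFastFilter step that applies hard bounds and builds the filters. A self-contained sketch of that dispatch shape; the functional interface mirrors OpenSearch's CheckedBiFunction, and the bodies are placeholders rather than the real bounds logic:

import java.io.IOException;

final class BoundsDispatchSketch {
    // Same shape as org.opensearch.common.CheckedBiFunction.
    @FunctionalInterface
    interface CheckedBiFunction<T, U, R, E extends Exception> {
        R apply(T t, U u) throws E;
    }

    static long[] shardBounds(String shardId, String field) throws IOException {
        return new long[] { 0L, 1_000L }; // placeholder values
    }

    static long[] segmentBounds(Integer segmentOrd, String field) throws IOException {
        return new long[] { 100L, 200L }; // placeholder values
    }

    public static void main(String[] args) throws IOException {
        // A method reference selects shard-level or segment-level resolution;
        // the downstream filter building is identical for both.
        CheckedBiFunction<String, String, long[], IOException> shard = BoundsDispatchSketch::shardBounds;
        CheckedBiFunction<Integer, String, long[], IOException> segment = BoundsDispatchSketch::segmentBounds;
        System.out.println(shard.apply("shard-0", "@timestamp")[1]);  // 1000
        System.out.println(segment.apply(0, "@timestamp")[0]);       // 100
    }
}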
@@ -394,7 +420,7 @@ public static boolean tryFastFilterAggregation(
         final BiConsumer<Long, Integer> incrementDocCount
     ) throws IOException {
         if (fastFilterContext == null) return false;
-        if (!fastFilterContext.rewriteable || !fastFilterContext.shouldBuildFiltersAtSegmentLevel) {
+        if (!fastFilterContext.rewriteable) {
             return false;
         }

@@ -420,11 +446,15 @@ public static boolean tryFastFilterAggregation(
                 fastFilterContext.context.indexShard().shardId(),
                 ctx.ord
             );
-            filters = fastFilterContext.buildFastFilter(FastFilterRewriteHelper::getDateHistoAggBoundsSegLevel);
+            if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) {
+                filters = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).buildFastFilterWithSegBounds(
+                    fastFilterContext.context,
+                    FastFilterRewriteHelper::getSegmentBounds,
+                    ctx
+                );
+            }
             if (filters == null) {
-                // At segment level, build filter should only be called once
-                // since the conditions for build filter won't change for other segments
-                fastFilterContext.shouldBuildFiltersAtSegmentLevel = false;
+
                 return false;
             }
         }
@@ -441,7 +471,7 @@ public static boolean tryFastFilterAggregation(
         }

         int s = 0;
-        int size = Integer.MAX_VALUE;
+        int size = fastFilterContext.aggregationType.getSize();
         for (i = 0; i < filters.length; i++) {
             if (counts[i] > 0) {
                 long bucketKey = i; // the index of filters is the key for filters aggregation
@@ -451,9 +481,6 @@
                     bucketKey = fieldType.convertNanosToMillis(
                         NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
                     );
-                    if (fastFilterContext.aggregationType instanceof CompositeAggregator.CompositeAggregationType) {
-                        size = ((CompositeAggregator.CompositeAggregationType) fastFilterContext.aggregationType).getSize();
-                    }
                 }
                 incrementDocCount.accept(bucketKey, counts[i]);
                 s++;
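For context on the bucketKey computation above: each filter is a PointRangeQuery whose lower point encodes the bucket's start, and for nanosecond-resolution date fields the real code converts it with fieldType.convertNanosToMillis. A minimal sketch of the decode with the nanos-to-millis conversion written out explicitly (names illustrative):

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.util.NumericUtils;

final class BucketKeySketch {
    // Decode the bucket start from a range filter's lower point.
    static long bucketKeyMillis(PointRangeQuery filter, boolean nanosecondResolution) {
        long lower = NumericUtils.sortableBytesToLong(filter.getLowerPoint(), 0);
        return nanosecondResolution ? lower / 1_000_000L : lower;
    }

    public static void main(String[] args) {
        // LongPoint range queries are PointRangeQuery subclasses, so the cast holds.
        PointRangeQuery q = (PointRangeQuery) LongPoint.newRangeQuery("ts", 1_000_000L, 2_000_000L);
        System.out.println(bucketKeyMillis(q, true)); // 1
    }
}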

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java (+3 -2)
@@ -97,7 +97,7 @@
  *
  * @opensearch.internal
  */
-public final class CompositeAggregator extends BucketsAggregator {
+final class CompositeAggregator extends BucketsAggregator {
     private final int size;
     private final List<String> sourceNames;
     private final int[] reverseMuls;
@@ -178,7 +178,7 @@ public final class CompositeAggregator extends BucketsAggregator {
     /**
      * Currently the filter rewrite is only supported for date histograms
      */
-    public class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
+    private class CompositeAggregationType extends FastFilterRewriteHelper.AbstractDateHistogramAggregationType {
         private final RoundingValuesSource valuesSource;
         private long afterKey = -1L;

@@ -210,6 +210,7 @@ protected void processAfterKey(long[] bound, long interval) {
             }
         }

+        @Override
         public int getSize() {
             return size;
         }

server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregatorTests.java (+34 -10)
@@ -1254,31 +1254,37 @@ public void testHardBoundsNotOverlapping() throws IOException {
         );
     }

-    public void testRangeQuery() throws IOException {
+    public void testFilterRewriteOptimizationWithRangeQuery() throws IOException {
         testSearchCase(
-            LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")),
+            LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2018-01-01"), asLong("2020-01-01")),
             Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"),
             aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE),
             histogram -> {
                 List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
                 assertEquals(0, buckets.size());
             },
-            false
+            10000,
+            false,
+            false,
+            true // force AGGREGABLE_DATE field to be searchable to test the filter rewrite optimization path
         );

         testSearchCase(
-            LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")),
+            LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-01-01")),
             Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"),
             aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE),
             histogram -> {
                 List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
                 assertEquals(0, buckets.size());
             },
-            false
+            10000,
+            false,
+            false,
+            true
         );

         testSearchCase(
-            LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")),
+            LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2016-01-01"), asLong("2017-02-02")),
             Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"),
             aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE),
             histogram -> {
@@ -1293,11 +1299,14 @@ public void testRangeQuery() throws IOException {
                 assertEquals("2017-02-02T00:00:00.000Z", bucket.getKeyAsString());
                 assertEquals(2, bucket.getDocCount());
             },
-            false
+            10000,
+            false,
+            false,
+            true
         );

         testSearchCase(
-            LongPoint.newRangeQuery(SEARCHABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")),
+            LongPoint.newRangeQuery(AGGREGABLE_DATE, asLong("2017-02-03"), asLong("2020-01-01")),
             Arrays.asList("2017-02-01", "2017-02-02", "2017-02-02", "2017-02-03", "2017-02-03", "2017-02-03", "2017-02-05"),
             aggregation -> aggregation.calendarInterval(DateHistogramInterval.DAY).field(AGGREGABLE_DATE),
             histogram -> {
@@ -1316,7 +1325,10 @@ public void testRangeQuery() throws IOException {
                 assertEquals("2017-02-05T00:00:00.000Z", bucket.getKeyAsString());
                 assertEquals(1, bucket.getDocCount());
             },
-            false
+            10000,
+            false,
+            false,
+            true
         );
     }

@@ -1388,7 +1400,19 @@ private void testSearchCase(
         boolean useNanosecondResolution,
         boolean useDocCountField
     ) throws IOException {
-        boolean aggregableDateIsSearchable = randomBoolean();
+        testSearchCase(query, dataset, configure, verify, maxBucket, useNanosecondResolution, useDocCountField, randomBoolean());
+    }
+
+    private void testSearchCase(
+        Query query,
+        List<String> dataset,
+        Consumer<DateHistogramAggregationBuilder> configure,
+        Consumer<InternalDateHistogram> verify,
+        int maxBucket,
+        boolean useNanosecondResolution,
+        boolean useDocCountField,
+        boolean aggregableDateIsSearchable
+    ) throws IOException {
         logger.debug("Aggregable date is searchable {}", aggregableDateIsSearchable);
         DateFieldMapper.DateFieldType fieldType = aggregableDateFieldType(useNanosecondResolution, aggregableDateIsSearchable);
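The change above is a delegating overload: existing callers keep the randomized searchable flag, while the new testFilterRewriteOptimizationWithRangeQuery pins it to true so the optimization path is exercised deterministically. A reduced sketch of the pattern, with illustrative names:

import java.util.Random;

final class OverloadDelegationSketch {
    private static final Random RANDOM = new Random();

    // Existing signature: behavior unchanged, flag still chosen at random.
    static void testSearchCase(String query) {
        testSearchCase(query, RANDOM.nextBoolean());
    }

    // New overload: tests targeting the filter rewrite can pin the flag
    // instead of relying on randomization to hit the searchable branch.
    static void testSearchCase(String query, boolean aggregableDateIsSearchable) {
        // ... index docs, run the aggregation, verify buckets ...
    }
}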

0 commit comments
