Commit 170ea27

Refactor the filter rewrite optimization (opensearch-project#14464)
* Refactor: split the single helper class into separate classes and move them into a new package for the optimizations we introduce for the search path; rename the classes to make them more straightforward and general.
* Refactor: rework the canOptimize logic; sort out the basic rules for how the aggregator provides data and where common logic lives.
* Refactor: rework the data provider and tryOptimize logic.
* Refactor: extract the segment match-all logic.
* Refactor: inline a class.
* Fix a bug.
* Address comments.
* prepareFromSegment now doesn't return Ranges.
* Show how it looks when interfaces are introduced.
* Remove the interfaces; clean up.
* Improve documentation.
* Move the multi-range traversal logic into a helper.
* Improve the refactor: rename the package to filterrewrite, move the tree traversal logic into a new class, add documentation for important abstract methods, and add a subclass for the composite aggregation bridge.
* Address Marc's comments.
* Address the concurrent segment search concern: to save the ranges per segment, change to a map that saves ranges for segments separately. The document-count increment method, incrementBucketDocCount, should already be thread safe, as it is the same method used by the normal aggregation execution path.
* Remove a circular dependency.
* Address comments: remove the map of segment ranges and instead pass ranges in by calling getRanges when needed; use AtomicInteger for the debug info.

---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 978d14e commit 170ea27

14 files changed: +1256 -1027 lines
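The commit message above boils down to a two-piece design: an AggregatorBridge subclass carries the aggregator-specific logic, while a FilterRewriteOptimizationContext drives the shared optimization flow and keeps its debug counters thread safe for concurrent segment search. A minimal, hypothetical Java sketch of that shape (simplified names and signatures, not the committed API):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch of the bridge/context split, not the committed code.
abstract class Bridge {
    // Aggregator-specific check, e.g. single date histogram source, no script.
    protected abstract boolean canOptimize();

    // Shard-level preparation, e.g. precomputing the bucket ranges.
    protected abstract void prepare() throws IOException;

    // Per-segment work: count matching docs per range bucket.
    protected abstract void collectSegment(BiConsumer<Long, Long> incrementDocCount) throws IOException;
}

final class OptimizationContext {
    private final Bridge bridge;
    private final boolean canOptimize;
    // AtomicInteger keeps this counter safe under concurrent segment search,
    // mirroring the debug-info approach described in the commit message.
    private final AtomicInteger optimizedSegments = new AtomicInteger();

    OptimizationContext(Bridge bridge) throws IOException {
        this.bridge = bridge;
        this.canOptimize = bridge.canOptimize();
        if (canOptimize) {
            bridge.prepare();
        }
    }

    // Returns true when the rewrite fully handled the segment, so the caller
    // can skip normal per-document collection for that leaf.
    boolean tryOptimize(BiConsumer<Long, Long> incrementDocCount, boolean segmentMatchAll) throws IOException {
        if (canOptimize == false || segmentMatchAll == false) {
            return false;
        }
        bridge.collectSegment(incrementDocCount);
        optimizedSegments.incrementAndGet();
        return true;
    }

    void populateDebugInfo(BiConsumer<String, Object> add) {
        if (optimizedSegments.get() > 0) {
            add.accept("optimized_segments", optimizedSegments.get());
        }
    }
}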

server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java

-849
This file was deleted.

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

+59 -62
@@ -73,8 +73,8 @@
 import org.opensearch.search.aggregations.MultiBucketCollector;
 import org.opensearch.search.aggregations.MultiBucketConsumerService;
 import org.opensearch.search.aggregations.bucket.BucketsAggregator;
-import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
-import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper.AbstractDateHistogramAggregationType;
+import org.opensearch.search.aggregations.bucket.filterrewrite.CompositeAggregatorBridge;
+import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
 import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
 import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.opensearch.search.internal.SearchContext;
@@ -89,13 +89,15 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;

 import static org.opensearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING;
+import static org.opensearch.search.aggregations.bucket.filterrewrite.DateHistogramAggregatorBridge.segmentMatchAll;

 /**
- * Main aggregator that aggregates docs from mulitple aggregations
+ * Main aggregator that aggregates docs from multiple aggregations
  *
  * @opensearch.internal
  */
@@ -118,9 +120,8 @@ public final class CompositeAggregator extends BucketsAggregator {

     private boolean earlyTerminated;

-    private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
-    private LongKeyedBucketOrds bucketOrds = null;
-    private Rounding.Prepared preparedRounding = null;
+    private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
+    private LongKeyedBucketOrds bucketOrds;

     CompositeAggregator(
         String name,
@@ -166,57 +167,62 @@ public final class CompositeAggregator extends BucketsAggregator {
         this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
         this.rawAfterKey = rawAfterKey;

-        fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(context);
-        if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) {
-            return;
-        }
-        fastFilterContext.setAggregationType(new CompositeAggregationType());
-        if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
-            // bucketOrds is used for saving date histogram results
-            bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
-            preparedRounding = ((CompositeAggregationType) fastFilterContext.getAggregationType()).getRoundingPrepared();
-            fastFilterContext.buildRanges(sourceConfigs[0].fieldType());
-        }
-    }
+        CompositeAggregatorBridge bridge = new CompositeAggregatorBridge() {
+            private RoundingValuesSource valuesSource;
+            private long afterKey = -1L;

-    /**
-     * Currently the filter rewrite is only supported for date histograms
-     */
-    public class CompositeAggregationType extends AbstractDateHistogramAggregationType {
-        private final RoundingValuesSource valuesSource;
-        private long afterKey = -1L;
-
-        public CompositeAggregationType() {
-            super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
-            this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
-            if (rawAfterKey != null) {
-                assert rawAfterKey.size() == 1 && formats.size() == 1;
-                this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
-                    throw new IllegalArgumentException("now() is not supported in [after] key");
-                });
+            @Override
+            protected boolean canOptimize() {
+                if (canOptimize(sourceConfigs)) {
+                    this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
+                    if (rawAfterKey != null) {
+                        assert rawAfterKey.size() == 1 && formats.size() == 1;
+                        this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
+                            throw new IllegalArgumentException("now() is not supported in [after] key");
+                        });
+                    }
+
+                    // bucketOrds is used for saving the date histogram results got from the optimization path
+                    bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
+                    return true;
+                }
+                return false;
             }
-        }

-        public Rounding getRounding(final long low, final long high) {
-            return valuesSource.getRounding();
-        }
+            @Override
+            protected void prepare() throws IOException {
+                buildRanges(context);
+            }

-        public Rounding.Prepared getRoundingPrepared() {
-            return valuesSource.getPreparedRounding();
-        }
+            protected Rounding getRounding(final long low, final long high) {
+                return valuesSource.getRounding();
+            }

-        @Override
-        protected void processAfterKey(long[] bound, long interval) {
-            // afterKey is the last bucket key in previous response, and the bucket key
-            // is the minimum of all values in the bucket, so need to add the interval
-            if (afterKey != -1L) {
-                bound[0] = afterKey + interval;
+            protected Rounding.Prepared getRoundingPrepared() {
+                return valuesSource.getPreparedRounding();
             }
-        }

-        public int getSize() {
-            return size;
-        }
+            @Override
+            protected long[] processAfterKey(long[] bounds, long interval) {
+                // afterKey is the last bucket key in previous response, and the bucket key
+                // is the minimum of all values in the bucket, so need to add the interval
+                if (afterKey != -1L) {
+                    bounds[0] = afterKey + interval;
+                }
+                return bounds;
+            }
+
+            @Override
+            protected int getSize() {
+                return size;
+            }
+
+            @Override
+            protected Function<Long, Long> bucketOrdProducer() {
+                return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
+            }
+        };
+        filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
     }

     @Override
@@ -368,7 +374,7 @@ private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortField) {
                 return v2 != null && DocValues.unwrapSingleton(v2) == null;

             default:
-                // we have no clue whether the field is multi-valued or not so we assume it is.
+                // we have no clue whether the field is multivalued or not so we assume it is.
                 return true;
         }
     }
@@ -551,11 +557,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException {

     @Override
     protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
-        boolean optimized = fastFilterContext.tryFastFilterAggregation(
-            ctx,
-            this::incrementBucketDocCount,
-            (key) -> bucketOrds.add(0, preparedRounding.round((long) key))
-        );
+        boolean optimized = filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
         if (optimized) throw new CollectionTerminatedException();

         finishLeaf();
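The getLeafCollector change above is the entire runtime contract of the rewrite: ask the context to try the optimization for the segment, and on success terminate normal collection for that leaf. A rough sketch of that calling pattern, reusing the hypothetical OptimizationContext from the sketch near the top of this page:

import org.apache.lucene.search.CollectionTerminatedException;

import java.io.IOException;
import java.util.function.BiConsumer;

final class LeafCollectionSketch {
    private final OptimizationContext optimizationContext;

    LeafCollectionSketch(OptimizationContext optimizationContext) {
        this.optimizationContext = optimizationContext;
    }

    void onNewSegment(BiConsumer<Long, Long> incrementDocCount, boolean segmentMatchAll) throws IOException {
        // Throwing CollectionTerminatedException is Lucene's standard way for
        // a collector to say "this segment is done"; it is not an error.
        if (optimizationContext.tryOptimize(incrementDocCount, segmentMatchAll)) {
            throw new CollectionTerminatedException();
        }
        // ...otherwise fall through to normal per-document collection.
    }
}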
@@ -709,11 +711,6 @@ private static class Entry {

     @Override
     public void collectDebugInfo(BiConsumer<String, Object> add) {
-        if (fastFilterContext.optimizedSegments > 0) {
-            add.accept("optimized_segments", fastFilterContext.optimizedSegments);
-            add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments);
-            add.accept("leaf_visited", fastFilterContext.leaf);
-            add.accept("inner_visited", fastFilterContext.inner);
-        }
+        filterRewriteOptimizationContext.populateDebugInfo(add);
     }
 }
server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/AggregatorBridge.java

+84
@@ -0,0 +1,84 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.bucket.filterrewrite;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.PointValues;
+import org.opensearch.index.mapper.MappedFieldType;
+
+import java.io.IOException;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * This class provides a bridge between an aggregator and the optimization context, allowing
+ * the aggregator to provide data and optimize the aggregation process.
+ *
+ * <p>The main purpose of this class is to encapsulate the aggregator-specific optimization
+ * logic and provide access to the data in the Aggregator that is required for optimization, while
+ * keeping the optimization business logic separate from the aggregator implementation.
+ *
+ * <p>To optimize an aggregator using this class, subclass it in this package and put any
+ * aggregation-specific optimization business logic in the subclass. Then implement that subclass
+ * in the aggregator to provide the data needed for the optimization.
+ *
+ * @opensearch.internal
+ */
+public abstract class AggregatorBridge {
+
+    /**
+     * The field type associated with this aggregator bridge.
+     */
+    MappedFieldType fieldType;
+
+    Consumer<Ranges> setRanges;
+
+    void setRangesConsumer(Consumer<Ranges> setRanges) {
+        this.setRanges = setRanges;
+    }
+
+    /**
+     * Checks whether the aggregator can be optimized.
+     * <p>
+     * This method is implemented in a specific aggregator bridge to take in fields from the aggregator.
+     *
+     * @return {@code true} if the aggregator can be optimized, {@code false} otherwise.
+     * The result will be saved in the optimization context.
+     */
+    protected abstract boolean canOptimize();
+
+    /**
+     * Prepares the optimization at shard level after checking that the aggregator is optimizable.
+     * <p>
+     * For example, figure out the ranges from the aggregation to do the optimization later.
+     * <p>
+     * This method is implemented in a specific aggregator bridge to take in fields from the aggregator.
+     */
+    protected abstract void prepare() throws IOException;
+
+    /**
+     * Prepares the optimization for a specific segment when the segment is functionally matching all docs.
+     *
+     * @param leaf the leaf reader context for the segment
+     */
+    abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;
+
+    /**
+     * Attempts to build aggregation results for a segment.
+     *
+     * @param values            the point values (the index structure for numeric values) for a segment
+     * @param incrementDocCount a consumer to increment the document count for a range bucket; the first parameter is the document count, the second is the key of the bucket
+     * @param ranges            the ranges to run the optimization on
+     */
+    abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
+        PointValues values,
+        BiConsumer<Long, Long> incrementDocCount,
+        Ranges ranges
+    ) throws IOException;
+}
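The class javadoc above prescribes the extension pattern: a subclass in this package holds the aggregation-type-specific business logic, and the aggregator then implements that subclass (often anonymously, as CompositeAggregator does above) to feed in its own fields. A self-contained sketch of the pattern with stand-in types, since the real Ranges and FilterRewriteOptimizationContext.DebugInfo are package-private:

import java.io.IOException;
import java.util.function.BiConsumer;

// Stand-ins so the sketch compiles on its own; the real types differ.
final class SketchRanges {}
final class SketchDebugInfo {}

// Middle layer: optimization business logic for one family of aggregations.
abstract class SketchHistogramBridge {
    protected abstract boolean canOptimize();
    protected abstract void prepare() throws IOException;

    // Traversal logic shared by all histogram-like bridges.
    final SketchDebugInfo tryOptimize(BiConsumer<Long, Long> incrementDocCount, SketchRanges ranges) {
        // ...multi-range traversal over the point values would happen here...
        return new SketchDebugInfo();
    }
}

// Bottom layer: the aggregator supplies its fields via an anonymous subclass.
final class SketchAggregator {
    private final SketchHistogramBridge bridge = new SketchHistogramBridge() {
        @Override
        protected boolean canOptimize() {
            return true; // e.g. single date histogram source, no script
        }

        @Override
        protected void prepare() {
            // e.g. build the ranges from the aggregation's rounding
        }
    };
}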
server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/CompositeAggregatorBridge.java

+36
@@ -0,0 +1,36 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.aggregations.bucket.filterrewrite;
+
+import org.opensearch.index.mapper.DateFieldMapper;
+import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
+import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;
+
+/**
+ * For composite aggregation to do optimization when it only has a single date histogram source
+ */
+public abstract class CompositeAggregatorBridge extends DateHistogramAggregatorBridge {
+    protected boolean canOptimize(CompositeValuesSourceConfig[] sourceConfigs) {
+        if (sourceConfigs.length != 1 || !(sourceConfigs[0].valuesSource() instanceof RoundingValuesSource)) return false;
+        return canOptimize(sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript(), sourceConfigs[0].fieldType());
+    }
+
+    private boolean canOptimize(boolean missing, boolean hasScript, MappedFieldType fieldType) {
+        if (!missing && !hasScript) {
+            if (fieldType instanceof DateFieldMapper.DateFieldType) {
+                if (fieldType.isSearchable()) {
+                    this.fieldType = fieldType;
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+}
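Read together, the two canOptimize overloads above form a single eligibility predicate. A hypothetical flattening, inlined purely for readability (it drops the fieldType side effect and is not the committed code):

import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig;
import org.opensearch.search.aggregations.bucket.composite.RoundingValuesSource;

final class EligibilitySketch {
    // The rewrite applies only to a composite aggregation whose single source
    // is a date histogram (RoundingValuesSource) over a searchable date field,
    // with no missing bucket and no script.
    static boolean eligible(CompositeValuesSourceConfig[] sourceConfigs) {
        return sourceConfigs.length == 1
            && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource
            && sourceConfigs[0].missingBucket() == false
            && sourceConfigs[0].hasScript() == false
            && sourceConfigs[0].fieldType() instanceof DateFieldMapper.DateFieldType
            && sourceConfigs[0].fieldType().isSearchable();
    }
}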
