Skip to content

Commit eafa416

Browse files
Apply the fast filter optimization to composite aggregation (opensearch-project#11505) (opensearch-project#11914) (opensearch-project#12283)
1 parent 3ff62da commit eafa416

14 files changed

+585
-378
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
124124
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
125125
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
126126
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
127+
- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083))
127128
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
128129
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
129130
- Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209))

server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java

+385
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java

+90-8
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@
5656
import org.apache.lucene.search.Weight;
5757
import org.apache.lucene.search.comparators.LongComparator;
5858
import org.apache.lucene.util.Bits;
59+
import org.apache.lucene.util.CollectionUtil;
5960
import org.apache.lucene.util.RoaringDocIdSet;
61+
import org.opensearch.common.Rounding;
6062
import org.opensearch.common.lease.Releasables;
6163
import org.opensearch.index.IndexSortConfig;
6264
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
@@ -71,7 +73,9 @@
7173
import org.opensearch.search.aggregations.MultiBucketCollector;
7274
import org.opensearch.search.aggregations.MultiBucketConsumerService;
7375
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
76+
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
7477
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
78+
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
7579
import org.opensearch.search.internal.SearchContext;
7680
import org.opensearch.search.searchafter.SearchAfterBuilder;
7781
import org.opensearch.search.sort.SortAndFormats;
@@ -80,6 +84,7 @@
8084
import java.util.ArrayList;
8185
import java.util.Arrays;
8286
import java.util.Collections;
87+
import java.util.HashMap;
8388
import java.util.List;
8489
import java.util.Map;
8590
import java.util.function.LongUnaryOperator;
@@ -111,6 +116,10 @@ final class CompositeAggregator extends BucketsAggregator {
111116

112117
private boolean earlyTerminated;
113118

119+
private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
120+
private LongKeyedBucketOrds bucketOrds = null;
121+
private Rounding.Prepared preparedRounding = null;
122+
114123
CompositeAggregator(
115124
String name,
116125
AggregatorFactories factories,
@@ -154,12 +163,33 @@ final class CompositeAggregator extends BucketsAggregator {
154163
}
155164
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
156165
this.rawAfterKey = rawAfterKey;
166+
167+
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
168+
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
169+
fastFilterContext.setAggregationType(
170+
new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
171+
);
172+
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
173+
// bucketOrds is the data structure for saving date histogram results
174+
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
175+
// Currently the filter rewrite is only supported for date histograms
176+
FastFilterRewriteHelper.CompositeAggregationType aggregationType =
177+
(FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
178+
preparedRounding = aggregationType.getRoundingPreparer();
179+
fastFilterContext.buildFastFilter(
180+
context,
181+
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
182+
x -> aggregationType.getRounding(),
183+
() -> preparedRounding
184+
);
185+
}
157186
}
158187

159188
@Override
160189
protected void doClose() {
161190
try {
162191
Releasables.close(queue);
192+
Releasables.close(bucketOrds);
163193
} finally {
164194
Releasables.close(sources);
165195
}
@@ -187,12 +217,14 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
187217
}
188218

189219
int num = Math.min(size, queue.size());
190-
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
220+
InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
221+
191222
long[] bucketOrdsToCollect = new long[queue.size()];
192223
for (int i = 0; i < queue.size(); i++) {
193224
bucketOrdsToCollect[i] = i;
194225
}
195226
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
227+
196228
while (queue.size() > 0) {
197229
int slot = queue.pop();
198230
CompositeKey key = queue.toCompositeKey(slot);
@@ -208,6 +240,43 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
208240
aggs
209241
);
210242
}
243+
244+
// Build results from fast filters optimization
245+
if (bucketOrds != null) {
246+
// CompositeKey is the value of bucket key
247+
final Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
248+
// Some segments may not be optimized, so buckets may contain results from the queue.
249+
for (InternalComposite.InternalBucket internalBucket : buckets) {
250+
bucketMap.put(internalBucket.getRawKey(), internalBucket);
251+
}
252+
// Loop over the buckets in the bucketOrds, and populate the map accordingly
253+
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
254+
while (ordsEnum.next()) {
255+
Long bucketKeyValue = ordsEnum.value();
256+
CompositeKey key = new CompositeKey(bucketKeyValue);
257+
if (bucketMap.containsKey(key)) {
258+
long docCount = bucketDocCount(ordsEnum.ord()) + bucketMap.get(key).getDocCount();
259+
bucketMap.get(key).setDocCount(docCount);
260+
} else {
261+
InternalComposite.InternalBucket bucket = new InternalComposite.InternalBucket(
262+
sourceNames,
263+
formats,
264+
key,
265+
reverseMuls,
266+
missingOrders,
267+
bucketDocCount(ordsEnum.ord()),
268+
buildEmptySubAggregations()
269+
);
270+
bucketMap.put(key, bucket);
271+
}
272+
}
273+
// since a map is not sorted structure, sort it before transform back to buckets
274+
List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
275+
CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
276+
buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new);
277+
num = buckets.length;
278+
}
279+
211280
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
212281
return new InternalAggregation[] {
213282
new InternalComposite(
@@ -296,16 +365,16 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
296365

297366
if (indexSortField.getReverse() != (source.reverseMul == -1)) {
298367
if (i == 0) {
299-
// the leading index sort matches the leading source field but the order is reversed
368+
// the leading index sort matches the leading source field, but the order is reversed,
300369
// so we don't check the other sources.
301370
return new Sort(indexSortField);
302371
}
303372
break;
304373
}
305374
sortFields.add(indexSortField);
306375
if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
307-
// the rounding "squashes" many values together, that breaks the ordering of sub-values
308-
// so we ignore subsequent source even if they match the index sort.
376+
// the rounding "squashes" many values together, that breaks the ordering of sub-values,
377+
// so we ignore the subsequent sources even if they match the index sort.
309378
break;
310379
}
311380
}
@@ -448,6 +517,16 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
448517

449518
@Override
450519
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
520+
boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
521+
ctx,
522+
fastFilterContext,
523+
(key, count) -> incrementBucketDocCount(
524+
FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
525+
count
526+
)
527+
);
528+
if (optimized) throw new CollectionTerminatedException();
529+
451530
finishLeaf();
452531

453532
boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
@@ -477,9 +556,10 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
477556
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
478557
}
479558
if (rawAfterKey != null && sortPrefixLen > 0) {
480-
// We have an after key and index sort is applicable so we jump directly to the doc
481-
// that is after the index sort prefix using the rawAfterKey and we start collecting
482-
// document from there.
559+
// We have an after key and index sort is applicable, so we jump directly to the doc
560+
// after the index sort prefix using the rawAfterKey and we start collecting
561+
// documents from there.
562+
assert indexSortPrefix != null;
483563
processLeafFromQuery(ctx, indexSortPrefix);
484564
throw new CollectionTerminatedException();
485565
} else {
@@ -507,6 +587,8 @@ public void collect(int doc, long bucket) throws IOException {
507587
try {
508588
long docCount = docCountProvider.getDocCount(doc);
509589
if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
590+
// one doc may contain multiple values, we iterate over and collect one by one
591+
// so the same doc can appear multiple times here
510592
if (builder != null && lastDoc != doc) {
511593
builder.add(doc);
512594
lastDoc = doc;
@@ -569,7 +651,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
569651
@Override
570652
public void collect(int doc, long zeroBucket) throws IOException {
571653
assert zeroBucket == 0;
572-
Integer slot = queue.compareCurrent();
654+
Integer slot = queue.getCurrentSlot();
573655
if (slot != null) {
574656
// The candidate key is a top bucket.
575657
// We can defer the collection of this document/bucket to the sub collector

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*
4545
* @opensearch.internal
4646
*/
47-
class CompositeKey implements Writeable {
47+
public class CompositeKey implements Writeable {
4848
private final Comparable[] values;
4949

5050
CompositeKey(Comparable... values) {
@@ -64,11 +64,11 @@ Comparable[] values() {
6464
return values;
6565
}
6666

67-
int size() {
67+
public int size() {
6868
return values.length;
6969
}
7070

71-
Comparable get(int pos) {
71+
public Comparable get(int pos) {
7272
assert pos < values.length;
7373
return values[pos];
7474
}

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java

+17-13
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747

4848
/**
4949
* A specialized {@link PriorityQueue} implementation for composite buckets.
50+
* Can think of this as a max heap that holds the top small buckets slots in order.
51+
* Each slot holds the values of the composite bucket key it represents.
5052
*
5153
* @opensearch.internal
5254
*/
@@ -77,7 +79,7 @@ public int hashCode() {
7779

7880
private final BigArrays bigArrays;
7981
private final int maxSize;
80-
private final Map<Slot, Integer> map;
82+
private final Map<Slot, Integer> map; // to quickly find the slot for a value
8183
private final SingleDimensionValuesSource<?>[] arrays;
8284

8385
private LongArray docCounts;
@@ -108,7 +110,7 @@ public int hashCode() {
108110

109111
@Override
110112
protected boolean lessThan(Integer a, Integer b) {
111-
return compare(a, b) > 0;
113+
return compare(a, b) > 0; // max heap
112114
}
113115

114116
/**
@@ -119,10 +121,10 @@ boolean isFull() {
119121
}
120122

121123
/**
122-
* Compares the current candidate with the values in the queue and returns
124+
* Try to get the slot of the current/candidate values in the queue and returns
123125
* the slot if the candidate is already in the queue or null if the candidate is not present.
124126
*/
125-
Integer compareCurrent() {
127+
Integer getCurrentSlot() {
126128
return map.get(new Slot(CANDIDATE_SLOT));
127129
}
128130

@@ -281,32 +283,34 @@ boolean addIfCompetitive(long inc) {
281283
*/
282284
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
283285
// checks if the candidate key is competitive
284-
Integer topSlot = compareCurrent();
285-
if (topSlot != null) {
286+
Integer curSlot = getCurrentSlot();
287+
if (curSlot != null) {
286288
// this key is already in the top N, skip it
287-
docCounts.increment(topSlot, inc);
289+
docCounts.increment(curSlot, inc);
288290
return true;
289291
}
292+
290293
if (afterKeyIsSet) {
291294
int cmp = compareCurrentWithAfter();
292295
if (cmp <= 0) {
293296
if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
294-
// the leading index sort is in the reverse order of the leading source
297+
// the leading index sort is and the leading source order are both reversed,
295298
// so we can early terminate when we reach a document that is smaller
296299
// than the after key (collected on a previous page).
297300
throw new CollectionTerminatedException();
298301
}
299-
// key was collected on a previous page, skip it (>= afterKey).
302+
// the key was collected on a previous page, skip it.
300303
return false;
301304
}
302305
}
306+
307+
// the heap is full, check if the candidate key larger than max heap top
303308
if (size() >= maxSize) {
304-
// the tree map is full, check if the candidate key should be kept
305309
int cmp = compare(CANDIDATE_SLOT, top());
306310
if (cmp > 0) {
307311
if (cmp <= indexSortSourcePrefix) {
308-
// index sort guarantees that there is no key greater or equal than the
309-
// current one in the subsequent documents so we can early terminate.
312+
// index sort guarantees the following documents will have a key larger than the current candidate,
313+
// so we can early terminate.
310314
throw new CollectionTerminatedException();
311315
}
312316
// the candidate key is not competitive, skip it.
@@ -324,7 +328,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
324328
} else {
325329
newSlot = size();
326330
}
327-
// move the candidate key to its new slot
331+
// move the candidate key to its new slot by copy its values to the new slot
328332
copyCurrent(newSlot, inc);
329333
map.put(new Slot(newSlot), newSlot);
330334
add(newSlot);

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public MissingOrder missingOrder() {
156156
/**
157157
* Returns true if the source contains a script that can change the value.
158158
*/
159-
protected boolean hasScript() {
159+
public boolean hasScript() {
160160
return hasScript;
161161
}
162162

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
303303
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
304304
// here
305305
Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
306-
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding);
306+
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding, rounding);
307307
// is specified in the builder.
308308
final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format();
309309
final MappedFieldType fieldType = valuesSourceConfig.fieldType();

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public static class InternalBucket extends InternalMultiBucketAggregation.Intern
351351
KeyComparable<InternalBucket> {
352352

353353
private final CompositeKey key;
354-
private final long docCount;
354+
private long docCount;
355355
private final InternalAggregations aggregations;
356356
private final transient int[] reverseMuls;
357357
private final transient MissingOrder[] missingOrders;
@@ -448,6 +448,10 @@ public long getDocCount() {
448448
return docCount;
449449
}
450450

451+
public void setDocCount(long docCount) {
452+
this.docCount = docCount;
453+
}
454+
451455
@Override
452456
public Aggregations getAggregations() {
453457
return aggregations;

0 commit comments

Comments
 (0)