Skip to content

Commit 2d9e52d

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Fix double invocation of postCollection when MultiBucketCollector is present
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent ba0df74 commit 2d9e52d

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/terms/StringTermsIT.java

+68
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@
4242
import org.opensearch.index.query.QueryBuilders;
4343
import org.opensearch.script.Script;
4444
import org.opensearch.script.ScriptType;
45+
import org.opensearch.search.aggregations.AggregationBuilders;
4546
import org.opensearch.search.aggregations.AggregationExecutionException;
4647
import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode;
4748
import org.opensearch.search.aggregations.BucketOrder;
4849
import org.opensearch.search.aggregations.bucket.filter.Filter;
50+
import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
4951
import org.opensearch.search.aggregations.bucket.terms.Terms.Bucket;
5052
import org.opensearch.search.aggregations.metrics.Avg;
5153
import org.opensearch.search.aggregations.metrics.ExtendedStats;
@@ -999,6 +1001,72 @@ public void testOtherDocCount() {
9991001
testOtherDocCount(SINGLE_VALUED_FIELD_NAME, MULTI_VALUED_FIELD_NAME);
10001002
}
10011003

1004+
public void testDeferredSubAggs() {
1005+
// Tests subAgg doc count is the same with different collection modes and additional top level aggs
1006+
SearchResponse r1 = client().prepareSearch("idx")
1007+
.setSize(0)
1008+
.addAggregation(
1009+
terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
1010+
.field("s_value")
1011+
.size(2)
1012+
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
1013+
)
1014+
.addAggregation(AggregationBuilders.min("min").field("constant"))
1015+
.get();
1016+
1017+
SearchResponse r2 = client().prepareSearch("idx")
1018+
.setSize(0)
1019+
.addAggregation(
1020+
terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
1021+
.field("s_value")
1022+
.size(2)
1023+
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
1024+
)
1025+
.addAggregation(AggregationBuilders.min("min").field("constant"))
1026+
.get();
1027+
1028+
SearchResponse r3 = client().prepareSearch("idx")
1029+
.setSize(0)
1030+
.addAggregation(
1031+
terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
1032+
.field("s_value")
1033+
.size(2)
1034+
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
1035+
)
1036+
.get();
1037+
1038+
SearchResponse r4 = client().prepareSearch("idx")
1039+
.setSize(0)
1040+
.addAggregation(
1041+
terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
1042+
.field("s_value")
1043+
.size(2)
1044+
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
1045+
)
1046+
.get();
1047+
1048+
assertNotNull(r1.getAggregations().get("terms1"));
1049+
assertNotNull(r2.getAggregations().get("terms1"));
1050+
assertNotNull(r3.getAggregations().get("terms1"));
1051+
assertNotNull(r4.getAggregations().get("terms1"));
1052+
1053+
Terms terms = r1.getAggregations().get("terms1");
1054+
Bucket b1 = terms.getBucketByKey("val0");
1055+
InternalFilters f1 = b1.getAggregations().get("filter");
1056+
long docCount1 = f1.getBuckets().get(0).getDocCount();
1057+
Bucket b2 = terms.getBucketByKey("val1");
1058+
InternalFilters f2 = b2.getAggregations().get("filter");
1059+
long docCount2 = f1.getBuckets().get(0).getDocCount();
1060+
1061+
for (SearchResponse response : new SearchResponse[] { r2, r3, r4 }) {
1062+
terms = response.getAggregations().get("terms1");
1063+
f1 = terms.getBucketByKey(b1.getKeyAsString()).getAggregations().get("filter");
1064+
f2 = terms.getBucketByKey(b2.getKeyAsString()).getAggregations().get("filter");
1065+
assertEquals(docCount1, f1.getBuckets().get(0).getDocCount());
1066+
assertEquals(docCount2, f2.getBuckets().get(0).getDocCount());
1067+
}
1068+
}
1069+
10021070
/**
10031071
* Make sure that a request using a deterministic script or not using a script get cached.
10041072
* Ensure requests using nondeterministic scripts do not get cached.

server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ public void processPostCollection(Collector collectorTree) throws IOException {
7171
collectors.offer(innerCollector);
7272
}
7373
} else if (currentCollector instanceof BucketCollector) {
74-
((BucketCollector) currentCollector).postCollection();
75-
7674
// Perform build aggregation during post collection
7775
if (currentCollector instanceof Aggregator) {
76+
// Do not perform postCollection for MultiBucketCollector as we are unwrapping that below
77+
((BucketCollector) currentCollector).postCollection();
7878
((Aggregator) currentCollector).buildTopLevel();
7979
} else if (currentCollector instanceof MultiBucketCollector) {
8080
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {

server/src/main/java/org/opensearch/search/aggregations/bucket/BestBucketsDeferringCollector.java

+2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ private void finishLeaf() {
124124
if (context != null) {
125125
assert docDeltasBuilder != null && bucketsBuilder != null;
126126
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
127+
context = null;
127128
}
128129
}
129130

@@ -161,6 +162,7 @@ public void preCollection() throws IOException {
161162

162163
@Override
163164
public void postCollection() throws IOException {
165+
assert searchContext.searcher().getLeafContexts().isEmpty() || finished != true;
164166
finishLeaf();
165167
finished = true;
166168
}

0 commit comments

Comments
 (0)