Skip to content

Commit 8332859

Browse files
Add cluster setting to dynamically configure filter rewrite optimization (#13179) (#13217)
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent 1b1eff8 commit 8332859

File tree

7 files changed

+70
-4
lines changed

7 files changed

+70
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2020
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
2121
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
2222
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
23+
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
2324

2425
### Dependencies
2526
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/FilterRewriteIT.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1212

13+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1314
import org.opensearch.action.index.IndexRequestBuilder;
1415
import org.opensearch.action.search.SearchResponse;
1516
import org.opensearch.common.settings.Settings;
@@ -34,6 +35,7 @@
3435

3536
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
3637
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
38+
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
3739
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
3840
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3941

@@ -93,7 +95,7 @@ public void testMinDocCountOnDateHistogram() throws Exception {
9395
final SearchResponse allResponse = client().prepareSearch("idx")
9496
.setSize(0)
9597
.setQuery(QUERY)
96-
.addAggregation(dateHistogram("histo").field("date").dateHistogramInterval(DateHistogramInterval.DAY).minDocCount(0))
98+
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
9799
.get();
98100

99101
final Histogram allHisto = allResponse.getAggregations().get("histo");
@@ -104,4 +106,36 @@ public void testMinDocCountOnDateHistogram() throws Exception {
104106
assertEquals(entry.getValue(), results.get(entry.getKey()));
105107
}
106108
}
109+
110+
public void testDisableOptimizationGivesSameResults() throws Exception {
111+
SearchResponse response = client().prepareSearch("idx")
112+
.setSize(0)
113+
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
114+
.get();
115+
116+
final Histogram allHisto1 = response.getAggregations().get("histo");
117+
118+
final ClusterUpdateSettingsResponse updateSettingResponse = client().admin()
119+
.cluster()
120+
.prepareUpdateSettings()
121+
.setTransientSettings(Settings.builder().put(MAX_AGGREGATION_REWRITE_FILTERS.getKey(), 0))
122+
.get();
123+
124+
assertEquals(updateSettingResponse.getTransientSettings().get(MAX_AGGREGATION_REWRITE_FILTERS.getKey()), "0");
125+
126+
response = client().prepareSearch("idx")
127+
.setSize(0)
128+
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
129+
.get();
130+
131+
final Histogram allHisto2 = response.getAggregations().get("histo");
132+
133+
assertEquals(allHisto1, allHisto2);
134+
135+
client().admin()
136+
.cluster()
137+
.prepareUpdateSettings()
138+
.setTransientSettings(Settings.builder().putNull(MAX_AGGREGATION_REWRITE_FILTERS.getKey()))
139+
.get();
140+
}
107141
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -526,6 +526,7 @@ public void apply(Settings value, Settings current, Settings previous) {
526526
SearchService.MAX_OPEN_SCROLL_CONTEXT,
527527
SearchService.MAX_OPEN_PIT_CONTEXT,
528528
SearchService.MAX_PIT_KEEPALIVE_SETTING,
529+
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
529530
CreatePitController.PIT_INIT_KEEP_ALIVE,
530531
Node.WRITE_PORTS_FILE_SETTING,
531532
Node.NODE_NAME_SETTING,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

+15
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import java.util.function.LongSupplier;
108108

109109
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
110+
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
110111

111112
/**
112113
* The main search context used during search phase
@@ -187,6 +188,7 @@ final class DefaultSearchContext extends SearchContext {
187188
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
188189
private final boolean concurrentSearchSettingsEnabled;
189190
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
191+
private final int maxAggRewriteFilters;
190192

191193
DefaultSearchContext(
192194
ReaderContext readerContext,
@@ -240,6 +242,8 @@ final class DefaultSearchContext extends SearchContext {
240242
queryBoost = request.indexBoost();
241243
this.lowLevelCancellation = lowLevelCancellation;
242244
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
245+
246+
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
243247
}
244248

245249
@Override
@@ -995,4 +999,15 @@ public int getTargetMaxSliceCount() {
995999
return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
9961000
}
9971001

1002+
@Override
1003+
public int maxAggRewriteFilters() {
1004+
return maxAggRewriteFilters;
1005+
}
1006+
1007+
private int evaluateFilterRewriteSetting() {
1008+
if (clusterService != null) {
1009+
return clusterService.getClusterSettings().get(MAX_AGGREGATION_REWRITE_FILTERS);
1010+
}
1011+
return 0;
1012+
}
9981013
}

server/src/main/java/org/opensearch/search/SearchService.java

+9
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
273273
Property.NodeScope
274274
);
275275

276+
// value 0 means rewrite filters optimization in aggregations will be disabled
277+
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
278+
"search.max_aggregation_rewrite_filters",
279+
72,
280+
0,
281+
Property.Dynamic,
282+
Property.NodeScope
283+
);
284+
276285
public static final int DEFAULT_SIZE = 10;
277286
public static final int DEFAULT_FROM = 0;
278287

server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ private FastFilterRewriteHelper() {}
6363

6464
private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class);
6565

66-
private static final int MAX_NUM_FILTER_BUCKETS = 1024;
6766
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;
6867

6968
// Initialize the wrapper map for unwrapping the query
@@ -186,8 +185,9 @@ private static Weight[] createFilterForAggregations(
186185
int bucketCount = 0;
187186
while (roundedLow <= fieldType.convertNanosToMillis(high)) {
188187
bucketCount++;
189-
if (bucketCount > MAX_NUM_FILTER_BUCKETS) {
190-
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS);
188+
int maxNumFilterBuckets = context.maxAggRewriteFilters();
189+
if (bucketCount > maxNumFilterBuckets) {
190+
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets);
191191
return null;
192192
}
193193
// Below rounding is needed as the interval could return in
@@ -254,6 +254,8 @@ public void setAggregationType(AggregationType aggregationType) {
254254
}
255255

256256
public boolean isRewriteable(final Object parent, final int subAggLength) {
257+
if (context.maxAggRewriteFilters() == 0) return false;
258+
257259
boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength);
258260
logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId());
259261
this.rewriteable = rewriteable;

server/src/main/java/org/opensearch/search/internal/SearchContext.java

+4
Original file line numberDiff line numberDiff line change
@@ -522,4 +522,8 @@ public String toString() {
522522
public abstract boolean shouldUseTimeSeriesDescSortOptimization();
523523

524524
public abstract int getTargetMaxSliceCount();
525+
526+
public int maxAggRewriteFilters() {
527+
return 0;
528+
}
525529
}

0 commit comments

Comments
 (0)