Skip to content

Commit cadf974

Browse files
committed
Add cluster setting to dynamically configure filter rewrite optimization (opensearch-project#13179)
--------- Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
1 parent d91edf0 commit cadf974

File tree

7 files changed

+90
-6
lines changed

7 files changed

+90
-6
lines changed

CHANGELOG.md

+18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,24 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.12.x]
77
### Added
8+
- Constant Keyword Field ([#12285](https://github.com/opensearch-project/OpenSearch/pull/12285))
9+
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
10+
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
11+
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
12+
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
13+
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
14+
- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063))
15+
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))
16+
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
17+
- Add cluster primary balance constraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
18+
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
19+
- Reject resize index requests (i.e., split, shrink and clone) while DocRep to SegRep migration is in progress ([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
20+
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
21+
- [Tiered Caching] Add dimension-based stats to ICache implementations. ([#12531](https://github.com/opensearch-project/OpenSearch/pull/12531))
22+
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
23+
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
24+
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
25+
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
826

927
### Dependencies
1028

server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/FilterRewriteIT.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
1212

13+
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1314
import org.opensearch.action.index.IndexRequestBuilder;
1415
import org.opensearch.action.search.SearchResponse;
1516
import org.opensearch.common.settings.Settings;
@@ -34,6 +35,7 @@
3435

3536
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
3637
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
38+
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
3739
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
3840
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
3941

@@ -93,7 +95,7 @@ public void testMinDocCountOnDateHistogram() throws Exception {
9395
final SearchResponse allResponse = client().prepareSearch("idx")
9496
.setSize(0)
9597
.setQuery(QUERY)
96-
.addAggregation(dateHistogram("histo").field("date").dateHistogramInterval(DateHistogramInterval.DAY).minDocCount(0))
98+
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
9799
.get();
98100

99101
final Histogram allHisto = allResponse.getAggregations().get("histo");
@@ -104,4 +106,36 @@ public void testMinDocCountOnDateHistogram() throws Exception {
104106
assertEquals(entry.getValue(), results.get(entry.getKey()));
105107
}
106108
}
109+
110+
public void testDisableOptimizationGivesSameResults() throws Exception {
111+
SearchResponse response = client().prepareSearch("idx")
112+
.setSize(0)
113+
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
114+
.get();
115+
116+
final Histogram allHisto1 = response.getAggregations().get("histo");
117+
118+
final ClusterUpdateSettingsResponse updateSettingResponse = client().admin()
119+
.cluster()
120+
.prepareUpdateSettings()
121+
.setTransientSettings(Settings.builder().put(MAX_AGGREGATION_REWRITE_FILTERS.getKey(), 0))
122+
.get();
123+
124+
assertEquals(updateSettingResponse.getTransientSettings().get(MAX_AGGREGATION_REWRITE_FILTERS.getKey()), "0");
125+
126+
response = client().prepareSearch("idx")
127+
.setSize(0)
128+
.addAggregation(dateHistogram("histo").field("date").calendarInterval(DateHistogramInterval.DAY).minDocCount(0))
129+
.get();
130+
131+
final Histogram allHisto2 = response.getAggregations().get("histo");
132+
133+
assertEquals(allHisto1, allHisto2);
134+
135+
client().admin()
136+
.cluster()
137+
.prepareUpdateSettings()
138+
.setTransientSettings(Settings.builder().putNull(MAX_AGGREGATION_REWRITE_FILTERS.getKey()))
139+
.get();
140+
}
107141
}

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

+1
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ public void apply(Settings value, Settings current, Settings previous) {
516516
SearchService.MAX_OPEN_SCROLL_CONTEXT,
517517
SearchService.MAX_OPEN_PIT_CONTEXT,
518518
SearchService.MAX_PIT_KEEPALIVE_SETTING,
519+
SearchService.MAX_AGGREGATION_REWRITE_FILTERS,
519520
CreatePitController.PIT_INIT_KEEP_ALIVE,
520521
Node.WRITE_PORTS_FILE_SETTING,
521522
Node.NODE_NAME_SETTING,

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

+15
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@
107107
import java.util.function.LongSupplier;
108108

109109
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
110+
import static org.opensearch.search.SearchService.MAX_AGGREGATION_REWRITE_FILTERS;
110111

111112
/**
112113
* The main search context used during search phase
@@ -185,6 +186,7 @@ final class DefaultSearchContext extends SearchContext {
185186
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
186187
private final boolean concurrentSearchSettingsEnabled;
187188
private final SetOnce<Boolean> requestShouldUseConcurrentSearch = new SetOnce<>();
189+
private final int maxAggRewriteFilters;
188190

189191
DefaultSearchContext(
190192
ReaderContext readerContext,
@@ -238,6 +240,8 @@ final class DefaultSearchContext extends SearchContext {
238240
queryBoost = request.indexBoost();
239241
this.lowLevelCancellation = lowLevelCancellation;
240242
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
243+
244+
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
241245
}
242246

243247
@Override
@@ -977,4 +981,15 @@ public int getTargetMaxSliceCount() {
977981
return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
978982
}
979983

984+
@Override
985+
public int maxAggRewriteFilters() {
986+
return maxAggRewriteFilters;
987+
}
988+
989+
private int evaluateFilterRewriteSetting() {
990+
if (clusterService != null) {
991+
return clusterService.getClusterSettings().get(MAX_AGGREGATION_REWRITE_FILTERS);
992+
}
993+
return 0;
994+
}
980995
}

server/src/main/java/org/opensearch/search/SearchService.java

+9
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
270270
Property.NodeScope
271271
);
272272

273+
// value 0 means rewrite filters optimization in aggregations will be disabled
274+
public static final Setting<Integer> MAX_AGGREGATION_REWRITE_FILTERS = Setting.intSetting(
275+
"search.max_aggregation_rewrite_filters",
276+
72,
277+
0,
278+
Property.Dynamic,
279+
Property.NodeScope
280+
);
281+
273282
public static final int DEFAULT_SIZE = 10;
274283
public static final int DEFAULT_FROM = 0;
275284

server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ private FastFilterRewriteHelper() {}
6868

6969
private static final Logger logger = LogManager.getLogger(FastFilterRewriteHelper.class);
7070

71-
private static final int MAX_NUM_FILTER_BUCKETS = 1024;
7271
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;
7372

7473
// Initialize the wrapper map for unwrapping the query
@@ -191,8 +190,9 @@ private static Weight[] createFilterForAggregations(
191190
int bucketCount = 0;
192191
while (roundedLow <= fieldType.convertNanosToMillis(high)) {
193192
bucketCount++;
194-
if (bucketCount > MAX_NUM_FILTER_BUCKETS) {
195-
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS);
193+
int maxNumFilterBuckets = context.maxAggRewriteFilters();
194+
if (bucketCount > maxNumFilterBuckets) {
195+
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets);
196196
return null;
197197
}
198198
// Below rounding is needed as the interval could return in
@@ -272,6 +272,8 @@ public void setAggregationType(AggregationType aggregationType) {
272272
}
273273

274274
public boolean isRewriteable(final Object parent, final int subAggLength) {
275+
if (context.maxAggRewriteFilters() == 0) return false;
276+
275277
boolean rewriteable = aggregationType.isRewriteable(parent, subAggLength);
276278
logger.debug("Fast filter rewriteable: {} for shard {}", rewriteable, context.indexShard().shardId());
277279
this.rewriteable = rewriteable;
@@ -588,8 +590,9 @@ private static long[][] createRangesFromAgg(
588590
int bucketCount = 0;
589591
while (roundedLow <= fieldType.convertNanosToMillis(high)) {
590592
bucketCount++;
591-
if (bucketCount > MAX_NUM_FILTER_BUCKETS) {
592-
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", MAX_NUM_FILTER_BUCKETS);
593+
int maxNumFilterBuckets = context.maxAggRewriteFilters();
594+
if (bucketCount > maxNumFilterBuckets) {
595+
logger.debug("Max number of filters reached [{}], skip the fast filter optimization", maxNumFilterBuckets);
593596
return null;
594597
}
595598
// Below rounding is needed as the interval could return in

server/src/main/java/org/opensearch/search/internal/SearchContext.java

+4
Original file line numberDiff line numberDiff line change
@@ -489,4 +489,8 @@ public String toString() {
489489
public abstract boolean shouldUseTimeSeriesDescSortOptimization();
490490

491491
public abstract int getTargetMaxSliceCount();
492+
493+
public int maxAggRewriteFilters() {
494+
return 0;
495+
}
492496
}

0 commit comments

Comments
 (0)