Skip to content

Commit 00b2fcc

Browse files
authored
Provide factory for pluggable deciders (#15729)
Signed-off-by: Ganesh Ramadurai <gramadur@icloud.com>
1 parent 0d032be commit 00b2fcc

File tree

11 files changed

+136
-117
lines changed

11 files changed

+136
-117
lines changed

server/src/main/java/org/opensearch/node/Node.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@
233233
import org.opensearch.search.aggregations.support.AggregationUsageService;
234234
import org.opensearch.search.backpressure.SearchBackpressureService;
235235
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
236-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
236+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
237237
import org.opensearch.search.fetch.FetchPhase;
238238
import org.opensearch.search.pipeline.SearchPipelineService;
239239
import org.opensearch.search.query.QueryPhase;
@@ -1336,7 +1336,7 @@ protected Node(
13361336
circuitBreakerService,
13371337
searchModule.getIndexSearcherExecutor(threadPool),
13381338
taskResourceTrackingService,
1339-
searchModule.getConcurrentSearchDeciders()
1339+
searchModule.getConcurrentSearchRequestDeciderFactories()
13401340
);
13411341

13421342
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
@@ -1986,7 +1986,7 @@ protected SearchService newSearchService(
19861986
CircuitBreakerService circuitBreakerService,
19871987
Executor indexSearcherExecutor,
19881988
TaskResourceTrackingService taskResourceTrackingService,
1989-
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
1989+
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
19901990
) {
19911991
return new SearchService(
19921992
clusterService,
@@ -2000,7 +2000,7 @@ protected SearchService newSearchService(
20002000
circuitBreakerService,
20012001
indexSearcherExecutor,
20022002
taskResourceTrackingService,
2003-
concurrentSearchDecidersList
2003+
concurrentSearchDeciderFactories
20042004
);
20052005
}
20062006

server/src/main/java/org/opensearch/plugins/SearchPlugin.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
6666
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
6767
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
68-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
68+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
6969
import org.opensearch.search.fetch.FetchSubPhase;
7070
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
7171
import org.opensearch.search.query.QueryPhaseSearcher;
@@ -141,12 +141,12 @@ default Map<String, Highlighter> getHighlighters() {
141141
}
142142

143143
/**
144-
* Allows plugins to register custom decider for concurrent search
145-
* @return A {@link ConcurrentSearchDecider}
144+
* Allows plugins to register a factory to create custom decider for concurrent search
145+
* @return A {@link ConcurrentSearchRequestDecider.Factory}
146146
*/
147147
@ExperimentalApi
148-
default ConcurrentSearchDecider getConcurrentSearchDecider() {
149-
return null;
148+
default Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
149+
return Optional.empty();
150150
}
151151

152152
/**

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@
7272
import org.opensearch.search.aggregations.SearchContextAggregations;
7373
import org.opensearch.search.builder.SearchSourceBuilder;
7474
import org.opensearch.search.collapse.CollapseContext;
75-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
7675
import org.opensearch.search.deciders.ConcurrentSearchDecision;
76+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
7777
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
7878
import org.opensearch.search.dfs.DfsSearchResult;
7979
import org.opensearch.search.fetch.FetchPhase;
@@ -106,13 +106,14 @@
106106
import java.util.Collection;
107107
import java.util.Collections;
108108
import java.util.HashMap;
109+
import java.util.HashSet;
109110
import java.util.List;
110111
import java.util.Map;
112+
import java.util.Optional;
111113
import java.util.Set;
112114
import java.util.concurrent.Executor;
113115
import java.util.function.Function;
114116
import java.util.function.LongSupplier;
115-
import java.util.stream.Collectors;
116117

117118
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
118119
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
@@ -136,7 +137,7 @@ final class DefaultSearchContext extends SearchContext {
136137
private final ShardSearchRequest request;
137138
private final SearchShardTarget shardTarget;
138139
private final LongSupplier relativeTimeSupplier;
139-
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
140+
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
140141
private SearchType searchType;
141142
private final BigArrays bigArrays;
142143
private final IndexShard indexShard;
@@ -221,7 +222,7 @@ final class DefaultSearchContext extends SearchContext {
221222
boolean validate,
222223
Executor executor,
223224
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
224-
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
225+
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
225226
) throws IOException {
226227
this.readerContext = readerContext;
227228
this.request = request;
@@ -264,7 +265,7 @@ final class DefaultSearchContext extends SearchContext {
264265

265266
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
266267
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
267-
this.concurrentSearchDeciders = concurrentSearchDeciders;
268+
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
268269
}
269270

270271
@Override
@@ -928,14 +929,21 @@ public boolean shouldUseConcurrentSearch() {
928929

929930
private boolean evaluateAutoMode() {
930931

931-
// filter out deciders that want to opt-out of decision-making
932-
final Set<ConcurrentSearchDecider> filteredDeciders = concurrentSearchDeciders.stream()
933-
.filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings()))
934-
.collect(Collectors.toSet());
932+
final Set<ConcurrentSearchRequestDecider> concurrentSearchRequestDeciders = new HashSet<>();
933+
934+
// create the ConcurrentSearchRequestDeciders using registered factories
935+
for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) {
936+
final Optional<ConcurrentSearchRequestDecider> concurrentSearchRequestDecider = deciderFactory.create(
937+
indexService.getIndexSettings()
938+
);
939+
concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add);
940+
941+
}
942+
935943
// evaluate based on concurrent search query visitor
936-
if (filteredDeciders.size() > 0) {
944+
if (concurrentSearchRequestDeciders.size() > 0) {
937945
ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor(
938-
filteredDeciders,
946+
concurrentSearchRequestDeciders,
939947
indexService.getIndexSettings()
940948
);
941949
if (request().source() != null && request().source().query() != null) {
@@ -945,7 +953,7 @@ private boolean evaluateAutoMode() {
945953
}
946954

947955
final List<ConcurrentSearchDecision> decisions = new ArrayList<>();
948-
for (ConcurrentSearchDecider decider : filteredDeciders) {
956+
for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) {
949957
ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision();
950958
if (decision != null) {
951959
if (logger.isDebugEnabled()) {

server/src/main/java/org/opensearch/search/SearchModule.java

+11-13
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@
255255
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
256256
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
257257
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
258-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
258+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
259259
import org.opensearch.search.fetch.FetchPhase;
260260
import org.opensearch.search.fetch.FetchSubPhase;
261261
import org.opensearch.search.fetch.subphase.ExplainPhase;
@@ -334,7 +334,7 @@ public class SearchModule {
334334
private final QueryPhaseSearcher queryPhaseSearcher;
335335
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;
336336

337-
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
337+
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
338338

339339
/**
340340
* Constructs a new SearchModule object
@@ -364,25 +364,23 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
364364
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
365365
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
366366
namedWriteables.addAll(SortValue.namedWriteables());
367-
concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins);
367+
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
368368
}
369369

370-
private Collection<ConcurrentSearchDecider> registerConcurrentSearchDeciders(List<SearchPlugin> plugins) {
371-
List<ConcurrentSearchDecider> concurrentSearchDeciders = new ArrayList<>();
370+
private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
371+
List<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories = new ArrayList<>();
372372
for (SearchPlugin plugin : plugins) {
373-
ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider();
374-
if (decider != null) {
375-
concurrentSearchDeciders.add(decider);
376-
}
373+
final Optional<ConcurrentSearchRequestDecider.Factory> deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory();
374+
deciderFactory.ifPresent(concurrentSearchDeciderFactories::add);
377375
}
378-
return concurrentSearchDeciders;
376+
return concurrentSearchDeciderFactories;
379377
}
380378

381379
/**
382-
* Returns the concurrent search deciders that the plugins have registered
380+
* Returns the concurrent search decider factories that the plugins have registered
383381
*/
384-
public Collection<ConcurrentSearchDecider> getConcurrentSearchDeciders() {
385-
return concurrentSearchDeciders;
382+
public Collection<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactories() {
383+
return concurrentSearchDeciderFactories;
386384
}
387385

388386
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {

server/src/main/java/org/opensearch/search/SearchService.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
106106
import org.opensearch.search.builder.SearchSourceBuilder;
107107
import org.opensearch.search.collapse.CollapseContext;
108-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
108+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
109109
import org.opensearch.search.dfs.DfsPhase;
110110
import org.opensearch.search.dfs.DfsSearchResult;
111111
import org.opensearch.search.fetch.FetchPhase;
@@ -358,7 +358,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
358358
private final QueryPhase queryPhase;
359359

360360
private final FetchPhase fetchPhase;
361-
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
361+
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
362362

363363
private volatile long defaultKeepAlive;
364364

@@ -404,7 +404,7 @@ public SearchService(
404404
CircuitBreakerService circuitBreakerService,
405405
Executor indexSearcherExecutor,
406406
TaskResourceTrackingService taskResourceTrackingService,
407-
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
407+
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
408408
) {
409409
Settings settings = clusterService.getSettings();
410410
this.threadPool = threadPool;
@@ -460,7 +460,7 @@ public SearchService(
460460
allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings);
461461
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField);
462462

463-
this.concurrentSearchDeciders = concurrentSearchDeciders;
463+
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
464464
}
465465

466466
private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -1161,7 +1161,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
11611161
validate,
11621162
indexSearcherExecutor,
11631163
this::aggReduceContextBuilder,
1164-
concurrentSearchDeciders
1164+
concurrentSearchDeciderFactories
11651165
);
11661166
// we clone the query shard context here just for rewriting otherwise we
11671167
// might end up with incorrect state since we are using now() or script services

server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import java.util.Collection;
1414

1515
/**
16-
* This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return.
16+
* This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return.
1717
*
1818
*/
1919
@ExperimentalApi

server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java

+22-14
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,21 @@
1212
import org.opensearch.index.IndexSettings;
1313
import org.opensearch.index.query.QueryBuilder;
1414

15+
import java.util.Optional;
16+
1517
/**
16-
* {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request
18+
* {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request
1719
* can use concurrent segment search using the passed in queryBuilders from query tree and index settings
1820
* on a per shard request basis.
19-
* Implementations can also opt out of the evaluation process for certain indices based on the index settings.
20-
* For all the deciders which can evaluate query tree for an index, its evaluateForQuery method
21-
* will be called for each node in the query tree. After traversing of the query tree is completed, the final
22-
* decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision}
21+
* Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider
22+
* This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the
23+
* concurrent search decision from the created decider on a per-request basis.
24+
* For all the deciders the evaluateForQuery method will be called for each node in the query tree.
25+
* After traversing of the query tree is completed, the final decision from the deciders will be
26+
* obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision}
2327
*/
2428
@ExperimentalApi
25-
public abstract class ConcurrentSearchDecider {
29+
public abstract class ConcurrentSearchRequestDecider {
2630

2731
/**
2832
* Evaluate for the passed in queryBuilder node in the query tree of the search request
@@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider {
3135
*/
3236
public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings);
3337

34-
/**
35-
* Provides a way for deciders to opt out of decision-making process for certain requests based on
36-
* index settings.
37-
* Return true if interested in decision making for index,
38-
* false, otherwise
39-
*/
40-
public abstract boolean canEvaluateForIndex(IndexSettings indexSettings);
41-
4238
/**
4339
* Provide the final decision for concurrent search based on all evaluations
4440
* Plugins may need to maintain internal state of evaluations to provide a final decision
@@ -47,4 +43,16 @@ public abstract class ConcurrentSearchDecider {
4743
*/
4844
public abstract ConcurrentSearchDecision getConcurrentSearchDecision();
4945

46+
/**
47+
* Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object.
48+
* Implementations can use the passed in indexSettings to decide whether to create the decider object or
49+
* return {@link Optional#empty()}.
50+
*/
51+
@ExperimentalApi
52+
public interface Factory {
53+
default Optional<ConcurrentSearchRequestDecider> create(IndexSettings indexSettings) {
54+
return Optional.empty();
55+
}
56+
}
57+
5058
}

server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919

2020
/**
2121
* Class to traverse the QueryBuilder tree and invoke the
22-
* {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree
22+
* {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree
2323
*/
2424
@ExperimentalApi
2525
public class ConcurrentSearchVisitor implements QueryBuilderVisitor {
2626

27-
private final Set<ConcurrentSearchDecider> deciders;
27+
private final Set<ConcurrentSearchRequestDecider> deciders;
2828
private final IndexSettings indexSettings;
2929

30-
public ConcurrentSearchVisitor(Set<ConcurrentSearchDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
30+
public ConcurrentSearchVisitor(Set<ConcurrentSearchRequestDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
3131
Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null");
3232
deciders = concurrentSearchVisitorDeciders;
3333
indexSettings = idxSettings;

0 commit comments

Comments
 (0)