Skip to content

Commit 65dba8c

Browse files
Gankris96akolarkunnu
authored andcommitted
Provide factory for pluggable deciders (opensearch-project#15713)
Signed-off-by: Ganesh Ramadurai <gramadur@icloud.com>
1 parent 919a41b commit 65dba8c

File tree

11 files changed

+136
-117
lines changed

11 files changed

+136
-117
lines changed

server/src/main/java/org/opensearch/node/Node.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@
235235
import org.opensearch.search.aggregations.support.AggregationUsageService;
236236
import org.opensearch.search.backpressure.SearchBackpressureService;
237237
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
238-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
238+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
239239
import org.opensearch.search.fetch.FetchPhase;
240240
import org.opensearch.search.pipeline.SearchPipelineService;
241241
import org.opensearch.search.query.QueryPhase;
@@ -1344,7 +1344,7 @@ protected Node(
13441344
circuitBreakerService,
13451345
searchModule.getIndexSearcherExecutor(threadPool),
13461346
taskResourceTrackingService,
1347-
searchModule.getConcurrentSearchDeciders()
1347+
searchModule.getConcurrentSearchRequestDeciderFactories()
13481348
);
13491349

13501350
final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
@@ -2004,7 +2004,7 @@ protected SearchService newSearchService(
20042004
CircuitBreakerService circuitBreakerService,
20052005
Executor indexSearcherExecutor,
20062006
TaskResourceTrackingService taskResourceTrackingService,
2007-
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
2007+
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
20082008
) {
20092009
return new SearchService(
20102010
clusterService,
@@ -2018,7 +2018,7 @@ protected SearchService newSearchService(
20182018
circuitBreakerService,
20192019
indexSearcherExecutor,
20202020
taskResourceTrackingService,
2021-
concurrentSearchDecidersList
2021+
concurrentSearchDeciderFactories
20222022
);
20232023
}
20242024

server/src/main/java/org/opensearch/plugins/SearchPlugin.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
6666
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
6767
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
68-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
68+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
6969
import org.opensearch.search.fetch.FetchSubPhase;
7070
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
7171
import org.opensearch.search.query.QueryPhaseSearcher;
@@ -141,12 +141,12 @@ default Map<String, Highlighter> getHighlighters() {
141141
}
142142

143143
/**
144-
* Allows plugins to register custom decider for concurrent search
145-
* @return A {@link ConcurrentSearchDecider}
144+
* Allows plugins to register a factory to create custom decider for concurrent search
145+
* @return A {@link ConcurrentSearchRequestDecider.Factory}
146146
*/
147147
@ExperimentalApi
148-
default ConcurrentSearchDecider getConcurrentSearchDecider() {
149-
return null;
148+
default Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
149+
return Optional.empty();
150150
}
151151

152152
/**

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@
7272
import org.opensearch.search.aggregations.SearchContextAggregations;
7373
import org.opensearch.search.builder.SearchSourceBuilder;
7474
import org.opensearch.search.collapse.CollapseContext;
75-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
7675
import org.opensearch.search.deciders.ConcurrentSearchDecision;
76+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
7777
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
7878
import org.opensearch.search.dfs.DfsSearchResult;
7979
import org.opensearch.search.fetch.FetchPhase;
@@ -106,13 +106,14 @@
106106
import java.util.Collection;
107107
import java.util.Collections;
108108
import java.util.HashMap;
109+
import java.util.HashSet;
109110
import java.util.List;
110111
import java.util.Map;
112+
import java.util.Optional;
111113
import java.util.Set;
112114
import java.util.concurrent.Executor;
113115
import java.util.function.Function;
114116
import java.util.function.LongSupplier;
115-
import java.util.stream.Collectors;
116117

117118
import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
118119
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
@@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext {
137138
private final ShardSearchRequest request;
138139
private final SearchShardTarget shardTarget;
139140
private final LongSupplier relativeTimeSupplier;
140-
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
141+
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
141142
private SearchType searchType;
142143
private final BigArrays bigArrays;
143144
private final IndexShard indexShard;
@@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext {
223224
boolean validate,
224225
Executor executor,
225226
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
226-
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
227+
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
227228
) throws IOException {
228229
this.readerContext = readerContext;
229230
this.request = request;
@@ -267,7 +268,7 @@ final class DefaultSearchContext extends SearchContext {
267268

268269
this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
269270
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
270-
this.concurrentSearchDeciders = concurrentSearchDeciders;
271+
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
271272
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
272273
}
273274

@@ -932,14 +933,21 @@ public boolean shouldUseConcurrentSearch() {
932933

933934
private boolean evaluateAutoMode() {
934935

935-
// filter out deciders that want to opt-out of decision-making
936-
final Set<ConcurrentSearchDecider> filteredDeciders = concurrentSearchDeciders.stream()
937-
.filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings()))
938-
.collect(Collectors.toSet());
936+
final Set<ConcurrentSearchRequestDecider> concurrentSearchRequestDeciders = new HashSet<>();
937+
938+
// create the ConcurrentSearchRequestDeciders using registered factories
939+
for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) {
940+
final Optional<ConcurrentSearchRequestDecider> concurrentSearchRequestDecider = deciderFactory.create(
941+
indexService.getIndexSettings()
942+
);
943+
concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add);
944+
945+
}
946+
939947
// evaluate based on concurrent search query visitor
940-
if (filteredDeciders.size() > 0) {
948+
if (concurrentSearchRequestDeciders.size() > 0) {
941949
ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor(
942-
filteredDeciders,
950+
concurrentSearchRequestDeciders,
943951
indexService.getIndexSettings()
944952
);
945953
if (request().source() != null && request().source().query() != null) {
@@ -949,7 +957,7 @@ private boolean evaluateAutoMode() {
949957
}
950958

951959
final List<ConcurrentSearchDecision> decisions = new ArrayList<>();
952-
for (ConcurrentSearchDecider decider : filteredDeciders) {
960+
for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) {
953961
ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision();
954962
if (decision != null) {
955963
if (logger.isDebugEnabled()) {

server/src/main/java/org/opensearch/search/SearchModule.java

+11-13
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@
239239
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
240240
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
241241
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
242-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
242+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
243243
import org.opensearch.search.fetch.FetchPhase;
244244
import org.opensearch.search.fetch.FetchSubPhase;
245245
import org.opensearch.search.fetch.subphase.ExplainPhase;
@@ -318,7 +318,7 @@ public class SearchModule {
318318
private final QueryPhaseSearcher queryPhaseSearcher;
319319
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;
320320

321-
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
321+
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
322322

323323
/**
324324
* Constructs a new SearchModule object
@@ -348,25 +348,23 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
348348
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
349349
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
350350
namedWriteables.addAll(SortValue.namedWriteables());
351-
concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins);
351+
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
352352
}
353353

354-
private Collection<ConcurrentSearchDecider> registerConcurrentSearchDeciders(List<SearchPlugin> plugins) {
355-
List<ConcurrentSearchDecider> concurrentSearchDeciders = new ArrayList<>();
354+
private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
355+
List<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories = new ArrayList<>();
356356
for (SearchPlugin plugin : plugins) {
357-
ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider();
358-
if (decider != null) {
359-
concurrentSearchDeciders.add(decider);
360-
}
357+
final Optional<ConcurrentSearchRequestDecider.Factory> deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory();
358+
deciderFactory.ifPresent(concurrentSearchDeciderFactories::add);
361359
}
362-
return concurrentSearchDeciders;
360+
return concurrentSearchDeciderFactories;
363361
}
364362

365363
/**
366-
* Returns the concurrent search deciders that the plugins have registered
364+
* Returns the concurrent search decider factories that the plugins have registered
367365
*/
368-
public Collection<ConcurrentSearchDecider> getConcurrentSearchDeciders() {
369-
return concurrentSearchDeciders;
366+
public Collection<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactories() {
367+
return concurrentSearchDeciderFactories;
370368
}
371369

372370
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {

server/src/main/java/org/opensearch/search/SearchService.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@
104104
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
105105
import org.opensearch.search.builder.SearchSourceBuilder;
106106
import org.opensearch.search.collapse.CollapseContext;
107-
import org.opensearch.search.deciders.ConcurrentSearchDecider;
107+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
108108
import org.opensearch.search.dfs.DfsPhase;
109109
import org.opensearch.search.dfs.DfsSearchResult;
110110
import org.opensearch.search.fetch.FetchPhase;
@@ -364,7 +364,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
364364
private final QueryPhase queryPhase;
365365

366366
private final FetchPhase fetchPhase;
367-
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
367+
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
368368

369369
private volatile long defaultKeepAlive;
370370

@@ -410,7 +410,7 @@ public SearchService(
410410
CircuitBreakerService circuitBreakerService,
411411
Executor indexSearcherExecutor,
412412
TaskResourceTrackingService taskResourceTrackingService,
413-
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
413+
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
414414
) {
415415
Settings settings = clusterService.getSettings();
416416
this.threadPool = threadPool;
@@ -466,7 +466,7 @@ public SearchService(
466466
allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings);
467467
clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField);
468468

469-
this.concurrentSearchDeciders = concurrentSearchDeciders;
469+
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
470470
}
471471

472472
private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
@@ -1167,7 +1167,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
11671167
validate,
11681168
indexSearcherExecutor,
11691169
this::aggReduceContextBuilder,
1170-
concurrentSearchDeciders
1170+
concurrentSearchDeciderFactories
11711171
);
11721172
// we clone the query shard context here just for rewriting otherwise we
11731173
// might end up with incorrect state since we are using now() or script services

server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import java.util.Collection;
1414

1515
/**
16-
* This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return.
16+
* This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return.
1717
*
1818
*/
1919
@ExperimentalApi

server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java

+22-14
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,21 @@
1212
import org.opensearch.index.IndexSettings;
1313
import org.opensearch.index.query.QueryBuilder;
1414

15+
import java.util.Optional;
16+
1517
/**
16-
* {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request
18+
* {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request
1719
* can use concurrent segment search using the passed in queryBuilders from query tree and index settings
1820
* on a per shard request basis.
19-
* Implementations can also opt out of the evaluation process for certain indices based on the index settings.
20-
* For all the deciders which can evaluate query tree for an index, its evaluateForQuery method
21-
* will be called for each node in the query tree. After traversing of the query tree is completed, the final
22-
* decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision}
21+
* Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider
22+
* This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the
23+
* concurrent search decision from the created decider on a per-request basis.
24+
* For all the deciders the evaluateForQuery method will be called for each node in the query tree.
25+
* After traversing of the query tree is completed, the final decision from the deciders will be
26+
* obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision}
2327
*/
2428
@ExperimentalApi
25-
public abstract class ConcurrentSearchDecider {
29+
public abstract class ConcurrentSearchRequestDecider {
2630

2731
/**
2832
* Evaluate for the passed in queryBuilder node in the query tree of the search request
@@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider {
3135
*/
3236
public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings);
3337

34-
/**
35-
* Provides a way for deciders to opt out of decision-making process for certain requests based on
36-
* index settings.
37-
* Return true if interested in decision making for index,
38-
* false, otherwise
39-
*/
40-
public abstract boolean canEvaluateForIndex(IndexSettings indexSettings);
41-
4238
/**
4339
* Provide the final decision for concurrent search based on all evaluations
4440
* Plugins may need to maintain internal state of evaluations to provide a final decision
@@ -47,4 +43,16 @@ public abstract class ConcurrentSearchDecider {
4743
*/
4844
public abstract ConcurrentSearchDecision getConcurrentSearchDecision();
4945

46+
/**
47+
* Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object.
48+
* Implementations can use the passed in indexSettings to decide whether to create the decider object or
49+
* return {@link Optional#empty()}.
50+
*/
51+
@ExperimentalApi
52+
public interface Factory {
53+
default Optional<ConcurrentSearchRequestDecider> create(IndexSettings indexSettings) {
54+
return Optional.empty();
55+
}
56+
}
57+
5058
}

server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919

2020
/**
2121
* Class to traverse the QueryBuilder tree and invoke the
22-
* {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree
22+
* {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree
2323
*/
2424
@ExperimentalApi
2525
public class ConcurrentSearchVisitor implements QueryBuilderVisitor {
2626

27-
private final Set<ConcurrentSearchDecider> deciders;
27+
private final Set<ConcurrentSearchRequestDecider> deciders;
2828
private final IndexSettings indexSettings;
2929

30-
public ConcurrentSearchVisitor(Set<ConcurrentSearchDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
30+
public ConcurrentSearchVisitor(Set<ConcurrentSearchRequestDecider> concurrentSearchVisitorDeciders, IndexSettings idxSettings) {
3131
Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null");
3232
deciders = concurrentSearchVisitorDeciders;
3333
indexSettings = idxSettings;

0 commit comments

Comments
 (0)