Skip to content

Commit cb2b7a4

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Disable concurrent segment search for system indices and throttled search requests (opensearch-project#12954)
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 48881de commit cb2b7a4

File tree

3 files changed

+90
-3
lines changed

3 files changed

+90
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
1010
- Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818))
1111
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
12+
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
1213

1314
### Dependencies
1415
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -962,16 +962,21 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
962962
* false: otherwise
963963
*/
964964
private boolean evaluateConcurrentSegmentSearchSettings(Executor concurrentSearchExecutor) {
965+
// Do not use concurrent segment search for system indices or throttled requests. See:
966+
// https://github.com/opensearch-project/OpenSearch/issues/12951
967+
if (indexShard.isSystem() || indexShard.indexSettings().isSearchThrottled()) {
968+
return false;
969+
}
970+
965971
if ((clusterService != null) && (concurrentSearchExecutor != null)) {
966972
return indexService.getIndexSettings()
967973
.getSettings()
968974
.getAsBoolean(
969975
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
970976
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
971977
);
972-
} else {
973-
return false;
974978
}
979+
return false;
975980
}
976981

977982
@Override

server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java

+82-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import java.util.function.Function;
9494
import java.util.function.Supplier;
9595

96+
import static org.opensearch.index.IndexSettings.INDEX_SEARCH_THROTTLED;
9697
import static org.hamcrest.Matchers.equalTo;
9798
import static org.hamcrest.Matchers.is;
9899
import static org.mockito.Mockito.any;
@@ -168,6 +169,7 @@ public void testPreProcess() throws Exception {
168169
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
169170
when(indexService.getIndexSettings()).thenReturn(indexSettings);
170171
when(mapperService.getIndexSettings()).thenReturn(indexSettings);
172+
when(indexShard.indexSettings()).thenReturn(indexSettings);
171173

172174
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
173175

@@ -486,6 +488,14 @@ public void testClearQueryCancellationsOnClose() throws IOException {
486488
when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn(
487489
queryShardContext
488490
);
491+
Settings settings = Settings.builder()
492+
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
493+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
494+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
495+
.build();
496+
IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
497+
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
498+
when(indexShard.indexSettings()).thenReturn(indexSettings);
489499

490500
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
491501

@@ -551,7 +561,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
551561
}
552562
}
553563

554-
public void testSearchPathEvaluationUsingSortField() throws Exception {
564+
public void testSearchPathEvaluation() throws Exception {
555565
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
556566
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
557567
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
@@ -578,9 +588,24 @@ public void testSearchPathEvaluationUsingSortField() throws Exception {
578588
IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
579589
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
580590
when(indexService.getIndexSettings()).thenReturn(indexSettings);
591+
when(indexShard.indexSettings()).thenReturn(indexSettings);
581592

582593
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
583594

595+
IndexShard systemIndexShard = mock(IndexShard.class);
596+
when(systemIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
597+
when(systemIndexShard.getThreadPool()).thenReturn(threadPool);
598+
when(systemIndexShard.isSystem()).thenReturn(true);
599+
600+
IndexShard throttledIndexShard = mock(IndexShard.class);
601+
when(throttledIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
602+
when(throttledIndexShard.getThreadPool()).thenReturn(threadPool);
603+
IndexSettings throttledIndexSettings = new IndexSettings(
604+
indexMetadata,
605+
Settings.builder().put(INDEX_SEARCH_THROTTLED.getKey(), true).build()
606+
);
607+
when(throttledIndexShard.indexSettings()).thenReturn(throttledIndexSettings);
608+
584609
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
585610

586611
final Supplier<Engine.SearcherSupplier> searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) {
@@ -697,6 +722,62 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
697722
}
698723
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);
699724

725+
// Case 4: With a system index concurrent segment search is not used
726+
readerContext = new ReaderContext(
727+
newContextId(),
728+
indexService,
729+
systemIndexShard,
730+
searcherSupplier.get(),
731+
randomNonNegativeLong(),
732+
false
733+
);
734+
context = new DefaultSearchContext(
735+
readerContext,
736+
shardSearchRequest,
737+
target,
738+
null,
739+
bigArrays,
740+
null,
741+
null,
742+
null,
743+
false,
744+
Version.CURRENT,
745+
false,
746+
executor,
747+
null
748+
);
749+
context.evaluateRequestShouldUseConcurrentSearch();
750+
assertFalse(context.shouldUseConcurrentSearch());
751+
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);
752+
753+
// Case 5: When search is throttled concurrent segment search is not used
754+
readerContext = new ReaderContext(
755+
newContextId(),
756+
indexService,
757+
throttledIndexShard,
758+
searcherSupplier.get(),
759+
randomNonNegativeLong(),
760+
false
761+
);
762+
context = new DefaultSearchContext(
763+
readerContext,
764+
shardSearchRequest,
765+
target,
766+
null,
767+
bigArrays,
768+
null,
769+
null,
770+
null,
771+
false,
772+
Version.CURRENT,
773+
false,
774+
executor,
775+
null
776+
);
777+
context.evaluateRequestShouldUseConcurrentSearch();
778+
assertFalse(context.shouldUseConcurrentSearch());
779+
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);
780+
700781
// shutdown the threadpool
701782
threadPool.shutdown();
702783
}

0 commit comments

Comments
 (0)