Skip to content

Commit 68450d8

Browse files
jed326Jay Deng
authored and
Jay Deng
committed
Disable concurrent segment search for system indices and throttled search requests
Signed-off-by: Jay Deng <jayd0104@gmail.com>
1 parent 6ddbdcd commit 68450d8

File tree

3 files changed

+79
-2
lines changed

3 files changed

+79
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
107107
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
108108
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
109109
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
110+
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
110111

111112
### Dependencies
112113
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,11 @@ public boolean shouldUseConcurrentSearch() {
902902
* Evaluate if parsed request supports concurrent segment search
903903
*/
904904
public void evaluateRequestShouldUseConcurrentSearch() {
905-
if (sort != null && sort.isSortOnTimeSeriesField()) {
905+
// Do not use concurrent segment search for system indices or throttled requests. See:
906+
// https://github.com/opensearch-project/OpenSearch/issues/12951
907+
if (indexShard.isSystem() || indexShard.indexSettings().isSearchThrottled()) {
908+
requestShouldUseConcurrentSearch.set(false);
909+
} else if (sort != null && sort.isSortOnTimeSeriesField()) {
906910
requestShouldUseConcurrentSearch.set(false);
907911
} else if (aggregations() != null
908912
&& aggregations().factories() != null

server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java

+73-1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@
9393
import java.util.function.Function;
9494
import java.util.function.Supplier;
9595

96+
import static org.opensearch.index.IndexSettings.INDEX_SEARCH_THROTTLED;
9697
import static org.hamcrest.Matchers.equalTo;
9798
import static org.hamcrest.Matchers.is;
9899
import static org.mockito.Mockito.any;
@@ -551,7 +552,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
551552
}
552553
}
553554

554-
public void testSearchPathEvaluationUsingSortField() throws Exception {
555+
public void testSearchPathEvaluation() throws Exception {
555556
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
556557
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
557558
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
@@ -578,9 +579,24 @@ public void testSearchPathEvaluationUsingSortField() throws Exception {
578579
IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
579580
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
580581
when(indexService.getIndexSettings()).thenReturn(indexSettings);
582+
when(indexShard.indexSettings()).thenReturn(indexSettings);
581583

582584
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
583585

586+
IndexShard systemIndexShard = mock(IndexShard.class);
587+
when(systemIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
588+
when(systemIndexShard.getThreadPool()).thenReturn(threadPool);
589+
when(systemIndexShard.isSystem()).thenReturn(true);
590+
591+
IndexShard throttledIndexShard = mock(IndexShard.class);
592+
when(throttledIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
593+
when(throttledIndexShard.getThreadPool()).thenReturn(threadPool);
594+
IndexSettings throttledIndexSettings = new IndexSettings(
595+
indexMetadata,
596+
Settings.builder().put(INDEX_SEARCH_THROTTLED.getKey(), true).build()
597+
);
598+
when(throttledIndexShard.indexSettings()).thenReturn(throttledIndexSettings);
599+
584600
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
585601

586602
final Supplier<Engine.SearcherSupplier> searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) {
@@ -697,6 +713,62 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
697713
}
698714
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);
699715

716+
// Case 4: With a system index concurrent segment search is not used
717+
readerContext = new ReaderContext(
718+
newContextId(),
719+
indexService,
720+
systemIndexShard,
721+
searcherSupplier.get(),
722+
randomNonNegativeLong(),
723+
false
724+
);
725+
context = new DefaultSearchContext(
726+
readerContext,
727+
shardSearchRequest,
728+
target,
729+
null,
730+
bigArrays,
731+
null,
732+
null,
733+
null,
734+
false,
735+
Version.CURRENT,
736+
false,
737+
executor,
738+
null
739+
);
740+
context.evaluateRequestShouldUseConcurrentSearch();
741+
assertFalse(context.shouldUseConcurrentSearch());
742+
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);
743+
744+
// Case 5: When search is throttled concurrent segment search is not used
745+
readerContext = new ReaderContext(
746+
newContextId(),
747+
indexService,
748+
throttledIndexShard,
749+
searcherSupplier.get(),
750+
randomNonNegativeLong(),
751+
false
752+
);
753+
context = new DefaultSearchContext(
754+
readerContext,
755+
shardSearchRequest,
756+
target,
757+
null,
758+
bigArrays,
759+
null,
760+
null,
761+
null,
762+
false,
763+
Version.CURRENT,
764+
false,
765+
executor,
766+
null
767+
);
768+
context.evaluateRequestShouldUseConcurrentSearch();
769+
assertFalse(context.shouldUseConcurrentSearch());
770+
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);
771+
700772
// shutdown the threadpool
701773
threadPool.shutdown();
702774
}

0 commit comments

Comments
 (0)