Skip to content

Commit cc61f2a

Browse files
peteralfonsiPeter Alfonsi
authored andcommitted
[Bugfix] Fixes IRC NPE bug for timed-out cacheable queries (opensearch-project#15327)
* Fix IRC timeout bug Signed-off-by: Peter Alfonsi <petealft@amazon.com> * addressed Sagar's comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * addressed Ankit's comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Add UT for test coverage Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * tweak imports in new UT Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> --------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com>
1 parent 21be0fc commit cc61f2a

File tree

4 files changed

+105
-8
lines changed

4 files changed

+105
-8
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java

+63
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@
3434

3535
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
3636

37+
import org.apache.lucene.index.Term;
38+
import org.apache.lucene.search.IndexSearcher;
39+
import org.apache.lucene.search.Query;
40+
import org.apache.lucene.search.ScoreMode;
41+
import org.apache.lucene.search.TermQuery;
42+
import org.apache.lucene.search.Weight;
3743
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
3844
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
3945
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -56,7 +62,10 @@
5662
import org.opensearch.env.NodeEnvironment;
5763
import org.opensearch.index.IndexSettings;
5864
import org.opensearch.index.cache.request.RequestCacheStats;
65+
import org.opensearch.index.query.QueryBuilder;
5966
import org.opensearch.index.query.QueryBuilders;
67+
import org.opensearch.index.query.QueryShardContext;
68+
import org.opensearch.index.query.TermQueryBuilder;
6069
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
6170
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
6271
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
@@ -65,6 +74,7 @@
6574
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
6675
import org.opensearch.test.hamcrest.OpenSearchAssertions;
6776

77+
import java.io.IOException;
6878
import java.nio.file.Files;
6979
import java.nio.file.Path;
7080
import java.time.ZoneId;
@@ -768,6 +778,59 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception {
768778
assertTrue(stats.getMemorySizeInBytes() == 0);
769779
}
770780

781+
public void testTimedOutQuery() throws Exception {
782+
// A timed out query should be cached and then invalidated
783+
Client client = client();
784+
String index = "index";
785+
assertAcked(
786+
client.admin()
787+
.indices()
788+
.prepareCreate(index)
789+
.setMapping("k", "type=keyword")
790+
.setSettings(
791+
Settings.builder()
792+
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
793+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
794+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
795+
// Disable index refreshing to avoid cache being invalidated mid-test
796+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1))
797+
)
798+
.get()
799+
);
800+
indexRandom(true, client.prepareIndex(index).setSource("k", "hello"));
801+
ensureSearchable(index);
802+
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
803+
forceMerge(client, index);
804+
805+
QueryBuilder timeoutQueryBuilder = new TermQueryBuilder("k", "hello") {
806+
@Override
807+
protected Query doToQuery(QueryShardContext context) {
808+
return new TermQuery(new Term("k", "hello")) {
809+
@Override
810+
public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
811+
// Create the weight before sleeping. Otherwise, TermStates.build() (in the call to super.createWeight()) will
812+
// sometimes throw an exception on timeout, rather than timing out gracefully.
813+
Weight result = super.createWeight(searcher, scoreMode, boost);
814+
try {
815+
Thread.sleep(500);
816+
} catch (InterruptedException ignored) {}
817+
return result;
818+
}
819+
};
820+
}
821+
};
822+
823+
SearchResponse resp = client.prepareSearch(index)
824+
.setRequestCache(true)
825+
.setQuery(timeoutQueryBuilder)
826+
.setTimeout(TimeValue.ZERO)
827+
.get();
828+
assertTrue(resp.isTimedOut());
829+
RequestCacheStats requestCacheStats = getRequestCacheStats(client, index);
830+
// The cache should be empty as the timed-out query was invalidated
831+
assertEquals(0, requestCacheStats.getMemorySizeInBytes());
832+
}
833+
771834
private Path[] shardDirectory(String server, Index index, int shard) {
772835
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
773836
final Path[] paths = env.availableShardPaths(new ShardId(index, shard));

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -310,12 +310,11 @@ BytesReference getOrCompute(
310310
* @param cacheKey the cache key to invalidate
311311
*/
312312
void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) {
313-
assert reader.getReaderCacheHelper() != null;
314-
String readerCacheKeyId = null;
315-
if (reader instanceof OpenSearchDirectoryReader) {
316-
IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper();
317-
readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId();
318-
}
313+
assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper;
314+
OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader
315+
.getReaderCacheHelper();
316+
String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId();
317+
319318
IndexShard indexShard = (IndexShard) cacheEntity.getCacheIdentity();
320319
cache.invalidate(getICacheKey(new Key(indexShard.shardId(), cacheKey, readerCacheKeyId, System.identityHashCode(indexShard))));
321320
}

server/src/main/java/org/opensearch/indices/IndicesService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.opensearch.common.io.stream.BytesStreamOutput;
6969
import org.opensearch.common.lease.Releasable;
7070
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
71+
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper;
7172
import org.opensearch.common.settings.IndexScopedSettings;
7273
import org.opensearch.common.settings.Setting;
7374
import org.opensearch.common.settings.Setting.Property;
@@ -1754,8 +1755,7 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
17541755
if (context.getQueryShardContext().isCacheable() == false) {
17551756
return false;
17561757
}
1757-
return true;
1758-
1758+
return context.searcher().getDirectoryReader().getReaderCacheHelper() instanceof DelegatingCacheHelper;
17591759
}
17601760

17611761
/**

server/src/test/java/org/opensearch/indices/IndicesServiceTests.java

+35
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,23 @@
3131

3232
package org.opensearch.indices;
3333

34+
import org.apache.lucene.index.DirectoryReader;
35+
import org.apache.lucene.index.IndexReader;
3436
import org.apache.lucene.search.similarities.BM25Similarity;
3537
import org.apache.lucene.search.similarities.Similarity;
3638
import org.apache.lucene.store.AlreadyClosedException;
3739
import org.opensearch.Version;
3840
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
3941
import org.opensearch.action.admin.indices.stats.IndexShardStats;
42+
import org.opensearch.action.search.SearchType;
4043
import org.opensearch.cluster.ClusterName;
4144
import org.opensearch.cluster.ClusterState;
4245
import org.opensearch.cluster.metadata.IndexGraveyard;
4346
import org.opensearch.cluster.metadata.IndexMetadata;
4447
import org.opensearch.cluster.metadata.Metadata;
4548
import org.opensearch.cluster.service.ClusterService;
4649
import org.opensearch.common.UUIDs;
50+
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader.DelegatingCacheHelper;
4751
import org.opensearch.common.settings.Setting;
4852
import org.opensearch.common.settings.Settings;
4953
import org.opensearch.common.unit.TimeValue;
@@ -76,8 +80,11 @@
7680
import org.opensearch.plugins.EnginePlugin;
7781
import org.opensearch.plugins.MapperPlugin;
7882
import org.opensearch.plugins.Plugin;
83+
import org.opensearch.search.internal.ContextIndexSearcher;
84+
import org.opensearch.search.internal.ShardSearchRequest;
7985
import org.opensearch.test.IndexSettingsModule;
8086
import org.opensearch.test.OpenSearchSingleNodeTestCase;
87+
import org.opensearch.test.TestSearchContext;
8188
import org.opensearch.test.hamcrest.RegexMatcher;
8289

8390
import java.io.IOException;
@@ -627,4 +634,32 @@ public void testClusterRemoteTranslogBufferIntervalDefault() {
627634
indicesService.getRemoteStoreSettings().getClusterRemoteTranslogBufferInterval()
628635
);
629636
}
637+
638+
public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws IOException {
639+
IndicesService indicesService = getIndicesService();
640+
final IndexService indexService = createIndex("test");
641+
ShardSearchRequest request = mock(ShardSearchRequest.class);
642+
when(request.requestCache()).thenReturn(true);
643+
644+
TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) {
645+
@Override
646+
public SearchType searchType() {
647+
return SearchType.QUERY_THEN_FETCH;
648+
}
649+
};
650+
651+
ContextIndexSearcher searcher = mock(ContextIndexSearcher.class);
652+
context.setSearcher(searcher);
653+
DirectoryReader reader = mock(DirectoryReader.class);
654+
when(searcher.getDirectoryReader()).thenReturn(reader);
655+
when(searcher.getIndexReader()).thenReturn(reader);
656+
IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class);
657+
DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class);
658+
659+
for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) {
660+
IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper;
661+
when(reader.getReaderCacheHelper()).thenReturn(cacheHelper);
662+
assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context));
663+
}
664+
}
630665
}

0 commit comments

Comments
 (0)