Skip to content

Commit 24d2470

Browse files
sgup432kiranprakash154
authored andcommitted
[Tiered Caching] Enable serialization of IndicesRequestCache.Key (opensearch-project#10275)
--------- Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com> Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> Co-authored-by: Kiran Prakash <awskiran@amazon.com> Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 19cf08c commit 24d2470

File tree

8 files changed

+327
-145
lines changed

8 files changed

+327
-145
lines changed

CHANGELOG.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
101101
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
102102
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
103103
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
104-
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753]
105-
(https://github.com/opensearch-project/OpenSearch/pull/10753))
104+
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
105+
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
106106
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
107107
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
108108
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))

libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java

+7
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.core.index.shard;
3434

35+
import org.apache.lucene.util.RamUsageEstimator;
3536
import org.opensearch.common.annotation.PublicApi;
3637
import org.opensearch.core.common.Strings;
3738
import org.opensearch.core.common.io.stream.StreamInput;
@@ -55,6 +56,8 @@ public class ShardId implements Comparable<ShardId>, ToXContentFragment, Writeab
5556
private final int shardId;
5657
private final int hashCode;
5758

59+
private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class);
60+
5861
/**
5962
* Constructs a new shard id.
6063
* @param index the index name
@@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException {
8891
hashCode = computeHashCode();
8992
}
9093

94+
public long getBaseRamBytesUsed() {
95+
return BASE_RAM_BYTES_USED;
96+
}
97+
9198
/**
9299
* Writes this shard id to a stream.
93100
* @param out the stream to write to

server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java

+40
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,45 @@ public void testProfileDisableCache() throws Exception {
636636
}
637637
}
638638

639+
public void testCacheWithInvalidation() throws Exception {
640+
Client client = client();
641+
assertAcked(
642+
client.admin()
643+
.indices()
644+
.prepareCreate("index")
645+
.setMapping("k", "type=keyword")
646+
.setSettings(
647+
Settings.builder()
648+
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
649+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
650+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
651+
)
652+
.get()
653+
);
654+
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
655+
ensureSearchable("index");
656+
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
657+
assertSearchResponse(resp);
658+
OpenSearchAssertions.assertAllSuccessful(resp);
659+
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));
660+
661+
assertCacheState(client, "index", 0, 1);
662+
// Index but don't refresh
663+
indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
664+
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
665+
assertSearchResponse(resp);
666+
// Should expect hit as here as refresh didn't happen
667+
assertCacheState(client, "index", 1, 1);
668+
669+
// Explicit refresh would invalidate cache
670+
refresh();
671+
// Hit same query again
672+
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
673+
assertSearchResponse(resp);
674+
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
675+
assertCacheState(client, "index", 1, 2);
676+
}
677+
639678
private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
640679
RequestCacheStats requestCacheStats = client.admin()
641680
.indices()
@@ -650,6 +689,7 @@ private static void assertCacheState(Client client, String index, long expectedH
650689
Arrays.asList(expectedHits, expectedMisses, 0L),
651690
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
652691
);
692+
653693
}
654694

655695
}

server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java

+60-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.opensearch.core.index.shard.ShardId;
4141

4242
import java.io.IOException;
43+
import java.util.Optional;
44+
import java.util.UUID;
4345

4446
/**
4547
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
@@ -53,11 +55,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader {
5355
private final ShardId shardId;
5456
private final FilterDirectoryReader.SubReaderWrapper wrapper;
5557

58+
private final DelegatingCacheHelper delegatingCacheHelper;
59+
5660
private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId)
5761
throws IOException {
5862
super(in, wrapper);
5963
this.wrapper = wrapper;
6064
this.shardId = shardId;
65+
this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
6166
}
6267

6368
/**
@@ -70,7 +75,61 @@ public ShardId shardId() {
7075
@Override
7176
public CacheHelper getReaderCacheHelper() {
7277
// safe to delegate since this reader does not alter the index
73-
return in.getReaderCacheHelper();
78+
return this.delegatingCacheHelper;
79+
}
80+
81+
public DelegatingCacheHelper getDelegatingCacheHelper() {
82+
return this.delegatingCacheHelper;
83+
}
84+
85+
/**
86+
* Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey.
87+
* @opensearch.internal
88+
*/
89+
public class DelegatingCacheHelper implements CacheHelper {
90+
private final CacheHelper cacheHelper;
91+
private final DelegatingCacheKey serializableCacheKey;
92+
93+
DelegatingCacheHelper(CacheHelper cacheHelper) {
94+
this.cacheHelper = cacheHelper;
95+
this.serializableCacheKey = new DelegatingCacheKey(Optional.ofNullable(cacheHelper).map(key -> getKey()).orElse(null));
96+
}
97+
98+
@Override
99+
public CacheKey getKey() {
100+
return this.cacheHelper.getKey();
101+
}
102+
103+
public DelegatingCacheKey getDelegatingCacheKey() {
104+
return this.serializableCacheKey;
105+
}
106+
107+
@Override
108+
public void addClosedListener(ClosedListener listener) {
109+
this.cacheHelper.addClosedListener(listener);
110+
}
111+
}
112+
113+
/**
114+
* Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of
115+
* object itself for serialization purposes.
116+
*/
117+
public class DelegatingCacheKey {
118+
private final CacheKey cacheKey;
119+
private final String uniqueId;
120+
121+
DelegatingCacheKey(CacheKey cacheKey) {
122+
this.cacheKey = cacheKey;
123+
this.uniqueId = UUID.randomUUID().toString();
124+
}
125+
126+
public CacheKey getCacheKey() {
127+
return this.cacheKey;
128+
}
129+
130+
public String getId() {
131+
return uniqueId;
132+
}
74133
}
75134

76135
@Override

0 commit comments

Comments
 (0)