Skip to content

Commit f372c2c

Browse files
sgup432kiranprakash154
authored and
Peter Alfonsi
committed
[Tiered Caching] Enable serialization of IndicesRequestCache.Key (opensearch-project#10275)
--------- Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com> Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com> Co-authored-by: Kiran Prakash <awskiran@amazon.com>
1 parent b880c0b commit f372c2c

File tree

7 files changed

+325
-143
lines changed

7 files changed

+325
-143
lines changed

libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java

+7
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
package org.opensearch.core.index.shard;
3434

35+
import org.apache.lucene.util.RamUsageEstimator;
3536
import org.opensearch.common.annotation.PublicApi;
3637
import org.opensearch.core.common.Strings;
3738
import org.opensearch.core.common.io.stream.StreamInput;
@@ -55,6 +56,8 @@ public class ShardId implements Comparable<ShardId>, ToXContentFragment, Writeab
5556
private final int shardId;
5657
private final int hashCode;
5758

59+
private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class);
60+
5861
/**
5962
* Constructs a new shard id.
6063
* @param index the index name
@@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException {
8891
hashCode = computeHashCode();
8992
}
9093

94+
public long getBaseRamBytesUsed() {
95+
return BASE_RAM_BYTES_USED;
96+
}
97+
9198
/**
9299
* Writes this shard id to a stream.
93100
* @param out the stream to write to

server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java

+40
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,45 @@ public void testProfileDisableCache() throws Exception {
634634
}
635635
}
636636

637+
public void testCacheWithInvalidation() throws Exception {
638+
Client client = client();
639+
assertAcked(
640+
client.admin()
641+
.indices()
642+
.prepareCreate("index")
643+
.setMapping("k", "type=keyword")
644+
.setSettings(
645+
Settings.builder()
646+
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
647+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
648+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
649+
)
650+
.get()
651+
);
652+
indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
653+
ensureSearchable("index");
654+
SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
655+
assertSearchResponse(resp);
656+
OpenSearchAssertions.assertAllSuccessful(resp);
657+
assertThat(resp.getHits().getTotalHits().value, equalTo(1L));
658+
659+
assertCacheState(client, "index", 0, 1);
660+
// Index but don't refresh
661+
indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
662+
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
663+
assertSearchResponse(resp);
664+
// Should expect hit as here as refresh didn't happen
665+
assertCacheState(client, "index", 1, 1);
666+
667+
// Explicit refresh would invalidate cache
668+
refresh();
669+
// Hit same query again
670+
resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
671+
assertSearchResponse(resp);
672+
// Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh)
673+
assertCacheState(client, "index", 1, 2);
674+
}
675+
637676
private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
638677
RequestCacheStats requestCacheStats = client.admin()
639678
.indices()
@@ -648,6 +687,7 @@ private static void assertCacheState(Client client, String index, long expectedH
648687
Arrays.asList(expectedHits, expectedMisses, 0L),
649688
Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
650689
);
690+
651691
}
652692

653693
}

server/src/main/java/org/opensearch/common/lucene/index/OpenSearchDirectoryReader.java

+60-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.opensearch.core.index.shard.ShardId;
4040

4141
import java.io.IOException;
42+
import java.util.Optional;
43+
import java.util.UUID;
4244

4345
/**
4446
* A {@link org.apache.lucene.index.FilterDirectoryReader} that exposes
@@ -51,11 +53,14 @@ public final class OpenSearchDirectoryReader extends FilterDirectoryReader {
5153
private final ShardId shardId;
5254
private final FilterDirectoryReader.SubReaderWrapper wrapper;
5355

56+
private final DelegatingCacheHelper delegatingCacheHelper;
57+
5458
private OpenSearchDirectoryReader(DirectoryReader in, FilterDirectoryReader.SubReaderWrapper wrapper, ShardId shardId)
5559
throws IOException {
5660
super(in, wrapper);
5761
this.wrapper = wrapper;
5862
this.shardId = shardId;
63+
this.delegatingCacheHelper = new DelegatingCacheHelper(in.getReaderCacheHelper());
5964
}
6065

6166
/**
@@ -68,7 +73,61 @@ public ShardId shardId() {
6873
@Override
6974
public CacheHelper getReaderCacheHelper() {
7075
// safe to delegate since this reader does not alter the index
71-
return in.getReaderCacheHelper();
76+
return this.delegatingCacheHelper;
77+
}
78+
79+
public DelegatingCacheHelper getDelegatingCacheHelper() {
80+
return this.delegatingCacheHelper;
81+
}
82+
83+
/**
84+
* Wraps existing IndexReader cache helper which internally provides a way to wrap CacheKey.
85+
* @opensearch.internal
86+
*/
87+
public class DelegatingCacheHelper implements CacheHelper {
88+
private final CacheHelper cacheHelper;
89+
private final DelegatingCacheKey serializableCacheKey;
90+
91+
DelegatingCacheHelper(CacheHelper cacheHelper) {
92+
this.cacheHelper = cacheHelper;
93+
this.serializableCacheKey = new DelegatingCacheKey(Optional.ofNullable(cacheHelper).map(key -> getKey()).orElse(null));
94+
}
95+
96+
@Override
97+
public CacheKey getKey() {
98+
return this.cacheHelper.getKey();
99+
}
100+
101+
public DelegatingCacheKey getDelegatingCacheKey() {
102+
return this.serializableCacheKey;
103+
}
104+
105+
@Override
106+
public void addClosedListener(ClosedListener listener) {
107+
this.cacheHelper.addClosedListener(listener);
108+
}
109+
}
110+
111+
/**
112+
* Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of
113+
* object itself for serialization purposes.
114+
*/
115+
public class DelegatingCacheKey {
116+
private final CacheKey cacheKey;
117+
private final String uniqueId;
118+
119+
DelegatingCacheKey(CacheKey cacheKey) {
120+
this.cacheKey = cacheKey;
121+
this.uniqueId = UUID.randomUUID().toString();
122+
}
123+
124+
public CacheKey getCacheKey() {
125+
return this.cacheKey;
126+
}
127+
128+
public String getId() {
129+
return uniqueId;
130+
}
72131
}
73132

74133
@Override

0 commit comments

Comments
 (0)