Skip to content

Commit a6e49ef

Browse files
peteralfonsiPeter Alfonsi
and
Peter Alfonsi
authored
[Backport 2.x] [Tiered Caching] Serializers for ehcache (#12709) (#12736)
* [Tiered Caching] Serializers for ehcache (#12709) Adds serializers and integrates them into ehcache disk cache --------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com> (cherry picked from commit 21b28f2) * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Removed incorrect changelog line Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Rerunning gradle checks Signed-off-by: Peter Alfonsi <petealft@amazon.com> --------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com>
1 parent 7e571dc commit a6e49ef

File tree

12 files changed

+607
-24
lines changed

12 files changed

+607
-24
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
2525
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
2626
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
27+
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
2728
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
2829
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
2930

modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.common.cache.RemovalListener;
1515
import org.opensearch.common.cache.RemovalNotification;
1616
import org.opensearch.common.cache.RemovalReason;
17+
import org.opensearch.common.cache.serializer.Serializer;
1718
import org.opensearch.common.cache.store.builders.ICacheBuilder;
1819
import org.opensearch.common.cache.store.config.CacheConfig;
1920

@@ -106,8 +107,11 @@ public MockDiskCacheFactory(long delay, int maxSize) {
106107
}
107108

108109
@Override
110+
@SuppressWarnings({ "unchecked" })
109111
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
110-
return new Builder<K, V>().setMaxSize(maxSize)
112+
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
113+
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
114+
.setMaxSize(maxSize)
111115
.setDeliberateDelay(delay)
112116
.setRemovalListener(config.getRemovalListener())
113117
.build();
@@ -123,6 +127,8 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {
123127

124128
int maxSize;
125129
long delay;
130+
Serializer<K, byte[]> keySerializer;
131+
Serializer<V, byte[]> valueSerializer;
126132

127133
@Override
128134
public ICache<K, V> build() {
@@ -138,5 +144,16 @@ public Builder<K, V> setDeliberateDelay(long millis) {
138144
this.delay = millis;
139145
return this;
140146
}
147+
148+
public Builder<K, V> setKeySerializer(Serializer<K, byte[]> keySerializer) {
149+
this.keySerializer = keySerializer;
150+
return this;
151+
}
152+
153+
public Builder<K, V> setValueSerializer(Serializer<V, byte[]> valueSerializer) {
154+
this.valueSerializer = valueSerializer;
155+
return this;
156+
}
157+
141158
}
142159
}

plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java

+194-23
Large diffs are not rendered by default.

plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java

+94
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,33 @@
99
package org.opensearch.cache.store.disk;
1010

1111
import org.opensearch.cache.EhcacheDiskCacheSettings;
12+
import org.opensearch.common.Randomness;
1213
import org.opensearch.common.cache.CacheType;
1314
import org.opensearch.common.cache.ICache;
1415
import org.opensearch.common.cache.LoadAwareCacheLoader;
1516
import org.opensearch.common.cache.RemovalListener;
1617
import org.opensearch.common.cache.RemovalNotification;
18+
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
19+
import org.opensearch.common.cache.serializer.Serializer;
1720
import org.opensearch.common.cache.store.config.CacheConfig;
1821
import org.opensearch.common.metrics.CounterMetric;
1922
import org.opensearch.common.settings.Settings;
2023
import org.opensearch.common.unit.TimeValue;
24+
import org.opensearch.core.common.bytes.BytesArray;
25+
import org.opensearch.core.common.bytes.BytesReference;
26+
import org.opensearch.core.common.bytes.CompositeBytesReference;
2127
import org.opensearch.env.NodeEnvironment;
2228
import org.opensearch.test.OpenSearchSingleNodeTestCase;
2329

2430
import java.io.IOException;
31+
import java.nio.charset.Charset;
32+
import java.nio.charset.StandardCharsets;
2533
import java.util.ArrayList;
2634
import java.util.HashMap;
2735
import java.util.Iterator;
2836
import java.util.List;
2937
import java.util.Map;
38+
import java.util.Random;
3039
import java.util.UUID;
3140
import java.util.concurrent.CopyOnWriteArrayList;
3241
import java.util.concurrent.CountDownLatch;
@@ -51,6 +60,8 @@ public void testBasicGetAndPut() throws IOException {
5160
.setIsEventListenerModeSync(true)
5261
.setKeyType(String.class)
5362
.setValueType(String.class)
63+
.setKeySerializer(new StringSerializer())
64+
.setValueSerializer(new StringSerializer())
5465
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
5566
.setSettings(settings)
5667
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -89,6 +100,8 @@ public void testBasicGetAndPutUsingFactory() throws IOException {
89100
new CacheConfig.Builder<String, String>().setValueType(String.class)
90101
.setKeyType(String.class)
91102
.setRemovalListener(removalListener)
103+
.setKeySerializer(new StringSerializer())
104+
.setValueSerializer(new StringSerializer())
92105
.setSettings(
93106
Settings.builder()
94107
.put(
@@ -149,6 +162,8 @@ public void testConcurrentPut() throws Exception {
149162
.setIsEventListenerModeSync(true) // For accurate count
150163
.setKeyType(String.class)
151164
.setValueType(String.class)
165+
.setKeySerializer(new StringSerializer())
166+
.setValueSerializer(new StringSerializer())
152167
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
153168
.setSettings(settings)
154169
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -194,6 +209,8 @@ public void testEhcacheParallelGets() throws Exception {
194209
.setIsEventListenerModeSync(true) // For accurate count
195210
.setKeyType(String.class)
196211
.setValueType(String.class)
212+
.setKeySerializer(new StringSerializer())
213+
.setValueSerializer(new StringSerializer())
197214
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
198215
.setSettings(settings)
199216
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -237,6 +254,8 @@ public void testEhcacheKeyIterator() throws Exception {
237254
.setIsEventListenerModeSync(true)
238255
.setKeyType(String.class)
239256
.setValueType(String.class)
257+
.setKeySerializer(new StringSerializer())
258+
.setValueSerializer(new StringSerializer())
240259
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
241260
.setSettings(settings)
242261
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -274,6 +293,8 @@ public void testEvictions() throws Exception {
274293
.setThreadPoolAlias("ehcacheTest")
275294
.setKeyType(String.class)
276295
.setValueType(String.class)
296+
.setKeySerializer(new StringSerializer())
297+
.setValueSerializer(new StringSerializer())
277298
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
278299
.setSettings(settings)
279300
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -304,6 +325,8 @@ public void testComputeIfAbsentConcurrently() throws Exception {
304325
.setThreadPoolAlias("ehcacheTest")
305326
.setKeyType(String.class)
306327
.setValueType(String.class)
328+
.setKeySerializer(new StringSerializer())
329+
.setValueSerializer(new StringSerializer())
307330
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
308331
.setSettings(settings)
309332
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -373,6 +396,8 @@ public void testComputeIfAbsentConcurrentlyAndThrowsException() throws Exception
373396
.setThreadPoolAlias("ehcacheTest")
374397
.setKeyType(String.class)
375398
.setValueType(String.class)
399+
.setKeySerializer(new StringSerializer())
400+
.setValueSerializer(new StringSerializer())
376401
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
377402
.setSettings(settings)
378403
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -430,6 +455,8 @@ public void testComputeIfAbsentWithNullValueLoading() throws Exception {
430455
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
431456
.setKeyType(String.class)
432457
.setValueType(String.class)
458+
.setKeySerializer(new StringSerializer())
459+
.setValueSerializer(new StringSerializer())
433460
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
434461
.setSettings(settings)
435462
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -491,6 +518,8 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {
491518
.setIsEventListenerModeSync(true)
492519
.setKeyType(String.class)
493520
.setValueType(String.class)
521+
.setKeySerializer(new StringSerializer())
522+
.setValueSerializer(new StringSerializer())
494523
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
495524
.setSettings(settings)
496525
.setExpireAfterAccess(TimeValue.MAX_VALUE)
@@ -525,6 +554,50 @@ public void testEhcacheKeyIteratorWithRemove() throws IOException {
525554

526555
}
527556

557+
public void testBasicGetAndPutBytesReference() throws Exception {
558+
Settings settings = Settings.builder().build();
559+
try (NodeEnvironment env = newNodeEnvironment(settings)) {
560+
ICache<String, BytesReference> ehCacheDiskCachingTier = new EhcacheDiskCache.Builder<String, BytesReference>()
561+
.setThreadPoolAlias("ehcacheTest")
562+
.setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache")
563+
.setKeySerializer(new StringSerializer())
564+
.setValueSerializer(new BytesReferenceSerializer())
565+
.setKeyType(String.class)
566+
.setValueType(BytesReference.class)
567+
.setCacheType(CacheType.INDICES_REQUEST_CACHE)
568+
.setSettings(settings)
569+
.setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES * 20) // bigger so no evictions happen
570+
.setExpireAfterAccess(TimeValue.MAX_VALUE)
571+
.setRemovalListener(new MockRemovalListener<>())
572+
.build();
573+
int randomKeys = randomIntBetween(10, 100);
574+
int valueLength = 100;
575+
Random rand = Randomness.get();
576+
Map<String, BytesReference> keyValueMap = new HashMap<>();
577+
for (int i = 0; i < randomKeys; i++) {
578+
byte[] valueBytes = new byte[valueLength];
579+
rand.nextBytes(valueBytes);
580+
keyValueMap.put(UUID.randomUUID().toString(), new BytesArray(valueBytes));
581+
582+
// Test a non-BytesArray implementation of BytesReference.
583+
byte[] compositeBytes1 = new byte[valueLength];
584+
byte[] compositeBytes2 = new byte[valueLength];
585+
rand.nextBytes(compositeBytes1);
586+
rand.nextBytes(compositeBytes2);
587+
BytesReference composite = CompositeBytesReference.of(new BytesArray(compositeBytes1), new BytesArray(compositeBytes2));
588+
keyValueMap.put(UUID.randomUUID().toString(), composite);
589+
}
590+
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
591+
ehCacheDiskCachingTier.put(entry.getKey(), entry.getValue());
592+
}
593+
for (Map.Entry<String, BytesReference> entry : keyValueMap.entrySet()) {
594+
BytesReference value = ehCacheDiskCachingTier.get(entry.getKey());
595+
assertEquals(entry.getValue(), value);
596+
}
597+
ehCacheDiskCachingTier.close();
598+
}
599+
}
600+
528601
private static String generateRandomString(int length) {
529602
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
530603
StringBuilder randomString = new StringBuilder(length);
@@ -546,4 +619,25 @@ public void onRemoval(RemovalNotification<K, V> notification) {
546619
evictionMetric.inc();
547620
}
548621
}
622+
623+
static class StringSerializer implements Serializer<String, byte[]> {
624+
private final Charset charset = StandardCharsets.UTF_8;
625+
626+
@Override
627+
public byte[] serialize(String object) {
628+
return object.getBytes(charset);
629+
}
630+
631+
@Override
632+
public String deserialize(byte[] bytes) {
633+
if (bytes == null) {
634+
return null;
635+
}
636+
return new String(bytes, charset);
637+
}
638+
639+
public boolean equals(String object, byte[] bytes) {
640+
return object.equals(deserialize(bytes));
641+
}
642+
}
549643
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.serializer;
10+
11+
import org.opensearch.core.common.bytes.BytesArray;
12+
import org.opensearch.core.common.bytes.BytesReference;
13+
14+
import java.util.Arrays;
15+
16+
/**
17+
* A serializer which transforms BytesReference to byte[].
18+
* The type of BytesReference is NOT preserved after deserialization, but nothing in opensearch should care.
19+
*/
20+
public class BytesReferenceSerializer implements Serializer<BytesReference, byte[]> {
21+
// This class does not get passed to ehcache itself, so it's not required that classes match after deserialization.
22+
23+
public BytesReferenceSerializer() {}
24+
25+
@Override
26+
public byte[] serialize(BytesReference object) {
27+
return BytesReference.toBytes(object);
28+
}
29+
30+
@Override
31+
public BytesReference deserialize(byte[] bytes) {
32+
if (bytes == null) {
33+
return null;
34+
}
35+
return new BytesArray(bytes);
36+
}
37+
38+
@Override
39+
public boolean equals(BytesReference object, byte[] bytes) {
40+
return Arrays.equals(serialize(object), bytes);
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.serializer;
10+
11+
/**
12+
* Defines an interface for serializers, to be used by pluggable caches.
13+
* T is the class of the original object, and U is the serialized class.
14+
*/
15+
public interface Serializer<T, U> {
16+
/**
17+
* Serializes an object.
18+
* @param object A non-serialized object.
19+
* @return The serialized representation of the object.
20+
*/
21+
U serialize(T object);
22+
23+
/**
24+
* Deserializes bytes into an object.
25+
* @param bytes The serialized representation.
26+
* @return The original object.
27+
*/
28+
T deserialize(U bytes);
29+
30+
/**
31+
* Compares an object to a serialized representation of an object.
32+
* @param object A non-serialized objet
33+
* @param bytes Serialized representation of an object
34+
* @return true if representing the same object, false if not
35+
*/
36+
boolean equals(T object, U bytes);
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
/** A package for serializers used in caches. */
9+
package org.opensearch.common.cache.serializer;

0 commit comments

Comments
 (0)