Skip to content

Commit ad77481

Browse files
peteralfonsiPeter Alfonsi
and
Peter Alfonsi
committed
[Bugfix] [Tiered Caching] Fixes issues when integrating tiered cache with disk cache (opensearch-project#13784)
--------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com> (cherry picked from commit e67ced7) Signed-off-by: Peter Alfonsi <petealft@amazon.com>
1 parent ae782e1 commit ad77481

File tree

7 files changed

+118
-76
lines changed

7 files changed

+118
-76
lines changed

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

+2
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
119119
.setValueType(builder.cacheConfig.getValueType())
120120
.setSettings(builder.cacheConfig.getSettings())
121121
.setWeigher(builder.cacheConfig.getWeigher())
122+
.setKeySerializer(builder.cacheConfig.getKeySerializer())
123+
.setValueSerializer(builder.cacheConfig.getValueSerializer())
122124
.setDimensionNames(builder.cacheConfig.getDimensionNames())
123125
.setStatsTrackingEnabled(false)
124126
.build(),

modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java

+4
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnable
141141
@Override
142142
@SuppressWarnings({ "unchecked" })
143143
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
144+
// As we can't directly IT with the tiered cache and ehcache, check that we receive non-null serializers, as an ehcache disk
145+
// cache would require.
146+
assert config.getKeySerializer() != null;
147+
assert config.getValueSerializer() != null;
144148
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
145149
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
146150
.setMaxSize(maxSize)

modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java

+35
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.common.cache.RemovalListener;
1717
import org.opensearch.common.cache.RemovalNotification;
1818
import org.opensearch.common.cache.policy.CachedQueryResult;
19+
import org.opensearch.common.cache.serializer.Serializer;
1920
import org.opensearch.common.cache.settings.CacheSettings;
2021
import org.opensearch.common.cache.stats.ImmutableCacheStats;
2122
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
@@ -32,6 +33,8 @@
3233
import org.junit.Before;
3334

3435
import java.io.IOException;
36+
import java.nio.charset.Charset;
37+
import java.nio.charset.StandardCharsets;
3538
import java.util.ArrayList;
3639
import java.util.HashMap;
3740
import java.util.HashSet;
@@ -166,6 +169,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
166169
.setKeyType(String.class)
167170
.setWeigher((k, v) -> keyValueSize)
168171
.setRemovalListener(removalListener)
172+
.setKeySerializer(new StringSerializer())
173+
.setValueSerializer(new StringSerializer())
169174
.setSettings(settings)
170175
.setDimensionNames(dimensionNames)
171176
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
@@ -318,6 +323,8 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
318323
.setKeyType(String.class)
319324
.setWeigher((k, v) -> keyValueSize)
320325
.setRemovalListener(removalListener)
326+
.setKeySerializer(new StringSerializer())
327+
.setValueSerializer(new StringSerializer())
321328
.setDimensionNames(dimensionNames)
322329
.setSettings(
323330
Settings.builder()
@@ -830,6 +837,8 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio
830837
.setKeyType(String.class)
831838
.setWeigher((k, v) -> 150)
832839
.setRemovalListener(removalListener)
840+
.setKeySerializer(new StringSerializer())
841+
.setValueSerializer(new StringSerializer())
833842
.setSettings(
834843
Settings.builder()
835844
.put(
@@ -1014,6 +1023,8 @@ public void testTookTimePolicyFromFactory() throws Exception {
10141023
.setKeyType(String.class)
10151024
.setWeigher((k, v) -> keyValueSize)
10161025
.setRemovalListener(removalListener)
1026+
.setKeySerializer(new StringSerializer())
1027+
.setValueSerializer(new StringSerializer())
10171028
.setSettings(settings)
10181029
.setMaxSizeInBytes(onHeapCacheSize * keyValueSize)
10191030
.setDimensionNames(dimensionNames)
@@ -1415,6 +1426,8 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
14151426
.setSettings(settings)
14161427
.setDimensionNames(dimensionNames)
14171428
.setRemovalListener(removalListener)
1429+
.setKeySerializer(new StringSerializer())
1430+
.setValueSerializer(new StringSerializer())
14181431
.setSettings(
14191432
Settings.builder()
14201433
.put(
@@ -1479,4 +1492,26 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache<?, ?> t
14791492
}
14801493
return snapshot;
14811494
}
1495+
1496+
// Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin
1497+
static class StringSerializer implements Serializer<String, byte[]> {
1498+
private final Charset charset = StandardCharsets.UTF_8;
1499+
1500+
@Override
1501+
public byte[] serialize(String object) {
1502+
return object.getBytes(charset);
1503+
}
1504+
1505+
@Override
1506+
public String deserialize(byte[] bytes) {
1507+
if (bytes == null) {
1508+
return null;
1509+
}
1510+
return new String(bytes, charset);
1511+
}
1512+
1513+
public boolean equals(String object, byte[] bytes) {
1514+
return object.equals(deserialize(bytes));
1515+
}
1516+
}
14821517
}

plugins/cache-ehcache/build.gradle

-10
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,3 @@ tasks.named("bundlePlugin").configure {
9090
into 'config'
9191
}
9292
}
93-
94-
test {
95-
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
96-
systemProperty 'tests.security.manager', 'false'
97-
}
98-
99-
internalClusterTest {
100-
// TODO: Remove this later once we have a way.
101-
systemProperty 'tests.security.manager', 'false'
102-
}

plugins/cache-ehcache/licenses/slf4j-api-LICENSE.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
1818
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
1919
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
2020
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21-
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21+
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java

+73-65
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import java.nio.file.Files;
4343
import java.nio.file.Path;
4444
import java.nio.file.Paths;
45+
import java.security.AccessController;
46+
import java.security.PrivilegedAction;
4547
import java.time.Duration;
4648
import java.util.Arrays;
4749
import java.util.Iterator;
@@ -175,57 +177,60 @@ private EhcacheDiskCache(Builder<K, V> builder) {
175177

176178
@SuppressWarnings({ "rawtypes" })
177179
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
178-
try {
179-
return this.cacheManager.createCache(
180-
this.diskCacheAlias,
181-
CacheConfigurationBuilder.newCacheConfigurationBuilder(
182-
ICacheKey.class,
183-
ByteArrayWrapper.class,
184-
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
185-
).withExpiry(new ExpiryPolicy<>() {
186-
@Override
187-
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
188-
return INFINITE;
189-
}
190-
191-
@Override
192-
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
193-
return expireAfterAccess;
194-
}
195-
196-
@Override
197-
public Duration getExpiryForUpdate(
198-
ICacheKey key,
199-
Supplier<? extends ByteArrayWrapper> oldValue,
200-
ByteArrayWrapper newValue
201-
) {
202-
return INFINITE;
203-
}
204-
})
205-
.withService(getListenerConfiguration(builder))
206-
.withService(
207-
new OffHeapDiskStoreConfiguration(
208-
this.threadPoolAlias,
209-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
210-
.get(DISK_WRITE_CONCURRENCY_KEY)
211-
.get(settings),
212-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
180+
// Creating the cache requires permissions specified in plugin-security.policy
181+
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
182+
try {
183+
return this.cacheManager.createCache(
184+
this.diskCacheAlias,
185+
CacheConfigurationBuilder.newCacheConfigurationBuilder(
186+
ICacheKey.class,
187+
ByteArrayWrapper.class,
188+
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
189+
).withExpiry(new ExpiryPolicy<>() {
190+
@Override
191+
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
192+
return INFINITE;
193+
}
194+
195+
@Override
196+
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
197+
return expireAfterAccess;
198+
}
199+
200+
@Override
201+
public Duration getExpiryForUpdate(
202+
ICacheKey key,
203+
Supplier<? extends ByteArrayWrapper> oldValue,
204+
ByteArrayWrapper newValue
205+
) {
206+
return INFINITE;
207+
}
208+
})
209+
.withService(getListenerConfiguration(builder))
210+
.withService(
211+
new OffHeapDiskStoreConfiguration(
212+
this.threadPoolAlias,
213+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
214+
.get(DISK_WRITE_CONCURRENCY_KEY)
215+
.get(settings),
216+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
217+
)
213218
)
214-
)
215-
.withKeySerializer(new KeySerializerWrapper(keySerializer))
216-
.withValueSerializer(new ByteArrayWrapperSerializer())
219+
.withKeySerializer(new KeySerializerWrapper(keySerializer))
220+
.withValueSerializer(new ByteArrayWrapperSerializer())
217221
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
218222
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
219223
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
220224
// before V hits ehcache.
221-
);
222-
} catch (IllegalArgumentException ex) {
223-
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
224-
throw ex;
225-
} catch (IllegalStateException ex) {
226-
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
227-
throw ex;
228-
}
225+
);
226+
} catch (IllegalArgumentException ex) {
227+
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
228+
throw ex;
229+
} catch (IllegalStateException ex) {
230+
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
231+
throw ex;
232+
}
233+
});
229234
}
230235

231236
private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
@@ -252,25 +257,28 @@ Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> getCompletableFutur
252257
@SuppressForbidden(reason = "Ehcache uses File.io")
253258
private PersistentCacheManager buildCacheManager() {
254259
// In case we use multiple ehCaches, we can define this cache manager at a global level.
255-
return CacheManagerBuilder.newCacheManagerBuilder()
256-
.with(CacheManagerBuilder.persistence(new File(storagePath)))
257-
258-
.using(
259-
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
260-
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
261-
// like event listeners
262-
.pool(
263-
this.threadPoolAlias,
264-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
265-
.get(DISK_WRITE_MIN_THREADS_KEY)
266-
.get(settings),
267-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
268-
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
269-
.get(settings)
270-
)
271-
.build()
272-
)
273-
.build(true);
260+
// Creating the cache manager also requires permissions specified in plugin-security.policy
261+
return AccessController.doPrivileged((PrivilegedAction<PersistentCacheManager>) () -> {
262+
return CacheManagerBuilder.newCacheManagerBuilder()
263+
.with(CacheManagerBuilder.persistence(new File(storagePath)))
264+
265+
.using(
266+
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
267+
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
268+
// like event listeners
269+
.pool(
270+
this.threadPoolAlias,
271+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
272+
.get(DISK_WRITE_MIN_THREADS_KEY)
273+
.get(settings),
274+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
275+
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
276+
.get(settings)
277+
)
278+
.build()
279+
)
280+
.build(true);
281+
});
274282
}
275283

276284
@Override

plugins/cache-ehcache/src/main/plugin-metadata/plugin-security.policy

+3
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,8 @@
99
grant {
1010
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
1111
permission java.lang.RuntimePermission "createClassLoader";
12+
permission java.lang.RuntimePermission "accessDeclaredMembers";
13+
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
14+
permission java.lang.RuntimePermission "getClassLoader";
1215
};
1316

0 commit comments

Comments
 (0)