Skip to content

Commit be97e90

Browse files
peteralfonsiPeter Alfonsi
and
Peter Alfonsi
committed
[Bugfix] [Tiered Caching] Fixes issues when integrating tiered cache with disk cache (opensearch-project#13784)
--------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com>
1 parent 60ee715 commit be97e90

File tree

9 files changed

+143
-79
lines changed

9 files changed

+143
-79
lines changed

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

+2
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
119119
.setValueType(builder.cacheConfig.getValueType())
120120
.setSettings(builder.cacheConfig.getSettings())
121121
.setWeigher(builder.cacheConfig.getWeigher())
122+
.setKeySerializer(builder.cacheConfig.getKeySerializer())
123+
.setValueSerializer(builder.cacheConfig.getValueSerializer())
122124
.setDimensionNames(builder.cacheConfig.getDimensionNames())
123125
.setStatsTrackingEnabled(false)
124126
.build(),

modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java

+4
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnable
141141
@Override
142142
@SuppressWarnings({ "unchecked" })
143143
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
144+
// As we can't directly IT with the tiered cache and ehcache, check that we receive non-null serializers, as an ehcache disk
145+
// cache would require.
146+
assert config.getKeySerializer() != null;
147+
assert config.getValueSerializer() != null;
144148
return new Builder<K, V>().setKeySerializer((Serializer<K, byte[]>) config.getKeySerializer())
145149
.setValueSerializer((Serializer<V, byte[]>) config.getValueSerializer())
146150
.setMaxSize(maxSize)

modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java

+35
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.common.cache.RemovalListener;
1717
import org.opensearch.common.cache.RemovalNotification;
1818
import org.opensearch.common.cache.policy.CachedQueryResult;
19+
import org.opensearch.common.cache.serializer.Serializer;
1920
import org.opensearch.common.cache.settings.CacheSettings;
2021
import org.opensearch.common.cache.stats.ImmutableCacheStats;
2122
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
@@ -32,6 +33,8 @@
3233
import org.junit.Before;
3334

3435
import java.io.IOException;
36+
import java.nio.charset.Charset;
37+
import java.nio.charset.StandardCharsets;
3538
import java.util.ArrayList;
3639
import java.util.HashMap;
3740
import java.util.HashSet;
@@ -166,6 +169,8 @@ public void testComputeIfAbsentWithFactoryBasedCacheCreation() throws Exception
166169
.setKeyType(String.class)
167170
.setWeigher((k, v) -> keyValueSize)
168171
.setRemovalListener(removalListener)
172+
.setKeySerializer(new StringSerializer())
173+
.setValueSerializer(new StringSerializer())
169174
.setSettings(settings)
170175
.setDimensionNames(dimensionNames)
171176
.setCachedResultParser(s -> new CachedQueryResult.PolicyValues(20_000_000L)) // Values will always appear to have taken
@@ -318,6 +323,8 @@ public void testComputeIfAbsentWithEvictionsFromOnHeapCache() throws Exception {
318323
.setKeyType(String.class)
319324
.setWeigher((k, v) -> keyValueSize)
320325
.setRemovalListener(removalListener)
326+
.setKeySerializer(new StringSerializer())
327+
.setValueSerializer(new StringSerializer())
321328
.setDimensionNames(dimensionNames)
322329
.setSettings(
323330
Settings.builder()
@@ -830,6 +837,8 @@ public void testConcurrencyForEvictionFlowFromOnHeapToDiskTier() throws Exceptio
830837
.setKeyType(String.class)
831838
.setWeigher((k, v) -> 150)
832839
.setRemovalListener(removalListener)
840+
.setKeySerializer(new StringSerializer())
841+
.setValueSerializer(new StringSerializer())
833842
.setSettings(
834843
Settings.builder()
835844
.put(
@@ -1014,6 +1023,8 @@ public void testTookTimePolicyFromFactory() throws Exception {
10141023
.setKeyType(String.class)
10151024
.setWeigher((k, v) -> keyValueSize)
10161025
.setRemovalListener(removalListener)
1026+
.setKeySerializer(new StringSerializer())
1027+
.setValueSerializer(new StringSerializer())
10171028
.setSettings(settings)
10181029
.setMaxSizeInBytes(onHeapCacheSize * keyValueSize)
10191030
.setDimensionNames(dimensionNames)
@@ -1415,6 +1426,8 @@ private TieredSpilloverCache<String, String> intializeTieredSpilloverCache(
14151426
.setSettings(settings)
14161427
.setDimensionNames(dimensionNames)
14171428
.setRemovalListener(removalListener)
1429+
.setKeySerializer(new StringSerializer())
1430+
.setValueSerializer(new StringSerializer())
14181431
.setSettings(
14191432
Settings.builder()
14201433
.put(
@@ -1479,4 +1492,26 @@ private ImmutableCacheStats getStatsSnapshotForTier(TieredSpilloverCache<?, ?> t
14791492
}
14801493
return snapshot;
14811494
}
1495+
1496+
// Duplicated here from EhcacheDiskCacheTests.java, we can't add a dependency on that plugin
1497+
static class StringSerializer implements Serializer<String, byte[]> {
1498+
private final Charset charset = StandardCharsets.UTF_8;
1499+
1500+
@Override
1501+
public byte[] serialize(String object) {
1502+
return object.getBytes(charset);
1503+
}
1504+
1505+
@Override
1506+
public String deserialize(byte[] bytes) {
1507+
if (bytes == null) {
1508+
return null;
1509+
}
1510+
return new String(bytes, charset);
1511+
}
1512+
1513+
public boolean equals(String object, byte[] bytes) {
1514+
return object.equals(deserialize(bytes));
1515+
}
1516+
}
14821517
}

plugins/cache-ehcache/build.gradle

+4-14
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ versions << [
2424

2525
dependencies {
2626
api "org.ehcache:ehcache:${versions.ehcache}"
27+
api "org.slf4j:slf4j-api:${versions.slf4j}"
2728
}
2829

2930
thirdPartyAudit {
@@ -78,10 +79,9 @@ thirdPartyAudit {
7879
'org.osgi.framework.BundleActivator',
7980
'org.osgi.framework.BundleContext',
8081
'org.osgi.framework.ServiceReference',
81-
'org.slf4j.Logger',
82-
'org.slf4j.LoggerFactory',
83-
'org.slf4j.Marker',
84-
'org.slf4j.event.Level'
82+
'org.slf4j.impl.StaticLoggerBinder',
83+
'org.slf4j.impl.StaticMDCBinder',
84+
'org.slf4j.impl.StaticMarkerBinder'
8585
)
8686
}
8787

@@ -90,13 +90,3 @@ tasks.named("bundlePlugin").configure {
9090
into 'config'
9191
}
9292
}
93-
94-
test {
95-
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
96-
systemProperty 'tests.security.manager', 'false'
97-
}
98-
99-
internalClusterTest {
100-
// TODO: Remove this later once we have a way.
101-
systemProperty 'tests.security.manager', 'false'
102-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
6c62681a2f655b49963a5983b8b0950a6120ae14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
Copyright (c) 2004-2022 QOS.ch
2+
All rights reserved.
3+
4+
Permission is hereby granted, free of charge, to any person obtaining
5+
a copy of this software and associated documentation files (the
6+
"Software"), to deal in the Software without restriction, including
7+
without limitation the rights to use, copy, modify, merge, publish,
8+
distribute, sublicense, and/or sell copies of the Software, and to
9+
permit persons to whom the Software is furnished to do so, subject to
10+
the following conditions:
11+
12+
The above copyright notice and this permission notice shall be
13+
included in all copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16+
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17+
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
18+
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
19+
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
20+
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21+
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

plugins/cache-ehcache/licenses/slf4j-api-NOTICE.txt

Whitespace-only changes.

plugins/cache-ehcache/src/main/java/org/opensearch/cache/store/disk/EhcacheDiskCache.java

+73-65
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import java.nio.file.Files;
4343
import java.nio.file.Path;
4444
import java.nio.file.Paths;
45+
import java.security.AccessController;
46+
import java.security.PrivilegedAction;
4547
import java.time.Duration;
4648
import java.util.Arrays;
4749
import java.util.Iterator;
@@ -175,57 +177,60 @@ private EhcacheDiskCache(Builder<K, V> builder) {
175177

176178
@SuppressWarnings({ "rawtypes" })
177179
private Cache<ICacheKey, ByteArrayWrapper> buildCache(Duration expireAfterAccess, Builder<K, V> builder) {
178-
try {
179-
return this.cacheManager.createCache(
180-
this.diskCacheAlias,
181-
CacheConfigurationBuilder.newCacheConfigurationBuilder(
182-
ICacheKey.class,
183-
ByteArrayWrapper.class,
184-
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
185-
).withExpiry(new ExpiryPolicy<>() {
186-
@Override
187-
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
188-
return INFINITE;
189-
}
190-
191-
@Override
192-
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
193-
return expireAfterAccess;
194-
}
195-
196-
@Override
197-
public Duration getExpiryForUpdate(
198-
ICacheKey key,
199-
Supplier<? extends ByteArrayWrapper> oldValue,
200-
ByteArrayWrapper newValue
201-
) {
202-
return INFINITE;
203-
}
204-
})
205-
.withService(getListenerConfiguration(builder))
206-
.withService(
207-
new OffHeapDiskStoreConfiguration(
208-
this.threadPoolAlias,
209-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
210-
.get(DISK_WRITE_CONCURRENCY_KEY)
211-
.get(settings),
212-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
180+
// Creating the cache requires permissions specified in plugin-security.policy
181+
return AccessController.doPrivileged((PrivilegedAction<Cache<ICacheKey, ByteArrayWrapper>>) () -> {
182+
try {
183+
return this.cacheManager.createCache(
184+
this.diskCacheAlias,
185+
CacheConfigurationBuilder.newCacheConfigurationBuilder(
186+
ICacheKey.class,
187+
ByteArrayWrapper.class,
188+
ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B)
189+
).withExpiry(new ExpiryPolicy<>() {
190+
@Override
191+
public Duration getExpiryForCreation(ICacheKey key, ByteArrayWrapper value) {
192+
return INFINITE;
193+
}
194+
195+
@Override
196+
public Duration getExpiryForAccess(ICacheKey key, Supplier<? extends ByteArrayWrapper> value) {
197+
return expireAfterAccess;
198+
}
199+
200+
@Override
201+
public Duration getExpiryForUpdate(
202+
ICacheKey key,
203+
Supplier<? extends ByteArrayWrapper> oldValue,
204+
ByteArrayWrapper newValue
205+
) {
206+
return INFINITE;
207+
}
208+
})
209+
.withService(getListenerConfiguration(builder))
210+
.withService(
211+
new OffHeapDiskStoreConfiguration(
212+
this.threadPoolAlias,
213+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
214+
.get(DISK_WRITE_CONCURRENCY_KEY)
215+
.get(settings),
216+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType).get(DISK_SEGMENT_KEY).get(settings)
217+
)
213218
)
214-
)
215-
.withKeySerializer(new KeySerializerWrapper(keySerializer))
216-
.withValueSerializer(new ByteArrayWrapperSerializer())
219+
.withKeySerializer(new KeySerializerWrapper(keySerializer))
220+
.withValueSerializer(new ByteArrayWrapperSerializer())
217221
// We pass ByteArrayWrapperSerializer as ehcache's value serializer. If V is an interface, and we pass its
218222
// serializer directly to ehcache, ehcache requires the classes match exactly before/after serialization.
219223
// This is not always feasible or necessary, like for BytesReference. So, we handle the value serialization
220224
// before V hits ehcache.
221-
);
222-
} catch (IllegalArgumentException ex) {
223-
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
224-
throw ex;
225-
} catch (IllegalStateException ex) {
226-
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
227-
throw ex;
228-
}
225+
);
226+
} catch (IllegalArgumentException ex) {
227+
logger.error("Ehcache disk cache initialization failed due to illegal argument: {}", ex.getMessage());
228+
throw ex;
229+
} catch (IllegalStateException ex) {
230+
logger.error("Ehcache disk cache initialization failed: {}", ex.getMessage());
231+
throw ex;
232+
}
233+
});
229234
}
230235

231236
private CacheEventListenerConfigurationBuilder getListenerConfiguration(Builder<K, V> builder) {
@@ -252,25 +257,28 @@ Map<ICacheKey<K>, CompletableFuture<Tuple<ICacheKey<K>, V>>> getCompletableFutur
252257
@SuppressForbidden(reason = "Ehcache uses File.io")
253258
private PersistentCacheManager buildCacheManager() {
254259
// In case we use multiple ehCaches, we can define this cache manager at a global level.
255-
return CacheManagerBuilder.newCacheManagerBuilder()
256-
.with(CacheManagerBuilder.persistence(new File(storagePath)))
257-
258-
.using(
259-
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
260-
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
261-
// like event listeners
262-
.pool(
263-
this.threadPoolAlias,
264-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
265-
.get(DISK_WRITE_MIN_THREADS_KEY)
266-
.get(settings),
267-
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
268-
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
269-
.get(settings)
270-
)
271-
.build()
272-
)
273-
.build(true);
260+
// Creating the cache manager also requires permissions specified in plugin-security.policy
261+
return AccessController.doPrivileged((PrivilegedAction<PersistentCacheManager>) () -> {
262+
return CacheManagerBuilder.newCacheManagerBuilder()
263+
.with(CacheManagerBuilder.persistence(new File(storagePath)))
264+
265+
.using(
266+
PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder()
267+
.defaultPool(THREAD_POOL_ALIAS_PREFIX + "Default#" + UNIQUE_ID, 1, 3) // Default pool used for other tasks
268+
// like event listeners
269+
.pool(
270+
this.threadPoolAlias,
271+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
272+
.get(DISK_WRITE_MIN_THREADS_KEY)
273+
.get(settings),
274+
(Integer) EhcacheDiskCacheSettings.getSettingListForCacheType(cacheType)
275+
.get(DISK_WRITE_MAXIMUM_THREADS_KEY)
276+
.get(settings)
277+
)
278+
.build()
279+
)
280+
.build(true);
281+
});
274282
}
275283

276284
@Override

plugins/cache-ehcache/src/main/plugin-metadata/plugin-security.policy

+3
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,8 @@
99
grant {
1010
permission java.lang.RuntimePermission "accessClassInPackage.sun.misc";
1111
permission java.lang.RuntimePermission "createClassLoader";
12+
permission java.lang.RuntimePermission "accessDeclaredMembers";
13+
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
14+
permission java.lang.RuntimePermission "getClassLoader";
1215
};
1316

0 commit comments

Comments
 (0)