Skip to content

Commit ccdf3ff

Browse files
peteralfonsiPeter Alfonsi
and
Peter Alfonsi
authored
[Tiered Caching] Cache tier policies (#12542)
* Adds policy interface and took time policy impl Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Changes IndicesService to write a CachePolicyInfoWrapper before the QSR Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Moved took time logic from QSR to IndicesService Signed-off-by: Peter Alfonsi <petealft@amazon.com> * spotlessApply Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Addressed ansjcy's comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Partial rebase on most recent changes Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Integrated policies with new TSC changes Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Reverted unintended change to idea/vcs.xml Signed-off-by: Peter Alfonsi <petealft@amazon.com> * javadocs Signed-off-by: Peter Alfonsi <petealft@amazon.com> * github actions Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Set default threshold value to 10 ms Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Addressed Sorabh's comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Addressed Sorabh's second round of comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Set cachedQueryParser in IRC Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Addressed Sorabh's comments besides dynamic setting Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Removed dynamic setting, misc comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Added changelog entry Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Added missing javadoc Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Fixed failed gradle run Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Added setting validation test Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle for flaky IT Signed-off-by: Peter Alfonsi <petealft@amazon.com> * javadocs Signed-off-by: Peter Alfonsi <petealft@amazon.com> --------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com>
1 parent 0ad6b5e commit ccdf3ff

File tree

14 files changed

+645
-10
lines changed

14 files changed

+645
-10
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
114114
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
115115
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))
116116
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
117+
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
117118

118119
### Dependencies
119120
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Modifications Copyright OpenSearch Contributors. See
11+
* GitHub history for details.
12+
*/
13+
14+
package org.opensearch.cache.common.policy;
15+
16+
import org.opensearch.common.cache.policy.CachedQueryResult;
17+
import org.opensearch.common.unit.TimeValue;
18+
19+
import java.util.function.Function;
20+
import java.util.function.Predicate;
21+
22+
/**
23+
* A cache tier policy which accepts queries whose took time is greater than some threshold.
24+
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
25+
* The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed
26+
* to decide whether to admit the value.
27+
* @param <V> The type of data consumed by test().
28+
*/
29+
public class TookTimePolicy<V> implements Predicate<V> {
30+
/**
31+
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
32+
*/
33+
private final TimeValue threshold;
34+
35+
/**
36+
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
37+
*/
38+
private final Function<V, CachedQueryResult.PolicyValues> cachedResultParser;
39+
40+
/**
41+
* Constructs a took time policy.
42+
* @param threshold the threshold
43+
* @param cachedResultParser the function providing policy values
44+
*/
45+
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
46+
if (threshold.compareTo(TimeValue.ZERO) < 0) {
47+
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
48+
}
49+
this.threshold = threshold;
50+
this.cachedResultParser = cachedResultParser;
51+
}
52+
53+
/**
54+
* Check whether to admit data.
55+
* @param data the input argument
56+
* @return whether to admit the data
57+
*/
58+
public boolean test(V data) {
59+
long tookTimeNanos;
60+
try {
61+
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();
62+
} catch (Exception e) {
63+
// If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data
64+
return false;
65+
}
66+
67+
TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
68+
return tookTime.compareTo(threshold) >= 0;
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** A package for policies controlling what can enter caches. */
10+
package org.opensearch.cache.common.policy;

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,31 @@
88

99
package org.opensearch.cache.common.tier;
1010

11+
import org.opensearch.cache.common.policy.TookTimePolicy;
1112
import org.opensearch.common.annotation.ExperimentalApi;
1213
import org.opensearch.common.cache.CacheType;
1314
import org.opensearch.common.cache.ICache;
1415
import org.opensearch.common.cache.LoadAwareCacheLoader;
1516
import org.opensearch.common.cache.RemovalListener;
1617
import org.opensearch.common.cache.RemovalNotification;
18+
import org.opensearch.common.cache.policy.CachedQueryResult;
1719
import org.opensearch.common.cache.store.config.CacheConfig;
1820
import org.opensearch.common.settings.Setting;
1921
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.common.unit.TimeValue;
2023
import org.opensearch.common.util.concurrent.ReleasableLock;
2124
import org.opensearch.common.util.iterable.Iterables;
2225

2326
import java.io.IOException;
27+
import java.util.ArrayList;
2428
import java.util.Arrays;
2529
import java.util.List;
2630
import java.util.Map;
2731
import java.util.Objects;
2832
import java.util.concurrent.locks.ReadWriteLock;
2933
import java.util.concurrent.locks.ReentrantReadWriteLock;
3034
import java.util.function.Function;
35+
import java.util.function.Predicate;
3136

3237
/**
3338
* This cache spills over the evicted items from the heap tier to the disk tier. All new items are first cached on heap
@@ -52,6 +57,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
5257
* Maintains caching tiers in ascending order of cache latency.
5358
*/
5459
private final List<ICache<K, V>> cacheList;
60+
private final List<Predicate<V>> policies;
5561

5662
TieredSpilloverCache(Builder<K, V> builder) {
5763
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
@@ -63,21 +69,27 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
6369
@Override
6470
public void onRemoval(RemovalNotification<K, V> notification) {
6571
try (ReleasableLock ignore = writeLock.acquire()) {
66-
diskCache.put(notification.getKey(), notification.getValue());
72+
if (evaluatePolicies(notification.getValue())) {
73+
diskCache.put(notification.getKey(), notification.getValue());
74+
}
6775
}
6876
}
6977
})
7078
.setKeyType(builder.cacheConfig.getKeyType())
7179
.setValueType(builder.cacheConfig.getValueType())
7280
.setSettings(builder.cacheConfig.getSettings())
7381
.setWeigher(builder.cacheConfig.getWeigher())
82+
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix
83+
// coming soon
7484
.build(),
7585
builder.cacheType,
7686
builder.cacheFactories
7787

7888
);
7989
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
8090
this.cacheList = Arrays.asList(onHeapCache, diskCache);
91+
92+
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
8193
}
8294

8395
// Package private for testing
@@ -192,6 +204,15 @@ private Function<K, V> getValueFromTieredCache() {
192204
};
193205
}
194206

207+
boolean evaluatePolicies(V value) {
208+
for (Predicate<V> policy : policies) {
209+
if (!policy.test(value)) {
210+
return false;
211+
}
212+
}
213+
return true;
214+
}
215+
195216
/**
196217
* Factory to create TieredSpilloverCache objects.
197218
*/
@@ -231,11 +252,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
231252
);
232253
}
233254
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
255+
256+
TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
257+
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
258+
.get(settings);
259+
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
260+
config.getCachedResultParser(),
261+
"Cached result parser fn can't be null"
262+
);
263+
234264
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
235265
.setOnHeapCacheFactory(onHeapCacheFactory)
236266
.setRemovalListener(config.getRemovalListener())
237267
.setCacheConfig(config)
238268
.setCacheType(cacheType)
269+
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
239270
.build();
240271
}
241272

@@ -257,6 +288,7 @@ public static class Builder<K, V> {
257288
private CacheConfig<K, V> cacheConfig;
258289
private CacheType cacheType;
259290
private Map<String, ICache.Factory> cacheFactories;
291+
private final ArrayList<Predicate<V>> policies = new ArrayList<>();
260292

261293
/**
262294
* Default constructor
@@ -323,6 +355,26 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
323355
return this;
324356
}
325357

358+
/**
359+
* Set a cache policy to be used to limit access to this cache's disk tier.
360+
* @param policy the policy
361+
* @return builder
362+
*/
363+
public Builder<K, V> addPolicy(Predicate<V> policy) {
364+
this.policies.add(policy);
365+
return this;
366+
}
367+
368+
/**
369+
* Set multiple policies to be used to limit access to this cache's disk tier.
370+
* @param policies the policies
371+
* @return builder
372+
*/
373+
public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
374+
this.policies.addAll(policies);
375+
return this;
376+
}
377+
326378
/**
327379
* Build tiered spillover cache.
328380
* @return TieredSpilloverCache

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java

+5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public List<Setting<?>> getSettings() {
5151
settingList.add(
5252
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
5353
);
54+
settingList.add(
55+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
56+
cacheType.getSettingPrefix()
57+
)
58+
);
5459
}
5560
return settingList;
5661
}

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java

+18
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.opensearch.cache.common.tier;
1010

1111
import org.opensearch.common.settings.Setting;
12+
import org.opensearch.common.unit.TimeValue;
13+
14+
import java.util.concurrent.TimeUnit;
1215

1316
import static org.opensearch.common.settings.Setting.Property.NodeScope;
1417

@@ -36,6 +39,21 @@ public class TieredSpilloverCacheSettings {
3639
(key) -> Setting.simpleString(key, "", NodeScope)
3740
);
3841

42+
/**
43+
* Setting defining the minimum took time for a query to be allowed into the disk cache. Defaults to 10 ms, with a minimum of 0.
44+
*/
45+
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
46+
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
47+
(key) -> Setting.timeSetting(
48+
key,
49+
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
50+
TimeValue.ZERO, // Minimum value for this setting
51+
NodeScope
52+
)
53+
);
54+
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
55+
// Will be tuned further with future benchmarks.
56+
3957
/**
4058
* Default constructor
4159
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cache.common.policy;
10+
11+
import org.apache.lucene.search.ScoreDoc;
12+
import org.apache.lucene.search.TopDocs;
13+
import org.apache.lucene.search.TotalHits;
14+
import org.opensearch.common.Randomness;
15+
import org.opensearch.common.cache.policy.CachedQueryResult;
16+
import org.opensearch.common.io.stream.BytesStreamOutput;
17+
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
18+
import org.opensearch.common.unit.TimeValue;
19+
import org.opensearch.core.common.bytes.BytesReference;
20+
import org.opensearch.core.common.io.stream.StreamOutput;
21+
import org.opensearch.search.DocValueFormat;
22+
import org.opensearch.search.query.QuerySearchResult;
23+
import org.opensearch.test.OpenSearchTestCase;
24+
25+
import java.io.IOException;
26+
import java.util.Random;
27+
import java.util.function.Function;
28+
29+
public class TookTimePolicyTests extends OpenSearchTestCase {
30+
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
31+
try {
32+
return CachedQueryResult.getPolicyValues(data);
33+
} catch (IOException e) {
34+
throw new RuntimeException(e);
35+
}
36+
};
37+
38+
private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
39+
return new TookTimePolicy<>(threshold, transformationFunction);
40+
}
41+
42+
public void testTookTimePolicy() throws Exception {
43+
double threshMillis = 10;
44+
long shortMillis = (long) (0.9 * threshMillis);
45+
long longMillis = (long) (1.5 * threshMillis);
46+
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis));
47+
BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000);
48+
BytesReference longTime = getValidPolicyInput(longMillis * 1000000);
49+
50+
boolean shortResult = tookTimePolicy.test(shortTime);
51+
assertFalse(shortResult);
52+
boolean longResult = tookTimePolicy.test(longTime);
53+
assertTrue(longResult);
54+
55+
TookTimePolicy<BytesReference> disabledPolicy = getTookTimePolicy(TimeValue.ZERO);
56+
shortResult = disabledPolicy.test(shortTime);
57+
assertTrue(shortResult);
58+
longResult = disabledPolicy.test(longTime);
59+
assertTrue(longResult);
60+
}
61+
62+
public void testNegativeOneInput() throws Exception {
63+
// PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason
64+
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(TimeValue.ZERO);
65+
BytesReference minusOne = getValidPolicyInput(-1L);
66+
assertFalse(tookTimePolicy.test(minusOne));
67+
}
68+
69+
public void testInvalidThreshold() throws Exception {
70+
assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE));
71+
}
72+
73+
private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException {
74+
// When it's used in the cache, the policy will receive BytesReferences which come from
75+
// serializing a CachedQueryResult.
76+
CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos);
77+
BytesStreamOutput out = new BytesStreamOutput();
78+
cachedQueryResult.writeToNoId(out);
79+
return out.bytes();
80+
}
81+
82+
private QuerySearchResult getQSR() {
83+
// We can't mock the QSR with mockito because the class is final. Construct a real one
84+
QuerySearchResult mockQSR = new QuerySearchResult();
85+
86+
// duplicated from DfsQueryPhaseTests.java
87+
mockQSR.topDocs(
88+
new TopDocsAndMaxScore(
89+
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }),
90+
2.0F
91+
),
92+
new DocValueFormat[0]
93+
);
94+
return mockQSR;
95+
}
96+
97+
private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException {
98+
Random rand = Randomness.get();
99+
byte[] bytes = new byte[numBytes];
100+
rand.nextBytes(bytes);
101+
out.writeBytes(bytes);
102+
}
103+
}

0 commit comments

Comments
 (0)