Skip to content

Commit 2b851a1

Browse files
authored
[Tiered Caching] Make took time policy dynamic and add additional integ tests (opensearch-project#13063) (opensearch-project#13175)
--------- Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com> Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
1 parent 93d7afb commit 2b851a1

File tree

14 files changed

+600
-54
lines changed

14 files changed

+600
-54
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1111
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
1212
- Add a counter to node stat (and _cat/shards) api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
1313
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
14+
- [Tiered Caching] Make took time caching policy setting dynamic ([#13063](https://github.com/opensearch-project/OpenSearch/pull/13063))
1415
- Detect breaking changes on pull requests ([#9044](https://github.com/opensearch-project/OpenSearch/pull/9044))
1516
- Add cluster primary balance constraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656))
1617
- Derived fields support to derive field values at query time without indexing ([#12569](https://github.com/opensearch-project/OpenSearch/pull/12569))

modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java

+309-13
Large diffs are not rendered by default.

modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@
1313

1414
package org.opensearch.cache.common.policy;
1515

16+
import org.opensearch.common.cache.CacheType;
1617
import org.opensearch.common.cache.policy.CachedQueryResult;
18+
import org.opensearch.common.settings.ClusterSettings;
1719
import org.opensearch.common.unit.TimeValue;
1820

1921
import java.util.function.Function;
2022
import java.util.function.Predicate;
2123

24+
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
25+
2226
/**
2327
* A cache tier policy which accepts queries whose took time is greater than some threshold.
2428
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
@@ -30,7 +34,7 @@ public class TookTimePolicy<V> implements Predicate<V> {
3034
/**
3135
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
3236
*/
33-
private final TimeValue threshold;
37+
private TimeValue threshold;
3438

3539
/**
3640
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
@@ -41,13 +45,25 @@ public class TookTimePolicy<V> implements Predicate<V> {
4145
* Constructs a took time policy.
4246
* @param threshold the threshold
4347
* @param cachedResultParser the function providing policy values
48+
* @param clusterSettings cluster settings
49+
* @param cacheType cache type
4450
*/
45-
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
51+
public TookTimePolicy(
52+
TimeValue threshold,
53+
Function<V, CachedQueryResult.PolicyValues> cachedResultParser,
54+
ClusterSettings clusterSettings,
55+
CacheType cacheType
56+
) {
4657
if (threshold.compareTo(TimeValue.ZERO) < 0) {
4758
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
4859
}
4960
this.threshold = threshold;
5061
this.cachedResultParser = cachedResultParser;
62+
clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold);
63+
}
64+
65+
private void setThreshold(TimeValue threshold) {
66+
this.threshold = threshold;
5167
}
5268

5369
/**

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

+77-7
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,21 @@
1515
import org.opensearch.common.cache.LoadAwareCacheLoader;
1616
import org.opensearch.common.cache.RemovalListener;
1717
import org.opensearch.common.cache.RemovalNotification;
18+
import org.opensearch.common.cache.RemovalReason;
1819
import org.opensearch.common.cache.policy.CachedQueryResult;
1920
import org.opensearch.common.cache.store.config.CacheConfig;
2021
import org.opensearch.common.settings.Setting;
2122
import org.opensearch.common.settings.Settings;
2223
import org.opensearch.common.unit.TimeValue;
2324
import org.opensearch.common.util.concurrent.ReleasableLock;
24-
import org.opensearch.common.util.iterable.Iterables;
2525

2626
import java.io.IOException;
2727
import java.util.ArrayList;
2828
import java.util.Arrays;
29+
import java.util.Iterator;
2930
import java.util.List;
3031
import java.util.Map;
32+
import java.util.NoSuchElementException;
3133
import java.util.Objects;
3234
import java.util.concurrent.locks.ReadWriteLock;
3335
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,6 +49,9 @@
4749
@ExperimentalApi
4850
public class TieredSpilloverCache<K, V> implements ICache<K, V> {
4951

52+
// Used to avoid caching stale entries in lower tiers.
53+
private static final List<RemovalReason> SPILLOVER_REMOVAL_REASONS = List.of(RemovalReason.EVICTED, RemovalReason.CAPACITY);
54+
5055
private final ICache<K, V> diskCache;
5156
private final ICache<K, V> onHeapCache;
5257
private final RemovalListener<K, V> removalListener;
@@ -69,8 +74,11 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
6974
@Override
7075
public void onRemoval(RemovalNotification<K, V> notification) {
7176
try (ReleasableLock ignore = writeLock.acquire()) {
72-
if (evaluatePolicies(notification.getValue())) {
77+
if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
78+
&& evaluatePolicies(notification.getValue())) {
7379
diskCache.put(notification.getKey(), notification.getValue());
80+
} else {
81+
removalListener.onRemoval(notification);
7482
}
7583
}
7684
}
@@ -81,6 +89,7 @@ public void onRemoval(RemovalNotification<K, V> notification) {
8189
.setWeigher(builder.cacheConfig.getWeigher())
8290
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes())
8391
.setExpireAfterAccess(builder.cacheConfig.getExpireAfterAccess())
92+
.setClusterSettings(builder.cacheConfig.getClusterSettings())
8493
.build(),
8594
builder.cacheType,
8695
builder.cacheFactories
@@ -156,10 +165,11 @@ public void invalidateAll() {
156165
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
157166
* @return An iterable over (onHeap + disk) keys
158167
*/
159-
@SuppressWarnings("unchecked")
168+
@SuppressWarnings({ "unchecked" })
160169
@Override
161170
public Iterable<K> keys() {
162-
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
171+
Iterable<K>[] iterables = (Iterable<K>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
172+
return new ConcatenatedIterables<K>(iterables);
163173
}
164174

165175
@Override
@@ -213,6 +223,67 @@ boolean evaluatePolicies(V value) {
213223
return true;
214224
}
215225

226+
/**
227+
* ConcatenatedIterables which combines cache iterables and supports remove() functionality as well if underlying
228+
* iterator supports it.
229+
* @param <K> Type of key.
230+
*/
231+
static class ConcatenatedIterables<K> implements Iterable<K> {
232+
233+
final Iterable<K>[] iterables;
234+
235+
ConcatenatedIterables(Iterable<K>[] iterables) {
236+
this.iterables = iterables;
237+
}
238+
239+
@SuppressWarnings({ "unchecked" })
240+
@Override
241+
public Iterator<K> iterator() {
242+
Iterator<K>[] iterators = (Iterator<K>[]) new Iterator<?>[iterables.length];
243+
for (int i = 0; i < iterables.length; i++) {
244+
iterators[i] = iterables[i].iterator();
245+
}
246+
return new ConcatenatedIterator<>(iterators);
247+
}
248+
249+
static class ConcatenatedIterator<T> implements Iterator<T> {
250+
private final Iterator<T>[] iterators;
251+
private int currentIteratorIndex;
252+
private Iterator<T> currentIterator;
253+
254+
public ConcatenatedIterator(Iterator<T>[] iterators) {
255+
this.iterators = iterators;
256+
this.currentIteratorIndex = 0;
257+
this.currentIterator = iterators[currentIteratorIndex];
258+
}
259+
260+
@Override
261+
public boolean hasNext() {
262+
while (!currentIterator.hasNext()) {
263+
currentIteratorIndex++;
264+
if (currentIteratorIndex == iterators.length) {
265+
return false;
266+
}
267+
currentIterator = iterators[currentIteratorIndex];
268+
}
269+
return true;
270+
}
271+
272+
@Override
273+
public T next() {
274+
if (!hasNext()) {
275+
throw new NoSuchElementException();
276+
}
277+
return currentIterator.next();
278+
}
279+
280+
@Override
281+
public void remove() {
282+
currentIterator.remove();
283+
}
284+
}
285+
}
286+
216287
/**
217288
* Factory to create TieredSpilloverCache objects.
218289
*/
@@ -253,8 +324,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
253324
}
254325
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
255326

256-
TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
257-
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
327+
TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType)
258328
.get(settings);
259329
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
260330
config.getCachedResultParser(),
@@ -266,7 +336,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
266336
.setRemovalListener(config.getRemovalListener())
267337
.setCacheConfig(config)
268338
.setCacheType(cacheType)
269-
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
339+
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType))
270340
.build();
271341
}
272342

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.util.List;
1919
import java.util.Map;
2020

21+
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
22+
2123
/**
2224
* Plugin for TieredSpilloverCache.
2325
*/
@@ -51,11 +53,7 @@ public List<Setting<?>> getSettings() {
5153
settingList.add(
5254
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
5355
);
54-
settingList.add(
55-
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
56-
cacheType.getSettingPrefix()
57-
)
58-
);
56+
settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
5957
}
6058
return settingList;
6159
}

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@
88

99
package org.opensearch.cache.common.tier;
1010

11+
import org.opensearch.common.cache.CacheType;
1112
import org.opensearch.common.settings.Setting;
1213
import org.opensearch.common.unit.TimeValue;
1314

15+
import java.util.HashMap;
16+
import java.util.Map;
1417
import java.util.concurrent.TimeUnit;
1518

1619
import static org.opensearch.common.settings.Setting.Property.NodeScope;
@@ -42,17 +45,36 @@ public class TieredSpilloverCacheSettings {
4245
/**
4346
* Setting defining the minimum took time for a query to be allowed into the disk cache.
4447
*/
45-
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
48+
private static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
4649
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
4750
(key) -> Setting.timeSetting(
4851
key,
4952
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
5053
TimeValue.ZERO, // Minimum value for this setting
51-
NodeScope
54+
NodeScope,
55+
Setting.Property.Dynamic
5256
)
5357
);
54-
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
55-
// Will be tuned further with future benchmarks.
58+
59+
/**
60+
* Stores the concrete took time policy setting for each cache type. These settings are dynamic, so they are kept
61+
* here to be registered and retrieved as needed.
62+
*/
63+
public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
64+
65+
/**
66+
* Fetches concrete took time policy settings.
67+
*/
68+
static {
69+
Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
70+
for (CacheType cacheType : CacheType.values()) {
71+
concreteTookTimePolicySettingMap.put(
72+
cacheType,
73+
TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
74+
);
75+
}
76+
TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap;
77+
}
5678

5779
/**
5880
* Default constructor

modules/cache-common/src/test/java/org/opensearch/cache/common/policy/TookTimePolicyTests.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,27 @@
1212
import org.apache.lucene.search.TopDocs;
1313
import org.apache.lucene.search.TotalHits;
1414
import org.opensearch.common.Randomness;
15+
import org.opensearch.common.cache.CacheType;
1516
import org.opensearch.common.cache.policy.CachedQueryResult;
1617
import org.opensearch.common.io.stream.BytesStreamOutput;
1718
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
19+
import org.opensearch.common.settings.ClusterSettings;
20+
import org.opensearch.common.settings.Settings;
1821
import org.opensearch.common.unit.TimeValue;
1922
import org.opensearch.core.common.bytes.BytesReference;
2023
import org.opensearch.core.common.io.stream.StreamOutput;
2124
import org.opensearch.search.DocValueFormat;
2225
import org.opensearch.search.query.QuerySearchResult;
2326
import org.opensearch.test.OpenSearchTestCase;
27+
import org.junit.Before;
2428

2529
import java.io.IOException;
30+
import java.util.HashSet;
2631
import java.util.Random;
2732
import java.util.function.Function;
2833

34+
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
35+
2936
public class TookTimePolicyTests extends OpenSearchTestCase {
3037
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
3138
try {
@@ -35,8 +42,17 @@ public class TookTimePolicyTests extends OpenSearchTestCase {
3542
}
3643
};
3744

45+
private ClusterSettings clusterSettings;
46+
47+
@Before
48+
public void setup() {
49+
Settings settings = Settings.EMPTY;
50+
clusterSettings = new ClusterSettings(settings, new HashSet<>());
51+
clusterSettings.registerSetting(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE));
52+
}
53+
3854
private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
39-
return new TookTimePolicy<>(threshold, transformationFunction);
55+
return new TookTimePolicy<>(threshold, transformationFunction, clusterSettings, CacheType.INDICES_REQUEST_CACHE);
4056
}
4157

4258
public void testTookTimePolicy() throws Exception {

0 commit comments

Comments
 (0)