Skip to content

Commit 233e1c1

Browse files
github-actions[bot]Peter Alfonsi
and
Peter Alfonsi
committed
Move TSC took-time policy to guard both heap and disk tier (#17190)
* Move TSC took-time policy to guard both heap and disk tier Signed-off-by: Peter Alfonsi <petealft@amazon.com> * changelog Signed-off-by: Peter Alfonsi <petealft@amazon.com> * spotless apply Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Addressed Sagar's comment Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Add missing javadoc Signed-off-by: Peter Alfonsi <petealft@amazon.com> * address round 2 of comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Add removal notification to put() Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Fix incorrect stats hit when cache entry rejected by policy Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Fixed more broken stats Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * Addressed more comments Signed-off-by: Peter Alfonsi <petealft@amazon.com> * make policy rejections count as neither hit or miss Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * remove potential double-loading Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> * remove removalNotification Signed-off-by: Peter Alfonsi <petealft@amazon.com> * rerun gradle Signed-off-by: Peter Alfonsi <petealft@amazon.com> --------- Signed-off-by: Peter Alfonsi <petealft@amazon.com> Signed-off-by: Peter Alfonsi <peter.alfonsi@gmail.com> Co-authored-by: Peter Alfonsi <petealft@amazon.com> (cherry picked from commit b1e66b3) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 83c01f4 commit 233e1c1

File tree

9 files changed

+579
-208
lines changed

9 files changed

+579
-208
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
### Changed
2323
- Convert transport-reactor-netty4 to use gradle version catalog [#17233](https://github.com/opensearch-project/OpenSearch/pull/17233)
2424
- Increase force merge threads to 1/8th of cores [#17255](https://github.com/opensearch-project/OpenSearch/pull/17255)
25+
- TieredSpilloverCache took-time threshold now guards heap tier as well as disk tier [#17190](https://github.com/opensearch-project/OpenSearch/pull/17190)
2526

2627
### Deprecated
2728

modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheIT.java

+72-16
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
118118
);
119119
}
120120

121-
public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
121+
public void testWithDynamicDiskTookTimePolicyWithMultiSegments() throws Exception {
122122
int numberOfSegments = getNumberOfSegments();
123123
int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this
124124
// just a bit higher so that each segment can at least hold 1 entry.
@@ -139,12 +139,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
139139
)
140140
.get()
141141
);
142-
// Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
142+
// Set a very high value for took time disk policy so that no items evicted from onHeap cache are spilled
143143
// to disk. And then hit requests so that few items are cached into cache.
144144
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
145145
Settings.builder()
146146
.put(
147-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
147+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
148+
.getKey(),
148149
new TimeValue(100, TimeUnit.SECONDS)
149150
)
150151
.build()
@@ -182,12 +183,13 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
182183
assertEquals(0, requestCacheStats.getHitCount());
183184
long lastEvictionSeen = requestCacheStats.getEvictions();
184185

185-
// Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
186+
// Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able
186187
// to cache all entries.
187188
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
188189
Settings.builder()
189190
.put(
190-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
191+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
192+
.getKey(),
191193
new TimeValue(0, TimeUnit.MILLISECONDS)
192194
)
193195
.build()
@@ -206,7 +208,7 @@ public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
206208
assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
207209
}
208210

209-
public void testWithDynamicTookTimePolicy() throws Exception {
211+
public void testWithDynamicHeapTookTimePolicy() throws Exception {
210212
int onHeapCacheSizeInBytes = 2000;
211213
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
212214
Client client = client();
@@ -224,8 +226,7 @@ public void testWithDynamicTookTimePolicy() throws Exception {
224226
)
225227
.get()
226228
);
227-
// Step 1 : Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
228-
// to disk. And then hit requests so that few items are cached into cache.
229+
// Set a high threshold for the overall cache took time policy so nothing will enter the cache.
229230
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
230231
Settings.builder()
231232
.put(
@@ -245,6 +246,57 @@ public void testWithDynamicTookTimePolicy() throws Exception {
245246
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
246247
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
247248
long perQuerySizeInCacheInBytes = -1;
249+
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
250+
SearchResponse resp = client.prepareSearch("index")
251+
.setRequestCache(true)
252+
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
253+
.get();
254+
assertSearchResponse(resp);
255+
}
256+
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
257+
assertEquals(0, requestCacheStats.getEvictions());
258+
}
259+
260+
public void testWithDynamicDiskTookTimePolicy() throws Exception {
261+
int onHeapCacheSizeInBytes = 2000;
262+
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
263+
Client client = client();
264+
assertAcked(
265+
client.admin()
266+
.indices()
267+
.prepareCreate("index")
268+
.setMapping("k", "type=keyword")
269+
.setSettings(
270+
Settings.builder()
271+
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
272+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
273+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
274+
.put("index.refresh_interval", -1)
275+
)
276+
.get()
277+
);
278+
// Step 1 : Set a very high value for disk took time policy so that no items evicted from onHeap cache are spilled
279+
// to disk. And then hit requests so that few items are cached into cache.
280+
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
281+
Settings.builder()
282+
.put(
283+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
284+
.getKey(),
285+
new TimeValue(100, TimeUnit.SECONDS)
286+
)
287+
.build()
288+
);
289+
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
290+
int numberOfIndexedItems = randomIntBetween(6, 10);
291+
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
292+
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
293+
}
294+
ensureSearchable("index");
295+
refreshAndWaitForReplication();
296+
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
297+
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
298+
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
299+
long perQuerySizeInCacheInBytes = -1;
248300
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
249301
SearchResponse resp = client.prepareSearch("index")
250302
.setRequestCache(true)
@@ -282,12 +334,13 @@ public void testWithDynamicTookTimePolicy() throws Exception {
282334
assertEquals(0, requestCacheStats.getHitCount());
283335
long lastEvictionSeen = requestCacheStats.getEvictions();
284336

285-
// Step 3: Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
337+
// Step 3: Decrease disk took time policy to zero so that disk cache also comes into play. Now we should be able
286338
// to cache all entries.
287339
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
288340
Settings.builder()
289341
.put(
290-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
342+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
343+
.getKey(),
291344
new TimeValue(0, TimeUnit.MILLISECONDS)
292345
)
293346
.build()
@@ -352,11 +405,12 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
352405
)
353406
.get()
354407
);
355-
// Update took time policy to zero so that all entries are eligible to be cached on disk.
408+
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
356409
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
357410
Settings.builder()
358411
.put(
359-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
412+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
413+
.getKey(),
360414
new TimeValue(0, TimeUnit.MILLISECONDS)
361415
)
362416
.build()
@@ -437,11 +491,12 @@ public void testWithExplicitCacheClear() throws Exception {
437491
)
438492
.get()
439493
);
440-
// Update took time policy to zero so that all entries are eligible to be cached on disk.
494+
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
441495
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
442496
Settings.builder()
443497
.put(
444-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
498+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
499+
.getKey(),
445500
new TimeValue(0, TimeUnit.MILLISECONDS)
446501
)
447502
.build()
@@ -512,11 +567,12 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
512567
)
513568
.get()
514569
);
515-
// Update took time policy to zero so that all entries are eligible to be cached on disk.
570+
// Update disk took time policy to zero so that all entries are eligible to be cached on disk.
516571
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
517572
Settings.builder()
518573
.put(
519-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
574+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
575+
.getKey(),
520576
new TimeValue(0, TimeUnit.MILLISECONDS)
521577
)
522578
.build()

modules/cache-common/src/internalClusterTest/java/org/opensearch/cache/common/tier/TieredSpilloverCacheStatsIT.java

+34-51
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
6262
* Test aggregating by indices
6363
*/
6464
public void testIndicesLevelAggregation() throws Exception {
65-
internalCluster().startNodes(
66-
1,
67-
Settings.builder()
68-
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
69-
.put(
70-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
71-
new TimeValue(0, TimeUnit.SECONDS)
72-
)
73-
.build()
74-
);
65+
startNodesDefaultSettings();
7566
Client client = client();
7667
Map<String, Integer> values = setupCacheForAggregationTests(client);
7768

@@ -115,16 +106,7 @@ public void testIndicesLevelAggregation() throws Exception {
115106
* Test aggregating by indices and tier
116107
*/
117108
public void testIndicesAndTierLevelAggregation() throws Exception {
118-
internalCluster().startNodes(
119-
1,
120-
Settings.builder()
121-
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
122-
.put(
123-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
124-
new TimeValue(0, TimeUnit.SECONDS)
125-
)
126-
.build()
127-
);
109+
startNodesDefaultSettings();
128110
Client client = client();
129111
Map<String, Integer> values = setupCacheForAggregationTests(client);
130112

@@ -195,16 +177,7 @@ public void testIndicesAndTierLevelAggregation() throws Exception {
195177
* Test aggregating by tier only
196178
*/
197179
public void testTierLevelAggregation() throws Exception {
198-
internalCluster().startNodes(
199-
1,
200-
Settings.builder()
201-
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
202-
.put(
203-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
204-
new TimeValue(0, TimeUnit.SECONDS)
205-
)
206-
.build()
207-
);
180+
startNodesDefaultSettings();
208181
Client client = client();
209182
Map<String, Integer> values = setupCacheForAggregationTests(client);
210183
// Get values for tiers alone and check they add correctly across indices
@@ -236,16 +209,7 @@ public void testTierLevelAggregation() throws Exception {
236209
}
237210

238211
public void testInvalidLevelsAreIgnored() throws Exception {
239-
internalCluster().startNodes(
240-
1,
241-
Settings.builder()
242-
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
243-
.put(
244-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
245-
new TimeValue(0, TimeUnit.SECONDS)
246-
)
247-
.build()
248-
);
212+
startNodesDefaultSettings();
249213
Client client = client();
250214
Map<String, Integer> values = setupCacheForAggregationTests(client);
251215

@@ -287,16 +251,7 @@ public void testInvalidLevelsAreIgnored() throws Exception {
287251
* Check the new stats API returns the same values as the old stats API.
288252
*/
289253
public void testStatsMatchOldApi() throws Exception {
290-
internalCluster().startNodes(
291-
1,
292-
Settings.builder()
293-
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, getNumberOfSegments()))
294-
.put(
295-
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
296-
new TimeValue(0, TimeUnit.SECONDS)
297-
)
298-
.build()
299-
);
254+
startNodesDefaultSettings();
300255
String index = "index";
301256
Client client = client();
302257
startIndex(client, index);
@@ -354,7 +309,12 @@ public void testStatsWithMultipleSegments() throws Exception {
354309
.put(defaultSettings(heap_cache_size_per_segment * numberOfSegments + "B", numberOfSegments))
355310
.put(
356311
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
357-
new TimeValue(0, TimeUnit.SECONDS)
312+
TimeValue.ZERO
313+
)
314+
.put(
315+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
316+
.getKey(),
317+
TimeValue.ZERO
358318
)
359319
.build()
360320
);
@@ -429,6 +389,11 @@ public void testClosingShard() throws Exception {
429389
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
430390
new TimeValue(0, TimeUnit.SECONDS)
431391
)
392+
.put(
393+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
394+
.getKey(),
395+
new TimeValue(0, TimeUnit.SECONDS)
396+
)
432397
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
433398
.build()
434399
);
@@ -631,4 +596,22 @@ private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client,
631596
NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();
632597
return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE);
633598
}
599+
600+
private void startNodesDefaultSettings() {
601+
internalCluster().startNodes(
602+
1,
603+
Settings.builder()
604+
.put(defaultSettings(HEAP_CACHE_SIZE_STRING, 1))
605+
.put(
606+
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
607+
TimeValue.ZERO
608+
)
609+
.put(
610+
TieredSpilloverCacheSettings.TOOK_TIME_DISK_TIER_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE)
611+
.getKey(),
612+
TimeValue.ZERO
613+
)
614+
.build()
615+
);
616+
}
634617
}

modules/cache-common/src/main/java/org/opensearch/cache/common/policy/TookTimePolicy.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,14 @@
1313

1414
package org.opensearch.cache.common.policy;
1515

16-
import org.opensearch.common.cache.CacheType;
1716
import org.opensearch.common.cache.policy.CachedQueryResult;
1817
import org.opensearch.common.settings.ClusterSettings;
18+
import org.opensearch.common.settings.Setting;
1919
import org.opensearch.common.unit.TimeValue;
2020

2121
import java.util.function.Function;
2222
import java.util.function.Predicate;
2323

24-
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;
25-
2624
/**
2725
* A cache tier policy which accepts queries whose took time is greater than some threshold.
2826
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
@@ -46,20 +44,20 @@ public class TookTimePolicy<V> implements Predicate<V> {
4644
* @param threshold the threshold
4745
* @param cachedResultParser the function providing policy values
4846
* @param clusterSettings cluster settings
49-
* @param cacheType cache type
47+
* @param targetSetting the cluster setting to register a consumer with
5048
*/
5149
public TookTimePolicy(
5250
TimeValue threshold,
5351
Function<V, CachedQueryResult.PolicyValues> cachedResultParser,
5452
ClusterSettings clusterSettings,
55-
CacheType cacheType
53+
Setting<TimeValue> targetSetting
5654
) {
5755
if (threshold.compareTo(TimeValue.ZERO) < 0) {
5856
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
5957
}
6058
this.threshold = threshold;
6159
this.cachedResultParser = cachedResultParser;
62-
clusterSettings.addSettingsUpdateConsumer(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType), this::setThreshold);
60+
clusterSettings.addSettingsUpdateConsumer(targetSetting, this::setThreshold);
6361
}
6462

6563
private void setThreshold(TimeValue threshold) {
@@ -72,6 +70,10 @@ private void setThreshold(TimeValue threshold) {
7270
* @return whether to admit the data
7371
*/
7472
public boolean test(V data) {
73+
if (threshold.equals(TimeValue.ZERO)) {
74+
// Skip parsing the took time if this threshold is zero.
75+
return true;
76+
}
7577
long tookTimeNanos;
7678
try {
7779
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();

0 commit comments

Comments
 (0)