Skip to content

Commit dc29e17

Browse files
authored
[Tiered caching] Integrating IndicesRequestCache with CacheService controlled by a feature flag (#12533)
* Adding changelog * Fixing gradle build issue * Fixing CacheService test * Adding UT in IndicesRequestCache with feature flag for more coverage * Updating changelog and renaming feature flag setting * Moving feature flag setting handling logic to CacheService by maintaining backward compatibility * Fixing broken UTs --------- Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com> Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
1 parent 127f497 commit dc29e17

File tree

22 files changed

+665
-176
lines changed

22 files changed

+665
-176
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
104104
- Add toString methods to MultiSearchRequest, MultiGetRequest and CreateIndexRequest ([#12163](https://github.com/opensearch-project/OpenSearch/pull/12163))
105105
- Support for returning scores in matched queries ([#11626](https://github.com/opensearch-project/OpenSearch/pull/11626))
106106
- Add shard id property to SearchLookup for use in field types provided by plugins ([#1063](https://github.com/opensearch-project/OpenSearch/pull/1063))
107+
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
107108
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
108109

109110
### Dependencies

distribution/src/config/opensearch.yml

+4
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,7 @@ ${path.logs}
121121
# Once there is no observed impact on performance, this feature flag can be removed.
122122
#
123123
#opensearch.experimental.optimization.datetime_formatter_caching.enabled: false
124+
#
125+
# Gates the functionality of enabling Opensearch to use pluggable caches with respective store names via setting.
126+
#
127+
#opensearch.experimental.feature.pluggable.caching.enabled: false

modules/cache-common/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
* compatible open source license.
77
*/
88

9+
apply plugin: 'opensearch.internal-cluster-test'
10+
911
opensearchplugin {
1012
description 'Module for caches which are optional and do not require additional security permission'
1113
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cache.common.tier;
10+
11+
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
12+
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
13+
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
14+
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
15+
import org.opensearch.action.search.SearchResponse;
16+
import org.opensearch.action.search.SearchType;
17+
import org.opensearch.client.Client;
18+
import org.opensearch.common.cache.CacheType;
19+
import org.opensearch.common.cache.ICache;
20+
import org.opensearch.common.cache.settings.CacheSettings;
21+
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
22+
import org.opensearch.common.settings.Settings;
23+
import org.opensearch.common.util.FeatureFlags;
24+
import org.opensearch.indices.IndicesRequestCache;
25+
import org.opensearch.plugins.CachePlugin;
26+
import org.opensearch.plugins.Plugin;
27+
import org.opensearch.plugins.PluginInfo;
28+
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
29+
import org.opensearch.test.OpenSearchIntegTestCase;
30+
import org.junit.Assert;
31+
32+
import java.time.ZoneId;
33+
import java.util.Arrays;
34+
import java.util.Collection;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.function.Function;
38+
import java.util.stream.Collectors;
39+
import java.util.stream.Stream;
40+
41+
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
42+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
43+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
44+
import static org.hamcrest.Matchers.greaterThan;
45+
46+
public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {
47+
48+
@Override
49+
protected Collection<Class<? extends Plugin>> nodePlugins() {
50+
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
51+
}
52+
53+
@Override
54+
protected Settings featureFlagSettings() {
55+
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build();
56+
}
57+
58+
@Override
59+
protected Settings nodeSettings(int nodeOrdinal) {
60+
return Settings.builder()
61+
.put(super.nodeSettings(nodeOrdinal))
62+
.put(
63+
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
64+
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
65+
)
66+
.put(
67+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
68+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
69+
).getKey(),
70+
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
71+
)
72+
.put(
73+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
74+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
75+
).getKey(),
76+
MockDiskCache.MockDiskCacheFactory.NAME
77+
)
78+
.build();
79+
}
80+
81+
public void testPluginsAreInstalled() {
82+
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
83+
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
84+
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
85+
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
86+
.stream()
87+
.flatMap(
88+
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
89+
)
90+
.collect(Collectors.toList());
91+
Assert.assertTrue(
92+
pluginInfos.stream()
93+
.anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.common" + ".tier.TieredSpilloverCachePlugin"))
94+
);
95+
}
96+
97+
public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
98+
Client client = client();
99+
assertAcked(
100+
client.admin()
101+
.indices()
102+
.prepareCreate("index")
103+
.setMapping("f", "type=date")
104+
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true).build())
105+
.get()
106+
);
107+
indexRandom(
108+
true,
109+
client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"),
110+
client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z")
111+
);
112+
ensureSearchable("index");
113+
114+
// This is not a random example: serialization with time zones writes shared strings
115+
// which used to not work well with the query cache because of the handles stream output
116+
// see #9500
117+
final SearchResponse r1 = client.prepareSearch("index")
118+
.setSize(0)
119+
.setSearchType(SearchType.QUERY_THEN_FETCH)
120+
.addAggregation(
121+
dateHistogram("histo").field("f")
122+
.timeZone(ZoneId.of("+01:00"))
123+
.minDocCount(0)
124+
.dateHistogramInterval(DateHistogramInterval.MONTH)
125+
)
126+
.get();
127+
assertSearchResponse(r1);
128+
129+
// The cached is actually used
130+
assertThat(
131+
client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(),
132+
greaterThan(0L)
133+
);
134+
}
135+
136+
public static class MockDiskCachePlugin extends Plugin implements CachePlugin {
137+
138+
public MockDiskCachePlugin() {}
139+
140+
@Override
141+
public Map<String, ICache.Factory> getCacheFactoryMap() {
142+
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000));
143+
}
144+
145+
@Override
146+
public String getName() {
147+
return "mock_disk_plugin";
148+
}
149+
}
150+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cache.common.tier;
10+
11+
import org.opensearch.common.cache.CacheType;
12+
import org.opensearch.common.cache.ICache;
13+
import org.opensearch.common.cache.LoadAwareCacheLoader;
14+
import org.opensearch.common.cache.store.builders.ICacheBuilder;
15+
import org.opensearch.common.cache.store.config.CacheConfig;
16+
17+
import java.util.Map;
18+
import java.util.concurrent.ConcurrentHashMap;
19+
20+
public class MockDiskCache<K, V> implements ICache<K, V> {
21+
22+
Map<K, V> cache;
23+
int maxSize;
24+
long delay;
25+
26+
public MockDiskCache(int maxSize, long delay) {
27+
this.maxSize = maxSize;
28+
this.delay = delay;
29+
this.cache = new ConcurrentHashMap<K, V>();
30+
}
31+
32+
@Override
33+
public V get(K key) {
34+
V value = cache.get(key);
35+
return value;
36+
}
37+
38+
@Override
39+
public void put(K key, V value) {
40+
if (this.cache.size() >= maxSize) { // For simplification
41+
return;
42+
}
43+
try {
44+
Thread.sleep(delay);
45+
} catch (InterruptedException e) {
46+
throw new RuntimeException(e);
47+
}
48+
this.cache.put(key, value);
49+
}
50+
51+
@Override
52+
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) {
53+
V value = cache.computeIfAbsent(key, key1 -> {
54+
try {
55+
return loader.load(key);
56+
} catch (Exception e) {
57+
throw new RuntimeException(e);
58+
}
59+
});
60+
return value;
61+
}
62+
63+
@Override
64+
public void invalidate(K key) {
65+
this.cache.remove(key);
66+
}
67+
68+
@Override
69+
public void invalidateAll() {
70+
this.cache.clear();
71+
}
72+
73+
@Override
74+
public Iterable<K> keys() {
75+
return this.cache.keySet();
76+
}
77+
78+
@Override
79+
public long count() {
80+
return this.cache.size();
81+
}
82+
83+
@Override
84+
public void refresh() {}
85+
86+
@Override
87+
public void close() {
88+
89+
}
90+
91+
public static class MockDiskCacheFactory implements Factory {
92+
93+
public static final String NAME = "mockDiskCache";
94+
final long delay;
95+
final int maxSize;
96+
97+
public MockDiskCacheFactory(long delay, int maxSize) {
98+
this.delay = delay;
99+
this.maxSize = maxSize;
100+
}
101+
102+
@Override
103+
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
104+
return new Builder<K, V>().setMaxSize(maxSize).setDeliberateDelay(delay).build();
105+
}
106+
107+
@Override
108+
public String getCacheName() {
109+
return NAME;
110+
}
111+
}
112+
113+
public static class Builder<K, V> extends ICacheBuilder<K, V> {
114+
115+
int maxSize;
116+
long delay;
117+
118+
@Override
119+
public ICache<K, V> build() {
120+
return new MockDiskCache<K, V>(this.maxSize, this.delay);
121+
}
122+
123+
public Builder<K, V> setMaxSize(int maxSize) {
124+
this.maxSize = maxSize;
125+
return this;
126+
}
127+
128+
public Builder<K, V> setDeliberateDelay(long millis) {
129+
this.delay = millis;
130+
return this;
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)