package org.opensearch.cache;

+ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
+ import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
+ import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
+ import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
+ import org.opensearch.action.search.SearchResponse;
+ import org.opensearch.action.search.SearchType;
+ import org.opensearch.cache.store.disk.EhcacheDiskCache;
+ import org.opensearch.cache.store.disk.EhcacheThreadLeakFilter;
+ import org.opensearch.client.Client;
+ import org.opensearch.cluster.metadata.IndexMetadata;
+ import org.opensearch.common.cache.CacheType;
+ import org.opensearch.common.cache.settings.CacheSettings;
+ import org.opensearch.common.settings.Settings;
+ import org.opensearch.common.unit.TimeValue;
+ import org.opensearch.common.util.FeatureFlags;
+ import org.opensearch.env.NodeEnvironment;
+ import org.opensearch.index.cache.request.RequestCacheStats;
+ import org.opensearch.index.query.QueryBuilders;
+ import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
+ import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.test.OpenSearchIntegTestCase;
+ import org.opensearch.test.hamcrest.OpenSearchAssertions;
import org.junit.Assert;

+ import java.io.IOException;
+ import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

+ import static org.opensearch.cache.EhcacheDiskCacheSettings.DEFAULT_CACHE_SIZE_IN_BYTES;
+ import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY;
+ import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_LISTENER_MODE_SYNC_KEY;
+ import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_MAX_SIZE_IN_BYTES_KEY;
+ import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY;
+ import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
+ import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
+ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
+ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
+ import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
+ import static org.hamcrest.Matchers.greaterThan;
+
+ @OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
+ @ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class })
public class EhcacheDiskCacheIT extends OpenSearchIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(EhcacheCachePlugin.class);
    }

+    @Override
+    protected Settings featureFlagSettings() {
+        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build();
+    }
+
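+    /**
+     * Builds node settings that wire the indices request cache to the Ehcache disk store:
+     * a per-test storage path, the Ehcache disk cache as the configured store, synchronous
+     * listener mode, the maximum disk size in bytes, and an optional expire-after-access time.
+     */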
+    private Settings defaultSettings(long sizeInBytes, TimeValue expirationTime) {
+        if (expirationTime == null) {
+            expirationTime = TimeValue.MAX_VALUE;
+        }
+        try (NodeEnvironment env = newNodeEnvironment(Settings.EMPTY)) {
+            return Settings.builder()
+                .put(
+                    EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
+                        .get(DISK_STORAGE_PATH_KEY)
+                        .getKey(),
+                    env.nodePaths()[0].indicesPath.toString() + "/" + UUID.randomUUID() + "/request_cache/"
+                )
+                .put(
+                    CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
+                    EhcacheDiskCache.EhcacheDiskCacheFactory.EHCACHE_DISK_CACHE_NAME
+                )
+                .put(
+                    EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
+                        .get(DISK_LISTENER_MODE_SYNC_KEY)
+                        .getKey(),
+                    true
+                )
+                .put(
+                    EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
+                        .get(DISK_MAX_SIZE_IN_BYTES_KEY)
+                        .getKey(),
+                    sizeInBytes
+                )
+                .put(
+                    EhcacheDiskCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
+                        .get(DISK_CACHE_EXPIRE_AFTER_ACCESS_KEY)
+                        .getKey(),
+                    expirationTime
+                )
+                .build();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
    public void testPluginsAreInstalled() {
+        internalCluster().startNode(Settings.builder().put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)).build());
        NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
        nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
        NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
@@ -45,4 +131,258 @@ public void testPluginsAreInstalled() {
            pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.cache.EhcacheCachePlugin"))
        );
    }
+
+    public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
+        internalCluster().startNode(Settings.builder().put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null)).build());
+        Client client = client();
+        assertAcked(
+            client.admin()
+                .indices()
+                .prepareCreate("index")
+                .setMapping("f", "type=date")
+                .setSettings(
+                    Settings.builder()
+                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+                        .build()
+                )
+                .get()
+        );
+        indexRandom(
+            true,
+            client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"),
+            client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z")
+        );
+        ensureSearchable("index");
+
+        // This is not a random example: serialization with time zones writes shared strings
+        // which used to not work well with the query cache because of the handles stream output
+        // see #9500
+        final SearchResponse r1 = client.prepareSearch("index")
+            .setSize(0)
+            .setSearchType(SearchType.QUERY_THEN_FETCH)
+            .addAggregation(
+                dateHistogram("histo").field("f")
+                    .timeZone(ZoneId.of("+01:00"))
+                    .minDocCount(0)
+                    .dateHistogramInterval(DateHistogramInterval.MONTH)
+            )
+            .get();
+        assertSearchResponse(r1);
+
+        // The cache is actually used
+        assertThat(
+            client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(),
+            greaterThan(0L)
+        );
+    }
+
+    public void testInvalidationWithIndicesRequestCache() throws Exception {
+        internalCluster().startNode(
+            Settings.builder()
+                .put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null))
+                .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
+                .build()
+        );
+        Client client = client();
+        assertAcked(
+            client.admin()
+                .indices()
+                .prepareCreate("index")
+                .setMapping("k", "type=keyword")
+                .setSettings(
+                    Settings.builder()
+                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put("index.refresh_interval", -1)
+                )
+                .get()
+        );
+        int numberOfIndexedItems = randomIntBetween(5, 10);
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
+        }
+        ensureSearchable("index");
+        refreshAndWaitForReplication();
+        // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
+        ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
+        OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
+        long perQuerySizeInCacheInBytes = -1;
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            if (perQuerySizeInCacheInBytes == -1) {
+                RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+                perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
+            }
+            assertSearchResponse(resp);
+        }
+        RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(0, requestCacheStats.getHitCount());
+        assertEquals(0, requestCacheStats.getEvictions());
+        assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
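+        // Repeat the same searches; they should now be answered from the request cache and counted as hits.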
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            assertSearchResponse(resp);
+        }
+        requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
+        assertEquals(0, requestCacheStats.getEvictions());
+        // Explicit refresh would invalidate cache entries.
+        refreshAndWaitForReplication();
+        assertBusy(() -> {
+            // Explicit refresh should clear up cache entries
+            assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
+        }, 1, TimeUnit.SECONDS);
+        requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(0, requestCacheStats.getMemorySizeInBytes());
+        // Hit and miss stats shouldn't get cleared up.
+        assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+    }
+
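+    // Verifies that the explicit clear-cache API removes every entry the disk cache holds for the index.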
+    public void testExplicitCacheClearWithIndicesRequestCache() throws Exception {
+        internalCluster().startNode(
+            Settings.builder()
+                .put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, null))
+                .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
+                .build()
+        );
+        Client client = client();
+        assertAcked(
+            client.admin()
+                .indices()
+                .prepareCreate("index")
+                .setMapping("k", "type=keyword")
+                .setSettings(
+                    Settings.builder()
+                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put("index.refresh_interval", -1)
+                )
+                .get()
+        );
+        int numberOfIndexedItems = randomIntBetween(5, 10);
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
+        }
+        ensureSearchable("index");
+        refreshAndWaitForReplication();
+        // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
+        ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
+        OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
+
+        long perQuerySizeInCacheInBytes = -1;
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            if (perQuerySizeInCacheInBytes == -1) {
+                RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+                perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
+            }
+            assertSearchResponse(resp);
+        }
+        RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(0, requestCacheStats.getHitCount());
+        assertEquals(0, requestCacheStats.getEvictions());
+        assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
+
+        // Explicitly clear the cache.
+        ClearIndicesCacheRequest request = new ClearIndicesCacheRequest("index");
+        ClearIndicesCacheResponse response = client.admin().indices().clearCache(request).get();
+        assertNoFailures(response);
+
+        assertBusy(() -> {
+            // All entries should get cleared up.
+            assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
+        }, 1, TimeUnit.SECONDS);
+    }
+
+    public void testEvictionsFlowWithExpirationTime() throws Exception {
+        internalCluster().startNode(
+            Settings.builder()
+                .put(defaultSettings(DEFAULT_CACHE_SIZE_IN_BYTES, new TimeValue(0))) // Immediately evict items after access
+                .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
+                .build()
+        );
+        Client client = client();
+        assertAcked(
+            client.admin()
+                .indices()
+                .prepareCreate("index")
+                .setMapping("k", "type=keyword")
+                .setSettings(
+                    Settings.builder()
+                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put("index.refresh_interval", -1)
+                )
+                .get()
+        );
+        int numberOfIndexedItems = 2; // randomIntBetween(5, 10);
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
+        }
+        ensureSearchable("index");
+        refreshAndWaitForReplication();
+        // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
+        ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
+        OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
+
+        long perQuerySizeInCacheInBytes = -1;
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            if (perQuerySizeInCacheInBytes == -1) {
+                RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+                perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
+            }
+            assertSearchResponse(resp);
+        }
+        RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(0, requestCacheStats.getHitCount());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(perQuerySizeInCacheInBytes * numberOfIndexedItems, requestCacheStats.getMemorySizeInBytes());
+        assertEquals(0, requestCacheStats.getEvictions());
+
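+        // Access each cached entry once more so the zero expire-after-access timeout takes effect.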
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            assertSearchResponse(resp);
+        }
+        requestCacheStats = getRequestCacheStats(client, "index");
+        // Now that we have accessed the entries, they should expire after 1ms. So let's wait and verify that the cache gets
+        // cleared up.
+        assertBusy(() -> {
+            // The expired entries should get cleared up.
+            assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
+        }, 10, TimeUnit.MILLISECONDS);
+        // Validate hit and miss count.
+        assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+    }
+
+    private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
+        return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
+    }
}