47
47
import org .opensearch .common .cache .RemovalNotification ;
48
48
import org .opensearch .common .cache .service .CacheService ;
49
49
import org .opensearch .common .cache .store .config .CacheConfig ;
50
+ import org .opensearch .common .lease .Releasable ;
51
+ import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
50
52
import org .opensearch .common .lucene .index .OpenSearchDirectoryReader ;
51
53
import org .opensearch .common .settings .Setting ;
52
54
import org .opensearch .common .settings .Setting .Property ;
61
63
import org .opensearch .core .common .unit .ByteSizeValue ;
62
64
import org .opensearch .core .index .shard .ShardId ;
63
65
import org .opensearch .index .shard .IndexShard ;
66
+ import org .opensearch .threadpool .ThreadPool ;
64
67
65
68
import java .io .Closeable ;
66
69
import java .io .IOException ;
95
98
*
96
99
* @opensearch.internal
97
100
*/
98
- public final class IndicesRequestCache implements RemovalListener <IndicesRequestCache .Key , BytesReference >, Closeable {
101
+ public final class IndicesRequestCache extends AbstractLifecycleComponent
102
+ implements
103
+ RemovalListener <IndicesRequestCache .Key , BytesReference >,
104
+ Closeable {
99
105
100
106
private static final Logger logger = LogManager .getLogger (IndicesRequestCache .class );
101
107
@@ -138,13 +144,26 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
138
144
private final TimeValue expire ;
139
145
private final ICache <Key , BytesReference > cache ;
140
146
private final Function <ShardId , Optional <CacheEntity >> cacheEntityLookup ;
141
-
142
- IndicesRequestCache (Settings settings , Function <ShardId , Optional <CacheEntity >> cacheEntityFunction , CacheService cacheService ) {
147
+ // pkg-private for testing
148
+ final IndicesRequestCacheCleanupManager cacheCleanupManager ;
149
+ private final IndicesRequestCacheCleaner cacheCleaner ;
150
+ private final TimeValue cleanInterval ;
151
+ private final ThreadPool threadpool ;
152
+
153
+ IndicesRequestCache (
154
+ Settings settings ,
155
+ Function <ShardId , Optional <CacheEntity >> cacheEntityFunction ,
156
+ CacheService cacheService ,
157
+ ThreadPool threadPool
158
+ ) {
143
159
this .size = INDICES_CACHE_QUERY_SIZE .get (settings );
144
160
this .expire = INDICES_CACHE_QUERY_EXPIRE .exists (settings ) ? INDICES_CACHE_QUERY_EXPIRE .get (settings ) : null ;
161
+ this .cleanInterval = INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING .get (settings );
145
162
long sizeInBytes = size .getBytes ();
146
163
ToLongBiFunction <Key , BytesReference > weigher = (k , v ) -> k .ramBytesUsed () + v .ramBytesUsed ();
147
164
this .cacheCleanupManager = new IndicesRequestCacheCleanupManager (getStalenessThreshold (settings ));
165
+ this .threadpool = threadPool ;
166
+ this .cacheCleaner = new IndicesRequestCacheCleaner (this , this .threadpool , this .cleanInterval );
148
167
this .cacheEntityLookup = cacheEntityFunction ;
149
168
this .cache = cacheService .createCache (
150
169
new CacheConfig .Builder <Key , BytesReference >().setSettings (settings )
@@ -159,6 +178,19 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
159
178
);
160
179
}
161
180
181
+ @ Override
182
+ protected void doStart () {
183
+ threadpool .schedule (this .cacheCleaner , this .cleanInterval , ThreadPool .Names .SAME );
184
+ }
185
+
186
+ @ Override
187
+ protected void doStop () {
188
+ cacheCleaner .close ();
189
+ }
190
+
191
+ @ Override
192
+ protected void doClose () throws IOException {}
193
+
162
194
@ Override
163
195
public void close () {
164
196
cache .invalidateAll ();
@@ -230,6 +262,39 @@ void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReade
230
262
cache .invalidate (new Key (((IndexShard ) cacheEntity .getCacheIdentity ()).shardId (), cacheKey , readerCacheKeyId ));
231
263
}
232
264
265
+ private final class IndicesRequestCacheCleaner implements Runnable , Releasable {
266
+
267
+ private final IndicesRequestCache indicesRequestCache ;
268
+ private final ThreadPool threadPool ;
269
+ private final TimeValue interval ;
270
+
271
+ IndicesRequestCacheCleaner (IndicesRequestCache indicesRequestCache , ThreadPool threadPool , TimeValue interval ) {
272
+ this .indicesRequestCache = indicesRequestCache ;
273
+ this .threadPool = threadPool ;
274
+ this .interval = interval ;
275
+ }
276
+
277
+ private final AtomicBoolean closed = new AtomicBoolean (false );
278
+
279
+ @ Override
280
+ public void run () {
281
+ try {
282
+ this .indicesRequestCache .cacheCleanupManager .cleanCache ();
283
+ } catch (Exception e ) {
284
+ logger .warn ("Exception during periodic indices request cache cleanup:" , e );
285
+ }
286
+ // Reschedule itself to run again if not closed
287
+ if (closed .get () == false ) {
288
+ threadPool .scheduleUnlessShuttingDown (interval , ThreadPool .Names .SAME , this );
289
+ }
290
+ }
291
+
292
+ @ Override
293
+ public void close () {
294
+ closed .compareAndSet (false , true );
295
+ }
296
+ }
297
+
233
298
/**
234
299
* Loader for the request cache
235
300
*
0 commit comments