21
21
import org .apache .logging .log4j .Logger ;
22
22
import org .opensearch .common .unit .TimeValue ;
23
23
import org .opensearch .knn .common .exception .OutOfNativeMemoryException ;
24
+ import org .opensearch .knn .common .featureflags .KNNFeatureFlags ;
24
25
import org .opensearch .knn .index .KNNSettings ;
25
26
import org .opensearch .knn .plugin .stats .StatNames ;
26
27
27
28
import java .io .Closeable ;
29
+ import java .util .Deque ;
28
30
import java .util .HashMap ;
31
+ import java .util .Iterator ;
29
32
import java .util .Map ;
30
33
import java .util .Optional ;
34
+ import java .util .concurrent .ConcurrentLinkedDeque ;
31
35
import java .util .concurrent .ExecutionException ;
32
36
import java .util .concurrent .ExecutorService ;
33
37
import java .util .concurrent .Executors ;
@@ -45,6 +49,7 @@ public class NativeMemoryCacheManager implements Closeable {
45
49
private static NativeMemoryCacheManager INSTANCE ;
46
50
47
51
private Cache <String , NativeMemoryAllocation > cache ;
52
+ private Deque <String > accessRecencyQueue ;
48
53
private final ExecutorService executor ;
49
54
private AtomicBoolean cacheCapacityReached ;
50
55
private long maxWeight ;
@@ -97,7 +102,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
97
102
}
98
103
99
104
cacheCapacityReached = new AtomicBoolean (false );
100
-
105
+ accessRecencyQueue = new ConcurrentLinkedDeque <>();
101
106
cache = cacheBuilder .build ();
102
107
}
103
108
@@ -301,7 +306,52 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
301
306
);
302
307
}
303
308
304
- return cache .get (nativeMemoryEntryContext .getKey (), nativeMemoryEntryContext ::load );
309
+ if (KNNFeatureFlags .isForceEvictCacheEnabled ()) {
310
+ // Utilizes a force eviction mechanism to free up memory before the entry can be added to the cache
311
+ // In case of a cache hit, the operation just updates the locally maintained recency queue
312
+ // In case of a cache miss, least recently accessed entries are evicted in a blocking manner
313
+ // before the new entry can be added to the cache.
314
+ String key = nativeMemoryEntryContext .getKey ();
315
+ NativeMemoryAllocation result = cache .getIfPresent (key );
316
+
317
+ // Cache Hit
318
+ // In case of a cache hit, moving the item to the end of the recency queue adds
319
+ // some overhead to the get operation. This can be optimized further to make this operation
320
+ // as lightweight as possible. Multiple approaches and their outcomes were documented
321
+ // before moving forward with the current solution.
322
+ // The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680
323
+ if (result != null ) {
324
+ accessRecencyQueue .remove (key );
325
+ accessRecencyQueue .addLast (key );
326
+ return result ;
327
+ }
328
+
329
+ // Cache Miss
330
+ // Evict before put
331
+ synchronized (this ) {
332
+ if (getCacheSizeInKilobytes () + nativeMemoryEntryContext .calculateSizeInKB () >= maxWeight ) {
333
+ Iterator <String > lruIterator = accessRecencyQueue .iterator ();
334
+ while (lruIterator .hasNext ()
335
+ && (getCacheSizeInKilobytes () + nativeMemoryEntryContext .calculateSizeInKB () >= maxWeight )) {
336
+
337
+ String keyToRemove = lruIterator .next ();
338
+ NativeMemoryAllocation allocationToRemove = cache .getIfPresent (keyToRemove );
339
+ if (allocationToRemove != null ) {
340
+ allocationToRemove .close ();
341
+ cache .invalidate (keyToRemove );
342
+ }
343
+ lruIterator .remove ();
344
+ }
345
+ }
346
+
347
+ result = cache .get (key , nativeMemoryEntryContext ::load );
348
+ accessRecencyQueue .addLast (key );
349
+
350
+ return result ;
351
+ }
352
+ } else {
353
+ return cache .get (nativeMemoryEntryContext .getKey (), nativeMemoryEntryContext ::load );
354
+ }
305
355
}
306
356
307
357
/**
0 commit comments