Skip to content

Commit 06852a4

Browse files
committed
Add CacheMaintainer class to perform pending cache maintenance every minute
Signed-off-by: owenhalpert <ohalpert@gmail.com> # Conflicts: # CHANGELOG.md
1 parent 9276c77 commit 06852a4

File tree

10 files changed

+119
-3
lines changed

10 files changed

+119
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
### Documentation
2727
### Maintenance
2828
* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236)
29+
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239)
2930
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278]
3031
### Refactoring
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index;
7+
8+
import com.google.common.cache.Cache;
9+
10+
import java.io.Closeable;
11+
import java.util.concurrent.Executors;
12+
import java.util.concurrent.ScheduledExecutorService;
13+
import java.util.concurrent.TimeUnit;
14+
15+
/**
16+
* Performs periodic maintenance for a Guava cache. The Guava cache is implemented in a way that maintenance operations (such as evicting expired
17+
* entries) will only occur when the cache is accessed. See {@see <a href="https://github.com/google/guava/wiki/cachesexplained#timed-eviction"> Guava Cache Guide</a>}
18+
* for more details. Thus, to perform any pending maintenance, the cleanUp method will be called periodically from a CacheMaintainer instance.
19+
*/
20+
public class CacheMaintainer<K, V> implements Closeable {
21+
private final Cache<K, V> cache;
22+
private final ScheduledExecutorService executor;
23+
private static final int DEFAULT_INTERVAL_SECONDS = 60;
24+
25+
public CacheMaintainer(Cache<K, V> cache) {
26+
this.cache = cache;
27+
this.executor = Executors.newSingleThreadScheduledExecutor();
28+
}
29+
30+
public void startMaintenance() {
31+
executor.scheduleAtFixedRate(this::cleanCache, DEFAULT_INTERVAL_SECONDS, DEFAULT_INTERVAL_SECONDS, TimeUnit.SECONDS);
32+
}
33+
34+
public void cleanCache() {
35+
cache.cleanUp();
36+
}
37+
38+
@Override
39+
public void close() {
40+
executor.shutdown();
41+
}
42+
}

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+12
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.opensearch.common.unit.TimeValue;
2323
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
2424
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
25+
import org.opensearch.knn.index.CacheMaintainer;
2526
import org.opensearch.knn.index.KNNSettings;
2627
import org.opensearch.knn.plugin.stats.StatNames;
2728

@@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
5152
private Cache<String, NativeMemoryAllocation> cache;
5253
private Deque<String> accessRecencyQueue;
5354
private final ExecutorService executor;
55+
private CacheMaintainer<String, NativeMemoryAllocation> cacheMaintainer;
5456
private AtomicBoolean cacheCapacityReached;
5557
private long maxWeight;
5658

@@ -87,6 +89,10 @@ private void initialize() {
8789
}
8890

8991
private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
92+
if (cacheMaintainer != null) {
93+
cacheMaintainer.close();
94+
}
95+
9096
CacheBuilder<String, NativeMemoryAllocation> cacheBuilder = CacheBuilder.newBuilder()
9197
.recordStats()
9298
.concurrencyLevel(1)
@@ -104,6 +110,9 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
104110
cacheCapacityReached = new AtomicBoolean(false);
105111
accessRecencyQueue = new ConcurrentLinkedDeque<>();
106112
cache = cacheBuilder.build();
113+
114+
this.cacheMaintainer = new CacheMaintainer<>(cache);
115+
this.cacheMaintainer.startMaintenance();
107116
}
108117

109118
/**
@@ -142,6 +151,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
142151
@Override
143152
public void close() {
144153
executor.shutdown();
154+
if (cacheMaintainer != null) {
155+
cacheMaintainer.close();
156+
}
145157
}
146158

147159
/**

src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
import lombok.extern.log4j.Log4j2;
1515
import org.opensearch.common.unit.TimeValue;
1616
import org.opensearch.core.common.unit.ByteSizeValue;
17+
import org.opensearch.knn.index.CacheMaintainer;
1718
import org.opensearch.knn.index.KNNSettings;
1819

20+
import java.io.Closeable;
1921
import java.io.IOException;
2022
import java.time.Instant;
2123
import java.util.concurrent.TimeUnit;
@@ -27,10 +29,11 @@
2729
* A thread-safe singleton cache that contains quantization states.
2830
*/
2931
@Log4j2
30-
public class QuantizationStateCache {
32+
public class QuantizationStateCache implements Closeable {
3133

3234
private static volatile QuantizationStateCache instance;
3335
private Cache<String, QuantizationState> cache;
36+
private CacheMaintainer<String, QuantizationState> cacheMaintainer;
3437
@Getter
3538
private long maxCacheSizeInKB;
3639
@Getter
@@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() {
5861
}
5962

6063
private void buildCache() {
64+
if (cacheMaintainer != null) {
65+
cacheMaintainer.close();
66+
}
67+
6168
this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> {
6269
try {
6370
return ((QuantizationState) v).toByteArray().length;
@@ -71,6 +78,9 @@ private void buildCache() {
7178
)
7279
.removalListener(this::onRemoval)
7380
.build();
81+
82+
this.cacheMaintainer = new CacheMaintainer<>(cache);
83+
this.cacheMaintainer.startMaintenance();
7484
}
7585

7686
synchronized void rebuildCache() {
@@ -129,4 +139,9 @@ private void updateEvictedDueToSizeAt() {
129139
public void clear() {
130140
cache.invalidateAll();
131141
}
142+
143+
@Override
144+
public void close() throws IOException {
145+
cacheMaintainer.close();
146+
}
132147
}

src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
7979
public void clear() {
8080
QuantizationStateCache.getInstance().clear();
8181
}
82+
83+
public void close() throws IOException {
84+
QuantizationStateCache.getInstance().close();
85+
}
8286
}

src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.core.xcontent.XContentBuilder;
3636
import org.opensearch.common.xcontent.XContentFactory;
3737
import org.opensearch.index.IndexService;
38+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
3839
import org.opensearch.plugins.Plugin;
3940
import org.opensearch.core.rest.RestStatus;
4041
import org.opensearch.test.OpenSearchSingleNodeTestCase;
@@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
8687
public void tearDown() throws Exception {
8788
NativeMemoryCacheManager.getInstance().invalidateAll();
8889
NativeMemoryCacheManager.getInstance().close();
90+
QuantizationStateCacheManager.getInstance().close();
8991
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
9092
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
9193
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();

src/test/java/org/opensearch/knn/KNNTestCase.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.opensearch.core.common.bytes.BytesReference;
2525
import org.opensearch.core.xcontent.XContentBuilder;
2626
import org.opensearch.common.xcontent.XContentHelper;
27+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
2728
import org.opensearch.test.OpenSearchTestCase;
2829

30+
import java.io.IOException;
2931
import java.util.Collections;
3032
import java.util.HashSet;
3133
import java.util.Map;
@@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
7375
return false;
7476
}
7577

76-
public void resetState() {
78+
public void resetState() throws IOException {
7779
// Reset all of the counters
7880
for (KNNCounter knnCounter : KNNCounter.values()) {
7981
knnCounter.set(0L);
@@ -83,6 +85,7 @@ public void resetState() {
8385
// Clean up the cache
8486
NativeMemoryCacheManager.getInstance().invalidateAll();
8587
NativeMemoryCacheManager.getInstance().close();
88+
QuantizationStateCacheManager.getInstance().close();
8689
}
8790

8891
private void initKNNSettings() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index;
7+
8+
import com.google.common.cache.Cache;
9+
import com.google.common.cache.CacheBuilder;
10+
import org.junit.Test;
11+
12+
import java.util.concurrent.TimeUnit;
13+
14+
import static org.junit.Assert.assertEquals;
15+
16+
public class CacheMaintainerTests {
17+
@Test
18+
public void testCacheEviction() throws InterruptedException {
19+
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();
20+
21+
CacheMaintainer<String, String> cleaner = new CacheMaintainer<>(testCache);
22+
23+
testCache.put("key1", "value1");
24+
assertEquals(testCache.size(), 1);
25+
26+
Thread.sleep(1500);
27+
28+
cleaner.cleanCache();
29+
assertEquals(testCache.size(), 0);
30+
31+
cleaner.close();
32+
}
33+
}

src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public void tearDown() throws Exception {
4141
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
4242
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
4343
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
44+
NativeMemoryCacheManager.getInstance().close();
4445
super.tearDown();
4546
}
4647

@@ -378,6 +379,7 @@ public void testCacheCapacity() {
378379

379380
nativeMemoryCacheManager.setCacheCapacityReached(false);
380381
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
382+
nativeMemoryCacheManager.close();
381383
}
382384

383385
public void testGetIndicesCacheStats() throws IOException, ExecutionException {

src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.knn.index.KNNSettings;
1717
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;
1818

19+
import java.io.IOException;
1920
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Executors;
@@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() {
417418
assertNull("State should be null", retrievedState);
418419
}
419420

420-
public void testCacheEvictionDueToSize() {
421+
public void testCacheEvictionDueToSize() throws IOException {
421422
String fieldName = "evictionField";
422423
// States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache
423424
int arrayLength = 112;
@@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() {
445446
cache.addQuantizationState(fieldName, state);
446447
cache.addQuantizationState(fieldName, state2);
447448
cache.clear();
449+
cache.close();
448450
assertNotNull(cache.getEvictedDueToSizeAt());
449451
}
450452
}

0 commit comments

Comments
 (0)