Skip to content

Commit dc61f6e

Browse files
committed
Add thread to perform pending cache maintenance every minute
Signed-off-by: owenhalpert <ohalpert@gmail.com>
1 parent 9276c77 commit dc61f6e

File tree

10 files changed

+112
-3
lines changed

10 files changed

+112
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
### Documentation
2727
### Maintenance
2828
* Select index settings based on cluster version[2236](https://github.com/opensearch-project/k-NN/pull/2236)
29+
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2239](https://github.com/opensearch-project/k-NN/issues/2239)
2930
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field (#2278)[https://github.com/opensearch-project/k-NN/pull/2278]
3031
### Refactoring

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
2424
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
2525
import org.opensearch.knn.index.KNNSettings;
26+
import org.opensearch.knn.index.util.ScheduledExecutor;
2627
import org.opensearch.knn.plugin.stats.StatNames;
2728

2829
import java.io.Closeable;
@@ -51,6 +52,7 @@ public class NativeMemoryCacheManager implements Closeable {
5152
private Cache<String, NativeMemoryAllocation> cache;
5253
private Deque<String> accessRecencyQueue;
5354
private final ExecutorService executor;
55+
private ScheduledExecutor cacheMaintainer;
5456
private AtomicBoolean cacheCapacityReached;
5557
private long maxWeight;
5658

@@ -87,6 +89,10 @@ private void initialize() {
8789
}
8890

8991
private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
92+
if (cacheMaintainer != null) {
93+
cacheMaintainer.close();
94+
}
95+
9096
CacheBuilder<String, NativeMemoryAllocation> cacheBuilder = CacheBuilder.newBuilder()
9197
.recordStats()
9298
.concurrencyLevel(1)
@@ -99,6 +105,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
99105

100106
if (nativeMemoryCacheDTO.isExpirationLimited()) {
101107
cacheBuilder.expireAfterAccess(nativeMemoryCacheDTO.getExpiryTimeInMin(), TimeUnit.MINUTES);
108+
this.cacheMaintainer = new ScheduledExecutor(() -> cache.cleanUp(), 60 * 1000);
102109
}
103110

104111
cacheCapacityReached = new AtomicBoolean(false);
@@ -142,6 +149,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
142149
@Override
143150
public void close() {
144151
executor.shutdown();
152+
if (cacheMaintainer != null) {
153+
cacheMaintainer.close();
154+
}
145155
}
146156

147157
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.util;
7+
8+
import java.io.Closeable;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.ScheduledExecutorService;
11+
import java.util.concurrent.TimeUnit;
12+
13+
/**
14+
* Executes a task periodically
15+
*/
16+
public class ScheduledExecutor implements Closeable {
17+
final ScheduledExecutorService executor;
18+
public final Runnable task;
19+
20+
/**
21+
* @param task task to be completed
22+
* @param scheduleMillis time in milliseconds to wait before executing the task again
23+
*/
24+
public ScheduledExecutor(Runnable task, long scheduleMillis) {
25+
this.task = task;
26+
this.executor = Executors.newSingleThreadScheduledExecutor();
27+
executor.scheduleAtFixedRate(task, 0, scheduleMillis, TimeUnit.MILLISECONDS);
28+
}
29+
30+
@Override
31+
public void close() {
32+
executor.shutdown();
33+
}
34+
}

src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java

+17-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.opensearch.common.unit.TimeValue;
1616
import org.opensearch.core.common.unit.ByteSizeValue;
1717
import org.opensearch.knn.index.KNNSettings;
18+
import org.opensearch.knn.index.util.ScheduledExecutor;
1819

20+
import java.io.Closeable;
1921
import java.io.IOException;
2022
import java.time.Instant;
2123
import java.util.concurrent.TimeUnit;
@@ -27,10 +29,11 @@
2729
* A thread-safe singleton cache that contains quantization states.
2830
*/
2931
@Log4j2
30-
public class QuantizationStateCache {
32+
public class QuantizationStateCache implements Closeable {
3133

3234
private static volatile QuantizationStateCache instance;
3335
private Cache<String, QuantizationState> cache;
36+
private ScheduledExecutor cacheMaintainer;
3437
@Getter
3538
private long maxCacheSizeInKB;
3639
@Getter
@@ -58,6 +61,10 @@ static QuantizationStateCache getInstance() {
5861
}
5962

6063
private void buildCache() {
64+
if (cacheMaintainer != null) {
65+
cacheMaintainer.close();
66+
}
67+
6168
this.cache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumWeight(maxCacheSizeInKB).weigher((k, v) -> {
6269
try {
6370
return ((QuantizationState) v).toByteArray().length;
@@ -71,6 +78,8 @@ private void buildCache() {
7178
)
7279
.removalListener(this::onRemoval)
7380
.build();
81+
82+
this.cacheMaintainer = new ScheduledExecutor(() -> cache.cleanUp(), 60 * 1000);
7483
}
7584

7685
synchronized void rebuildCache() {
@@ -129,4 +138,11 @@ private void updateEvictedDueToSizeAt() {
129138
public void clear() {
130139
cache.invalidateAll();
131140
}
141+
142+
@Override
143+
public void close() throws IOException {
144+
if (cacheMaintainer != null) {
145+
cacheMaintainer.close();
146+
}
147+
}
132148
}

src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java

+4
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
7979
public void clear() {
8080
QuantizationStateCache.getInstance().clear();
8181
}
82+
83+
public void close() throws IOException {
84+
QuantizationStateCache.getInstance().close();
85+
}
8286
}

src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.core.xcontent.XContentBuilder;
3636
import org.opensearch.common.xcontent.XContentFactory;
3737
import org.opensearch.index.IndexService;
38+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
3839
import org.opensearch.plugins.Plugin;
3940
import org.opensearch.core.rest.RestStatus;
4041
import org.opensearch.test.OpenSearchSingleNodeTestCase;
@@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
8687
public void tearDown() throws Exception {
8788
NativeMemoryCacheManager.getInstance().invalidateAll();
8889
NativeMemoryCacheManager.getInstance().close();
90+
QuantizationStateCacheManager.getInstance().close();
8991
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
9092
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
9193
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();

src/test/java/org/opensearch/knn/KNNTestCase.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.opensearch.core.common.bytes.BytesReference;
2525
import org.opensearch.core.xcontent.XContentBuilder;
2626
import org.opensearch.common.xcontent.XContentHelper;
27+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
2728
import org.opensearch.test.OpenSearchTestCase;
2829

30+
import java.io.IOException;
2931
import java.util.Collections;
3032
import java.util.HashSet;
3133
import java.util.Map;
@@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
7375
return false;
7476
}
7577

76-
public void resetState() {
78+
public void resetState() throws IOException {
7779
// Reset all of the counters
7880
for (KNNCounter knnCounter : KNNCounter.values()) {
7981
knnCounter.set(0L);
@@ -83,6 +85,7 @@ public void resetState() {
8385
// Clean up the cache
8486
NativeMemoryCacheManager.getInstance().invalidateAll();
8587
NativeMemoryCacheManager.getInstance().close();
88+
QuantizationStateCacheManager.getInstance().close();
8689
}
8790

8891
private void initKNNSettings() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index;
7+
8+
import com.google.common.cache.Cache;
9+
import com.google.common.cache.CacheBuilder;
10+
import org.junit.Test;
11+
import org.opensearch.knn.index.util.ScheduledExecutor;
12+
13+
import java.util.concurrent.TimeUnit;
14+
15+
import static org.junit.Assert.assertEquals;
16+
17+
public class CacheMaintainerTests {
18+
@Test
19+
public void testCacheEviction() throws InterruptedException {
20+
Cache<String, String> testCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build();
21+
22+
ScheduledExecutor executor = new ScheduledExecutor(testCache::cleanUp, 60 * 1000);
23+
24+
testCache.put("key1", "value1");
25+
assertEquals(testCache.size(), 1);
26+
27+
Thread.sleep(1500);
28+
29+
executor.task.run();
30+
31+
assertEquals(testCache.size(), 0);
32+
33+
executor.close();
34+
}
35+
}

src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public void tearDown() throws Exception {
4141
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
4242
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
4343
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
44+
NativeMemoryCacheManager.getInstance().close();
4445
super.tearDown();
4546
}
4647

@@ -378,6 +379,7 @@ public void testCacheCapacity() {
378379

379380
nativeMemoryCacheManager.setCacheCapacityReached(false);
380381
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
382+
nativeMemoryCacheManager.close();
381383
}
382384

383385
public void testGetIndicesCacheStats() throws IOException, ExecutionException {

src/test/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.knn.index.KNNSettings;
1717
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;
1818

19+
import java.io.IOException;
1920
import java.util.concurrent.CountDownLatch;
2021
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.Executors;
@@ -417,7 +418,7 @@ public void testRebuildOnTimeExpirySettingsChange() {
417418
assertNull("State should be null", retrievedState);
418419
}
419420

420-
public void testCacheEvictionDueToSize() {
421+
public void testCacheEvictionDueToSize() throws IOException {
421422
String fieldName = "evictionField";
422423
// States have size of slightly over 500 bytes so that adding two will reach the max size of 1 kb for the cache
423424
int arrayLength = 112;
@@ -445,6 +446,7 @@ public void testCacheEvictionDueToSize() {
445446
cache.addQuantizationState(fieldName, state);
446447
cache.addQuantizationState(fieldName, state2);
447448
cache.clear();
449+
cache.close();
448450
assertNotNull(cache.getEvictedDueToSizeAt());
449451
}
450452
}

0 commit comments

Comments
 (0)