Skip to content

Commit 2824a5a

Browse files
owenhalpert and github-actions[bot]
authored and committed
Add thread to periodically perform pending cache maintenance (#2308)
Signed-off-by: owenhalpert <ohalpert@gmail.com> (cherry picked from commit b2a47d8)
1 parent cc72442 commit 2824a5a

File tree

9 files changed

+189
-4
lines changed

9 files changed

+189
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3636
### Documentation
3737
### Maintenance
3838
* Select index settings based on cluster version [#2236](https://github.com/opensearch-project/k-NN/pull/2236)
39+
* Added periodic cache maintenance for QuantizationStateCache and NativeMemoryCache [#2308](https://github.com/opensearch-project/k-NN/pull/2308)
3940
* Added null checks for fieldInfo in ExactSearcher to avoid NPE while running exact search for segments with no vector field [#2278](https://github.com/opensearch-project/k-NN/pull/2278)
4041
* Upgrade jsonpath from 2.8.0 to 2.9.0 [#2325](https://github.com/opensearch-project/k-NN/pull/2325)
4142
### Refactoring

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+42
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.google.common.cache.CacheStats;
1717
import com.google.common.cache.RemovalCause;
1818
import com.google.common.cache.RemovalNotification;
19+
import lombok.Getter;
20+
import lombok.Setter;
1921
import org.apache.commons.lang.Validate;
2022
import org.apache.logging.log4j.LogManager;
2123
import org.apache.logging.log4j.Logger;
@@ -24,6 +26,8 @@
2426
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
2527
import org.opensearch.knn.index.KNNSettings;
2628
import org.opensearch.knn.plugin.stats.StatNames;
29+
import org.opensearch.threadpool.ThreadPool;
30+
import org.opensearch.threadpool.Scheduler.Cancellable;
2731

2832
import java.io.Closeable;
2933
import java.util.Deque;
@@ -47,12 +51,16 @@ public class NativeMemoryCacheManager implements Closeable {
4751

4852
private static final Logger logger = LogManager.getLogger(NativeMemoryCacheManager.class);
4953
private static NativeMemoryCacheManager INSTANCE;
54+
@Setter
55+
private static ThreadPool threadPool;
5056

5157
private Cache<String, NativeMemoryAllocation> cache;
5258
private Deque<String> accessRecencyQueue;
5359
private final ExecutorService executor;
5460
private AtomicBoolean cacheCapacityReached;
5561
private long maxWeight;
62+
@Getter
63+
private Cancellable maintenanceTask;
5664

5765
NativeMemoryCacheManager() {
5866
this.executor = Executors.newSingleThreadExecutor();
@@ -104,6 +112,12 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
104112
cacheCapacityReached = new AtomicBoolean(false);
105113
accessRecencyQueue = new ConcurrentLinkedDeque<>();
106114
cache = cacheBuilder.build();
115+
116+
if (threadPool != null) {
117+
startMaintenance(cache);
118+
} else {
119+
logger.warn("ThreadPool is null during NativeMemoryCacheManager initialization. Maintenance will not start.");
120+
}
107121
}
108122

109123
/**
@@ -142,6 +156,9 @@ public synchronized void rebuildCache(NativeMemoryCacheManagerDto nativeMemoryCa
142156
@Override
143157
public void close() {
144158
// Stop the manager's own single-thread executor first.
executor.shutdown();
159+
// maintenanceTask stays null when no ThreadPool was set and maintenance never started,
// so only cancel the periodic cleanup when one was actually scheduled.
if (maintenanceTask != null) {
160+
maintenanceTask.cancel();
161+
}
145162
}
146163

147164
/**
@@ -449,4 +466,29 @@ private Float getSizeAsPercentage(long size) {
449466
}
450467
return 100 * size / (float) cbLimit;
451468
}
469+
470+
/**
471+
* Starts the scheduled maintenance for the cache. Without this thread calling cleanUp(), the Guava cache only
472+
* performs maintenance operations (such as evicting expired entries) when the cache is accessed. This
473+
* ensures that the cache is also cleaned up based on the configured expiry time.
474+
* @see <a href="https://github.com/google/guava/wiki/cachesexplained#timed-eviction"> Guava Cache Guide</a>
475+
* @param cacheInstance cache on which to call cleanUp()
476+
*/
477+
private void startMaintenance(Cache<String, NativeMemoryAllocation> cacheInstance) {
478+
// Cancel any previously scheduled task before replacing the reference, so a stale task
// is not left running against an older cache instance.
if (maintenanceTask != null) {
479+
maintenanceTask.cancel();
480+
}
481+
482+
// The runnable logs and swallows any Exception so a failed cleanUp() does not escape
// into the scheduler.
Runnable cleanUp = () -> {
483+
try {
484+
cacheInstance.cleanUp();
485+
} catch (Exception e) {
486+
logger.error("Error cleaning up cache", e);
487+
}
488+
};
489+
490+
// The delay between cleanups is read from the cache item expiry setting.
TimeValue interval = KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES);
491+
492+
// NOTE(review): threadPool is dereferenced without a null check here; the visible caller
// (initialize) only invokes this method after checking threadPool != null.
maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
493+
}
452494
}

src/main/java/org/opensearch/knn/plugin/KNNPlugin.java

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.index.engine.EngineFactory;
1414
import org.opensearch.indices.SystemIndexDescriptor;
1515
import org.opensearch.knn.index.KNNCircuitBreaker;
16+
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
1617
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
1718
import org.opensearch.knn.index.util.KNNClusterUtil;
1819
import org.opensearch.knn.index.query.KNNQueryBuilder;
@@ -80,6 +81,7 @@
8081
import org.opensearch.knn.plugin.transport.UpdateModelMetadataTransportAction;
8182
import org.opensearch.knn.plugin.transport.UpdateModelGraveyardAction;
8283
import org.opensearch.knn.plugin.transport.UpdateModelGraveyardTransportAction;
84+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCache;
8385
import org.opensearch.knn.training.TrainingJobClusterStateListener;
8486
import org.opensearch.knn.training.TrainingJobRunner;
8587
import org.opensearch.knn.training.VectorReader;
@@ -204,6 +206,8 @@ public Collection<Object> createComponents(
204206
ModelCache.initialize(ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService);
205207
TrainingJobRunner.initialize(threadPool, ModelDao.OpenSearchKNNModelDao.getInstance());
206208
TrainingJobClusterStateListener.initialize(threadPool, ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService);
209+
QuantizationStateCache.setThreadPool(threadPool);
210+
NativeMemoryCacheManager.setThreadPool(threadPool);
207211
KNNCircuitBreaker.getInstance().initialize(threadPool, clusterService, client);
208212
KNNQueryBuilder.initialize(ModelDao.OpenSearchKNNModelDao.getInstance());
209213
KNNWeight.initialize(ModelDao.OpenSearchKNNModelDao.getInstance());

src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCache.java

+48-1
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111
import com.google.common.cache.RemovalCause;
1212
import com.google.common.cache.RemovalNotification;
1313
import lombok.Getter;
14+
import lombok.Setter;
1415
import lombok.extern.log4j.Log4j2;
1516
import org.opensearch.common.unit.TimeValue;
1617
import org.opensearch.core.common.unit.ByteSizeValue;
1718
import org.opensearch.knn.index.KNNSettings;
19+
import org.opensearch.threadpool.Scheduler.Cancellable;
20+
import org.opensearch.threadpool.ThreadPool;
1821

22+
import java.io.Closeable;
1923
import java.io.IOException;
2024
import java.time.Instant;
2125
import java.util.concurrent.TimeUnit;
@@ -27,14 +31,18 @@
2731
* A thread-safe singleton cache that contains quantization states.
2832
*/
2933
@Log4j2
30-
public class QuantizationStateCache {
34+
public class QuantizationStateCache implements Closeable {
3135

3236
private static volatile QuantizationStateCache instance;
37+
@Setter
38+
private static ThreadPool threadPool;
3339
private Cache<String, QuantizationState> cache;
3440
@Getter
3541
private long maxCacheSizeInKB;
3642
@Getter
3743
private Instant evictedDueToSizeAt;
44+
@Getter
45+
private Cancellable maintenanceTask;
3846

3947
@VisibleForTesting
4048
QuantizationStateCache() {
@@ -71,6 +79,37 @@ private void buildCache() {
7179
)
7280
.removalListener(this::onRemoval)
7381
.build();
82+
83+
if (threadPool != null) {
84+
startMaintenance(cache);
85+
} else {
86+
log.warn("ThreadPool is null during QuantizationStateCache initialization. Maintenance will not start.");
87+
}
88+
}
89+
90+
/**
91+
* Starts the scheduled maintenance for the cache. Without this thread calling cleanUp(), the Guava cache only
92+
* performs maintenance operations (such as evicting expired entries) when the cache is accessed. This
93+
* ensures that the cache is also cleaned up based on the configured expiry time.
94+
* @see <a href="https://github.com/google/guava/wiki/cachesexplained#timed-eviction"> Guava Cache Guide</a>
95+
* @param cacheInstance cache on which to call cleanUp()
96+
*/
97+
private void startMaintenance(Cache<String, QuantizationState> cacheInstance) {
98+
// Cancel any previously scheduled task before replacing the reference, so a stale task
// is not left running against an older cache instance.
if (maintenanceTask != null) {
99+
maintenanceTask.cancel();
100+
}
101+
102+
// The runnable logs and swallows any Exception so a failed cleanUp() does not escape
// into the scheduler.
Runnable cleanUp = () -> {
103+
try {
104+
cacheInstance.cleanUp();
105+
} catch (Exception e) {
106+
log.error("Error cleaning up cache", e);
107+
}
108+
};
109+
110+
// The delay between cleanups is read from the quantization state cache expiry setting.
TimeValue interval = KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES);
111+
112+
// NOTE(review): threadPool is dereferenced without a null check here; the visible caller
// (buildCache) only invokes this method after checking threadPool != null.
maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
74113
}
75114

76115
synchronized void rebuildCache() {
@@ -129,4 +168,12 @@ private void updateEvictedDueToSizeAt() {
129168
public void clear() {
130169
cache.invalidateAll();
131170
}
171+
172+
/**
 * Cancels the periodic cache maintenance task, if one was scheduled.
 *
 * @throws IOException declared by the signature; the visible body does not throw it
 */
@Override
173+
public void close() throws IOException {
174+
// maintenanceTask is null when the cache was built without a ThreadPool.
if (maintenanceTask != null) {
175+
maintenanceTask.cancel();
176+
}
177+
}
178+
132179
}

src/main/java/org/opensearch/knn/quantization/models/quantizationState/QuantizationStateCacheManager.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
import lombok.NoArgsConstructor;
1010
import org.opensearch.knn.index.codec.KNN990Codec.KNN990QuantizationStateReader;
1111

12+
import java.io.Closeable;
1213
import java.io.IOException;
1314

1415
@NoArgsConstructor(access = AccessLevel.PRIVATE)
15-
public final class QuantizationStateCacheManager {
16+
public final class QuantizationStateCacheManager implements Closeable {
1617

1718
private static volatile QuantizationStateCacheManager instance;
1819

@@ -79,4 +80,9 @@ public void setMaxCacheSizeInKB(long maxCacheSizeInKB) {
7980
public void clear() {
8081
QuantizationStateCache.getInstance().clear();
8182
}
83+
84+
/**
 * Closes the singleton {@link QuantizationStateCache} by delegating to its close() method.
 *
 * @throws IOException if the underlying cache's close() throws it
 */
@Override
85+
public void close() throws IOException {
86+
QuantizationStateCache.getInstance().close();
87+
}
8288
}

src/test/java/org/opensearch/knn/KNNSingleNodeTestCase.java

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.core.xcontent.XContentBuilder;
3636
import org.opensearch.common.xcontent.XContentFactory;
3737
import org.opensearch.index.IndexService;
38+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
3839
import org.opensearch.plugins.Plugin;
3940
import org.opensearch.core.rest.RestStatus;
4041
import org.opensearch.test.OpenSearchSingleNodeTestCase;
@@ -86,6 +87,7 @@ protected boolean resetNodeAfterTest() {
8687
public void tearDown() throws Exception {
8788
NativeMemoryCacheManager.getInstance().invalidateAll();
8889
NativeMemoryCacheManager.getInstance().close();
90+
QuantizationStateCacheManager.getInstance().close();
8991
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance().close();
9092
NativeMemoryLoadStrategy.TrainingLoadStrategy.getInstance().close();
9193
NativeMemoryLoadStrategy.AnonymousLoadStrategy.getInstance().close();

src/test/java/org/opensearch/knn/KNNTestCase.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.opensearch.core.common.bytes.BytesReference;
2525
import org.opensearch.core.xcontent.XContentBuilder;
2626
import org.opensearch.common.xcontent.XContentHelper;
27+
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
2728
import org.opensearch.test.OpenSearchTestCase;
2829

30+
import java.io.IOException;
2931
import java.util.Collections;
3032
import java.util.HashSet;
3133
import java.util.Map;
@@ -73,7 +75,7 @@ protected boolean enableWarningsCheck() {
7375
return false;
7476
}
7577

76-
public void resetState() {
78+
public void resetState() throws IOException {
7779
// Reset all of the counters
7880
for (KNNCounter knnCounter : KNNCounter.values()) {
7981
knnCounter.set(0L);
@@ -83,6 +85,7 @@ public void resetState() {
8385
// Clean up the cache
8486
NativeMemoryCacheManager.getInstance().invalidateAll();
8587
NativeMemoryCacheManager.getInstance().close();
88+
QuantizationStateCacheManager.getInstance().close();
8689
}
8790

8891
private void initKNNSettings() {

src/test/java/org/opensearch/knn/index/memory/NativeMemoryCacheManagerTests.java

+36
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
package org.opensearch.knn.index.memory;
1313

1414
import com.google.common.cache.CacheStats;
15+
import org.junit.After;
16+
import org.junit.Before;
1517
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1618
import org.opensearch.common.settings.Settings;
1719
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
@@ -20,6 +22,8 @@
2022
import org.opensearch.knn.plugin.KNNPlugin;
2123
import org.opensearch.plugins.Plugin;
2224
import org.opensearch.test.OpenSearchSingleNodeTestCase;
25+
import org.opensearch.threadpool.Scheduler.Cancellable;
26+
import org.opensearch.threadpool.ThreadPool;
2327

2428
import java.io.IOException;
2529
import java.util.Collection;
@@ -34,13 +38,29 @@
3438

3539
public class NativeMemoryCacheManagerTests extends OpenSearchSingleNodeTestCase {
3640

41+
private ThreadPool threadPool;
42+
43+
/**
 * Creates a dedicated ThreadPool for the test and registers it with NativeMemoryCacheManager,
 * so that managers constructed in the tests can schedule their maintenance task.
 */
@Before
44+
public void setUp() throws Exception {
45+
super.setUp();
46+
threadPool = new ThreadPool(Settings.builder().put("node.name", "NativeMemoryCacheManagerTests").build());
47+
// The manager reads this static reference when it initializes its cache.
NativeMemoryCacheManager.setThreadPool(threadPool);
48+
}
49+
50+
/**
 * Terminates the ThreadPool created in setUp().
 */
@After
51+
public void shutdown() throws Exception {
52+
// NOTE(review): the tearDown() override in this class also calls super.tearDown();
// confirm the base-class teardown is not executed twice per test.
super.tearDown();
53+
terminate(threadPool);
54+
}
55+
3756
/**
 * Resets the persistent circuit-breaker-triggered setting, closes the singleton
 * NativeMemoryCacheManager, and then runs the base-class teardown.
 */
@Override
3857
public void tearDown() throws Exception {
3958
// Clear out persistent metadata
4059
ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest();
4160
Settings circuitBreakerSettings = Settings.builder().putNull(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED).build();
4261
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
4362
client().admin().cluster().updateSettings(clusterUpdateSettingsRequest).get();
63+
// close() shuts down the singleton's executor and cancels its maintenance task if one exists.
NativeMemoryCacheManager.getInstance().close();
4464
super.tearDown();
4565
}
4666

@@ -51,6 +71,8 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
5171

5272
public void testRebuildCache() throws ExecutionException, InterruptedException {
5373
NativeMemoryCacheManager nativeMemoryCacheManager = new NativeMemoryCacheManager();
74+
Cancellable task1 = nativeMemoryCacheManager.getMaintenanceTask();
75+
assertNotNull(task1);
5476

5577
// Put entry in cache and check that the weight matches
5678
int size = 10;
@@ -65,6 +87,9 @@ public void testRebuildCache() throws ExecutionException, InterruptedException {
6587
// Sleep for a second or two so that the executor can invalidate all entries
6688
Thread.sleep(2000);
6789

90+
assertTrue(task1.isCancelled());
91+
assertNotNull(nativeMemoryCacheManager.getMaintenanceTask());
92+
6893
assertEquals(0, nativeMemoryCacheManager.getCacheSizeInKilobytes());
6994
nativeMemoryCacheManager.close();
7095
}
@@ -378,6 +403,7 @@ public void testCacheCapacity() {
378403

379404
nativeMemoryCacheManager.setCacheCapacityReached(false);
380405
assertFalse(nativeMemoryCacheManager.isCacheCapacityReached());
406+
nativeMemoryCacheManager.close();
381407
}
382408

383409
public void testGetIndicesCacheStats() throws IOException, ExecutionException {
@@ -464,6 +490,16 @@ public void testGetIndicesCacheStats() throws IOException, ExecutionException {
464490
nativeMemoryCacheManager.close();
465491
}
466492

493+
/**
 * Verifies that a newly constructed manager has a maintenance task and that close() cancels it.
 */
public void testMaintenanceScheduled() {
494+
NativeMemoryCacheManager nativeMemoryCacheManager = new NativeMemoryCacheManager();
495+
Cancellable maintenanceTask = nativeMemoryCacheManager.getMaintenanceTask();
496+
497+
// A task is expected because setUp() registered a ThreadPool with the manager class.
assertNotNull(maintenanceTask);
498+
499+
nativeMemoryCacheManager.close();
500+
assertTrue(maintenanceTask.isCancelled());
501+
}
502+
467503
private static class TestNativeMemoryAllocation implements NativeMemoryAllocation {
468504

469505
int size;

0 commit comments

Comments
 (0)