Skip to content

Commit 98f337c

Browse files
committed
Fix memory overflow caused by cache behavior
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
1 parent f089b5b commit 98f337c

File tree

6 files changed

+176
-13
lines changed

6 files changed

+176
-13
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
* Add script_fields context to KNNAllowlist [#1917](https://github.com/opensearch-project/k-NN/pull/1917)
2727
* Fix graph merge stats size calculation [#1844](https://github.com/opensearch-project/k-NN/pull/1844)
2828
* Disallow a vector field to have an invalid character for a physical file name. [#1936](https://github.com/opensearch-project/k-NN/pull/1936)
29+
* Fix memory overflow caused by cache behavior [#2015](https://github.com/opensearch-project/k-NN/pull/2015)
2930
### Infrastructure
3031
* Parallelize make to reduce build time [#2006](https://github.com/opensearch-project/k-NN/pull/2006)
3132
### Documentation

src/main/java/org/opensearch/knn/common/featureflags/KNNFeatureFlags.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@
77
package org.opensearch.knn.common.featureflags;
88

99
import com.google.common.annotations.VisibleForTesting;
10+
import com.google.common.collect.ImmutableList;
1011
import lombok.experimental.UtilityClass;
1112
import org.opensearch.common.settings.Setting;
1213
import org.opensearch.knn.index.KNNSettings;
1314

1415
import java.util.List;
15-
import java.util.stream.Collectors;
16-
import java.util.stream.Stream;
1716

1817
import static org.opensearch.common.settings.Setting.Property.Dynamic;
1918
import static org.opensearch.common.settings.Setting.Property.NodeScope;
@@ -26,21 +25,33 @@ public class KNNFeatureFlags {
2625

2726
// Feature flags
2827
private static final String KNN_LAUNCH_QUERY_REWRITE_ENABLED = "knn.feature.query.rewrite.enabled";
29-
private static final boolean KNN_LAUNCH_QUERY_REWRITE_ENABLED_DEFAULT = false;
28+
private static final String KNN_FORCE_EVICT_CACHE_ENABLED = "knn.feature.cache.force_evict.enabled";
3029

3130
@VisibleForTesting
3231
public static final Setting<Boolean> KNN_LAUNCH_QUERY_REWRITE_ENABLED_SETTING = Setting.boolSetting(
3332
KNN_LAUNCH_QUERY_REWRITE_ENABLED,
34-
KNN_LAUNCH_QUERY_REWRITE_ENABLED_DEFAULT,
33+
false,
34+
NodeScope,
35+
Dynamic
36+
);
37+
38+
@VisibleForTesting
39+
public static final Setting<Boolean> KNN_FORCE_EVICT_CACHE_ENABLED_SETTING = Setting.boolSetting(
40+
KNN_FORCE_EVICT_CACHE_ENABLED,
41+
false,
3542
NodeScope,
3643
Dynamic
3744
);
3845

3946
public static List<Setting<?>> getFeatureFlags() {
40-
return Stream.of(KNN_LAUNCH_QUERY_REWRITE_ENABLED_SETTING).collect(Collectors.toUnmodifiableList());
47+
return ImmutableList.of(KNN_LAUNCH_QUERY_REWRITE_ENABLED_SETTING, KNN_FORCE_EVICT_CACHE_ENABLED_SETTING);
4148
}
4249

4350
public static boolean isKnnQueryRewriteEnabled() {
4451
return Boolean.parseBoolean(KNNSettings.state().getSettingValue(KNN_LAUNCH_QUERY_REWRITE_ENABLED).toString());
4552
}
53+
54+
public static boolean isForceEvictCacheEnabled() {
55+
return Boolean.parseBoolean(KNNSettings.state().getSettingValue(KNN_FORCE_EVICT_CACHE_ENABLED).toString());
56+
}
4657
}

src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import lombok.Getter;
1515
import org.apache.lucene.index.LeafReaderContext;
16+
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
1617
import org.opensearch.knn.index.util.IndexUtil;
1718
import org.opensearch.knn.index.VectorDataType;
1819
import org.opensearch.knn.index.query.KNNWeight;
@@ -163,11 +164,19 @@ class IndexAllocation implements NativeMemoryAllocation {
163164

164165
@Override
165166
public void close() {
166-
executor.execute(() -> {
167+
Runnable onClose = () -> {
167168
writeLock();
168169
cleanup();
169170
writeUnlock();
170-
});
171+
};
172+
173+
// The close operation needs to be blocking to prevent overflow
174+
// This blocks any entry until the close has completed, preventing creation before close scenarios
175+
if (KNNFeatureFlags.isForceEvictCacheEnabled()) {
176+
onClose.run();
177+
} else {
178+
executor.execute(onClose);
179+
}
171180
}
172181

173182
private void cleanup() {

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+47-2
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.opensearch.common.unit.TimeValue;
2323
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
24+
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
2425
import org.opensearch.knn.index.KNNSettings;
2526
import org.opensearch.knn.plugin.stats.StatNames;
2627

2728
import java.io.Closeable;
29+
import java.util.Deque;
2830
import java.util.HashMap;
31+
import java.util.Iterator;
2932
import java.util.Map;
3033
import java.util.Optional;
34+
import java.util.concurrent.ConcurrentLinkedDeque;
3135
import java.util.concurrent.ExecutionException;
3236
import java.util.concurrent.ExecutorService;
3337
import java.util.concurrent.Executors;
@@ -45,6 +49,7 @@ public class NativeMemoryCacheManager implements Closeable {
4549
private static NativeMemoryCacheManager INSTANCE;
4650

4751
private Cache<String, NativeMemoryAllocation> cache;
52+
private Deque<String> accessRecencyQueue;
4853
private final ExecutorService executor;
4954
private AtomicBoolean cacheCapacityReached;
5055
private long maxWeight;
@@ -97,7 +102,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
97102
}
98103

99104
cacheCapacityReached = new AtomicBoolean(false);
100-
105+
accessRecencyQueue = new ConcurrentLinkedDeque<>();
101106
cache = cacheBuilder.build();
102107
}
103108

@@ -301,7 +306,47 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
301306
);
302307
}
303308

304-
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
309+
if (KNNFeatureFlags.isForceEvictCacheEnabled()) {
310+
// Utilizes a force eviction mechanism to free up memory before the entry can be added to the cache
311+
// In case of a cache hit, the operation just updates the locally maintained recency list
312+
// In case of a cache miss, least recently accessed entries are evicted in a blocking manner
313+
// before the new entry can be added to the cache.
314+
String key = nativeMemoryEntryContext.getKey();
315+
NativeMemoryAllocation result = cache.getIfPresent(key);
316+
317+
// Cache Hit
318+
if (result != null) {
319+
accessRecencyQueue.remove(key);
320+
accessRecencyQueue.addLast(key);
321+
return result;
322+
}
323+
324+
// Cache Miss
325+
// Evict before put
326+
synchronized (this) {
327+
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
328+
Iterator<String> lruIterator = accessRecencyQueue.iterator();
329+
while (lruIterator.hasNext()
330+
&& (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight)) {
331+
332+
String keyToRemove = lruIterator.next();
333+
NativeMemoryAllocation allocationToRemove = cache.getIfPresent(keyToRemove);
334+
if (allocationToRemove != null) {
335+
allocationToRemove.close();
336+
cache.invalidate(keyToRemove);
337+
lruIterator.remove();
338+
}
339+
}
340+
}
341+
342+
result = cache.get(key, nativeMemoryEntryContext::load);
343+
accessRecencyQueue.addLast(key);
344+
345+
return result;
346+
}
347+
} else {
348+
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
349+
}
305350
}
306351

307352
/**

src/test/java/org/opensearch/knn/common/featureflags/KNNFeatureFlagsTests.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import org.opensearch.knn.index.KNNSettings;
1212

1313
import static org.mockito.Mockito.when;
14+
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING;
1415
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_LAUNCH_QUERY_REWRITE_ENABLED_SETTING;
16+
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.isForceEvictCacheEnabled;
1517
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.isKnnQueryRewriteEnabled;
1618

1719
public class KNNFeatureFlagsTests extends KNNTestCase {
@@ -25,10 +27,17 @@ public void setUp() throws Exception {
2527
KNNSettings.state().setClusterService(clusterService);
2628
}
2729

28-
public void testIsFeatureEnabled() throws Exception {
30+
public void testIsKnnQueryRewriteEnabled() throws Exception {
2931
when(clusterSettings.get(KNN_LAUNCH_QUERY_REWRITE_ENABLED_SETTING)).thenReturn(false);
3032
assertFalse(isKnnQueryRewriteEnabled());
3133
when(clusterSettings.get(KNN_LAUNCH_QUERY_REWRITE_ENABLED_SETTING)).thenReturn(true);
3234
assertTrue(isKnnQueryRewriteEnabled());
3335
}
36+
37+
public void testIsForceEvictCacheEnabled() throws Exception {
38+
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(false);
39+
assertFalse(isForceEvictCacheEnabled());
40+
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true);
41+
assertTrue(isForceEvictCacheEnabled());
42+
}
3443
}

src/test/java/org/opensearch/knn/index/memory/NativeMemoryAllocationTests.java

+91-3
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,36 @@
1313

1414
import com.google.common.collect.ImmutableMap;
1515
import lombok.SneakyThrows;
16+
import org.junit.Before;
17+
import org.mockito.Mock;
18+
import org.opensearch.common.settings.ClusterSettings;
1619
import org.opensearch.knn.KNNTestCase;
1720
import org.opensearch.knn.TestUtils;
1821
import org.opensearch.knn.common.KNNConstants;
19-
import org.opensearch.knn.index.util.IndexUtil;
22+
import org.opensearch.knn.index.KNNSettings;
23+
import org.opensearch.knn.index.SpaceType;
2024
import org.opensearch.knn.index.VectorDataType;
25+
import org.opensearch.knn.index.engine.KNNEngine;
26+
import org.opensearch.knn.index.util.IndexUtil;
2127
import org.opensearch.knn.jni.JNICommons;
2228
import org.opensearch.knn.jni.JNIService;
23-
import org.opensearch.knn.index.SpaceType;
24-
import org.opensearch.knn.index.engine.KNNEngine;
2529
import org.opensearch.watcher.FileWatcher;
2630
import org.opensearch.watcher.WatcherHandle;
2731

2832
import java.nio.file.Path;
2933
import java.util.Arrays;
3034
import java.util.Map;
35+
import java.util.concurrent.ExecutionException;
3136
import java.util.concurrent.ExecutorService;
3237
import java.util.concurrent.Executors;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicReference;
3341

3442
import static org.mockito.Mockito.doNothing;
3543
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.when;
45+
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING;
3646

3747
public class NativeMemoryAllocationTests extends KNNTestCase {
3848

@@ -41,6 +51,19 @@ public class NativeMemoryAllocationTests extends KNNTestCase {
4151
private int testLockValue3;
4252
private int testLockValue4;
4353

54+
@Mock
55+
ClusterSettings clusterSettings;
56+
57+
@Before
58+
@Override
59+
public void setUp() throws Exception {
60+
super.setUp();
61+
clusterSettings = mock(ClusterSettings.class);
62+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
63+
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(false);
64+
KNNSettings.state().setClusterService(clusterService);
65+
}
66+
4467
public void testIndexAllocation_close() throws InterruptedException {
4568
// Create basic nmslib HNSW index
4669
Path dir = createTempDir();
@@ -207,6 +230,71 @@ public void testIndexAllocation_readLock() throws InterruptedException {
207230
assertEquals(finalValue, testLockValue1);
208231
}
209232

233+
public void testIndexAllocation_closeDefault() {
234+
WatcherHandle<FileWatcher> watcherHandle = (WatcherHandle<FileWatcher>) mock(WatcherHandle.class);
235+
ExecutorService executorService = Executors.newFixedThreadPool(2);
236+
AtomicReference<Exception> expectedException = new AtomicReference<>();
237+
238+
// Executor based non-blocking close
239+
NativeMemoryAllocation.IndexAllocation nonBlockingIndexAllocation = new NativeMemoryAllocation.IndexAllocation(
240+
mock(ExecutorService.class),
241+
0,
242+
0,
243+
null,
244+
"test",
245+
"test",
246+
watcherHandle
247+
);
248+
249+
executorService.submit(nonBlockingIndexAllocation::readLock);
250+
Future<?> closingThread = executorService.submit(nonBlockingIndexAllocation::close);
251+
try {
252+
closingThread.get();
253+
} catch (Exception ex) {
254+
expectedException.set(ex);
255+
}
256+
assertNull(expectedException.get());
257+
expectedException.set(null);
258+
executorService.shutdown();
259+
}
260+
261+
public void testIndexAllocation_closeBlocking() throws InterruptedException, ExecutionException {
262+
WatcherHandle<FileWatcher> watcherHandle = (WatcherHandle<FileWatcher>) mock(WatcherHandle.class);
263+
ExecutorService executorService = Executors.newFixedThreadPool(2);
264+
AtomicReference<Exception> expectedException = new AtomicReference<>();
265+
266+
// Blocking close
267+
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true);
268+
NativeMemoryAllocation.IndexAllocation blockingIndexAllocation = new NativeMemoryAllocation.IndexAllocation(
269+
mock(ExecutorService.class),
270+
0,
271+
0,
272+
null,
273+
"test",
274+
"test",
275+
watcherHandle
276+
);
277+
278+
executorService.submit(blockingIndexAllocation::readLock);
279+
Future<?> closingThread = executorService.submit(blockingIndexAllocation::close);
280+
281+
// Check if thread is currently blocked
282+
try {
283+
closingThread.get(5, TimeUnit.SECONDS);
284+
} catch (Exception e) {
285+
expectedException.set(e);
286+
}
287+
288+
assertNotNull(expectedException.get());
289+
290+
executorService.submit(blockingIndexAllocation::readUnlock);
291+
closingThread.get();
292+
293+
// Waits until close
294+
assertTrue(blockingIndexAllocation.isClosed());
295+
executorService.shutdown();
296+
}
297+
210298
public void testIndexAllocation_writeLock() throws InterruptedException {
211299
// To test the writeLock, we first grab the writeLock in the main thread. Then we start another thread that
212300
// grabs the readLock and asserts testLockValue2 has been updated. Next in the main thread, we update the value

0 commit comments

Comments
 (0)