Skip to content

Commit 6547a3c

Browse files
kotwanikunalgithub-actions[bot]
authored andcommitted
Fix memory overflow caused by cache behavior (#2015) (#2032)
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com> (cherry picked from commit d4af93e) (cherry picked from commit 5858d1c)
1 parent 86398a3 commit 6547a3c

File tree

7 files changed

+252
-10
lines changed

7 files changed

+252
-10
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2929
* Fix graph merge stats size calculation [#1844](https://github.com/opensearch-project/k-NN/pull/1844)
3030
* Disallow a vector field to have an invalid character for a physical file name. [#1936](https://github.com/opensearch-project/k-NN/pull/1936)
3131
* Add script_fields context to KNNAllowlist [#1917] (https://github.com/opensearch-project/k-NN/pull/1917)
32+
* Fix memory overflow caused by cache behavior [#2015](https://github.com/opensearch-project/k-NN/pull/2015)
3233
### Infrastructure
3334
* Parallelize make to reduce build time [#2006] (https://github.com/opensearch-project/k-NN/pull/2006)
3435
### Documentation
@@ -45,4 +46,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4546
* Restructure mappers to better handle null cases and avoid branching in parsing [#1939](https://github.com/opensearch-project/k-NN/pull/1939)
4647
* Added Quantization Framework and implemented 1Bit and multibit quantizer[#1889](https://github.com/opensearch-project/k-NN/issues/1889)
4748
* Encapsulate dimension, vector data type validation/processing inside Library [#1957](https://github.com/opensearch-project/k-NN/pull/1957)
48-
* Add quantization state cache [#1960](https://github.com/opensearch-project/k-NN/pull/1960)
49+
* Add quantization state cache [#1960](https://github.com/opensearch-project/k-NN/pull/1960)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
*/
6+
7+
package org.opensearch.knn.common.featureflags;
8+
9+
import com.google.common.annotations.VisibleForTesting;
10+
import com.google.common.collect.ImmutableList;
11+
import lombok.experimental.UtilityClass;
12+
import org.opensearch.common.Booleans;
13+
import org.opensearch.common.settings.Setting;
14+
import org.opensearch.knn.index.KNNSettings;
15+
16+
import java.util.List;
17+
18+
import static org.opensearch.common.settings.Setting.Property.Dynamic;
19+
import static org.opensearch.common.settings.Setting.Property.NodeScope;
20+
21+
/**
22+
* Class to manage KNN feature flags
23+
*/
24+
@UtilityClass
25+
public class KNNFeatureFlags {
26+
27+
// Feature flags
28+
private static final String KNN_FORCE_EVICT_CACHE_ENABLED = "knn.feature.cache.force_evict.enabled";
29+
30+
@VisibleForTesting
31+
public static final Setting<Boolean> KNN_FORCE_EVICT_CACHE_ENABLED_SETTING = Setting.boolSetting(
32+
KNN_FORCE_EVICT_CACHE_ENABLED,
33+
false,
34+
NodeScope,
35+
Dynamic
36+
);
37+
38+
public static List<Setting<?>> getFeatureFlags() {
39+
return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING);
40+
}
41+
42+
/**
43+
* Checks if force evict for cache is enabled by executing a check against cluster settings
44+
* @return true if force evict setting is set to true
45+
*/
46+
public static boolean isForceEvictCacheEnabled() {
47+
return Booleans.parseBoolean(KNNSettings.state().getSettingValue(KNN_FORCE_EVICT_CACHE_ENABLED).toString(), false);
48+
}
49+
}

src/main/java/org/opensearch/knn/index/KNNSettings.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@
3434
import java.util.List;
3535
import java.util.Map;
3636
import java.util.Objects;
37+
import java.util.function.Function;
3738
import java.util.stream.Collectors;
3839
import java.util.stream.Stream;
3940

41+
import static java.util.stream.Collectors.toUnmodifiableMap;
4042
import static org.opensearch.common.settings.Setting.Property.Dynamic;
4143
import static org.opensearch.common.settings.Setting.Property.IndexScope;
4244
import static org.opensearch.common.settings.Setting.Property.NodeScope;
4345
import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
4446
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
47+
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.getFeatureFlags;
4548

4649
/**
4750
* This class defines
@@ -334,6 +337,9 @@ public class KNNSettings {
334337
}
335338
};
336339

340+
private final static Map<String, Setting<?>> FEATURE_FLAGS = getFeatureFlags().stream()
341+
.collect(toUnmodifiableMap(Setting::getKey, Function.identity()));
342+
337343
private ClusterService clusterService;
338344
private Client client;
339345

@@ -371,7 +377,7 @@ private void setSettingsUpdateConsumers() {
371377
);
372378

373379
NativeMemoryCacheManager.getInstance().rebuildCache(builder.build());
374-
}, dynamicCacheSettings.values().stream().collect(Collectors.toUnmodifiableList()));
380+
}, Stream.concat(dynamicCacheSettings.values().stream(), FEATURE_FLAGS.values().stream()).collect(Collectors.toUnmodifiableList()));
375381
clusterService.getClusterSettings().addSettingsUpdateConsumer(QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING, it -> {
376382
QuantizationStateCache.getInstance().setMaxCacheSizeInKB(it.getKb());
377383
QuantizationStateCache.getInstance().rebuildCache();
@@ -398,6 +404,10 @@ private Setting<?> getSetting(String key) {
398404
return dynamicCacheSettings.get(key);
399405
}
400406

407+
if (FEATURE_FLAGS.containsKey(key)) {
408+
return FEATURE_FLAGS.get(key);
409+
}
410+
401411
if (KNN_CIRCUIT_BREAKER_TRIGGERED.equals(key)) {
402412
return KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING;
403413
}
@@ -452,7 +462,8 @@ public List<Setting<?>> getSettings() {
452462
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
453463
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING
454464
);
455-
return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList());
465+
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
466+
.collect(Collectors.toList());
456467
}
457468

458469
public static boolean isKNNPluginEnabled() {

src/main/java/org/opensearch/knn/index/memory/NativeMemoryAllocation.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import lombok.Getter;
1515
import org.apache.lucene.index.LeafReaderContext;
16+
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
1617
import org.opensearch.knn.index.VectorDataType;
1718
import org.opensearch.knn.index.query.KNNWeight;
1819
import org.opensearch.knn.jni.JNIService;
@@ -161,11 +162,19 @@ class IndexAllocation implements NativeMemoryAllocation {
161162

162163
@Override
163164
public void close() {
164-
executor.execute(() -> {
165+
Runnable onClose = () -> {
165166
writeLock();
166167
cleanup();
167168
writeUnlock();
168-
});
169+
};
170+
171+
// The close operation needs to be blocking to prevent overflow
172+
// This blocks any entry until the close has completed, preventing creation before close scenarios
173+
if (KNNFeatureFlags.isForceEvictCacheEnabled()) {
174+
onClose.run();
175+
} else {
176+
executor.execute(onClose);
177+
}
169178
}
170179

171180
private void cleanup() {

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+52-2
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
import org.apache.logging.log4j.Logger;
2222
import org.opensearch.common.unit.TimeValue;
2323
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
24+
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
2425
import org.opensearch.knn.index.KNNSettings;
2526
import org.opensearch.knn.plugin.stats.StatNames;
2627

2728
import java.io.Closeable;
29+
import java.util.Deque;
2830
import java.util.HashMap;
31+
import java.util.Iterator;
2932
import java.util.Map;
3033
import java.util.Optional;
34+
import java.util.concurrent.ConcurrentLinkedDeque;
3135
import java.util.concurrent.ExecutionException;
3236
import java.util.concurrent.ExecutorService;
3337
import java.util.concurrent.Executors;
@@ -45,6 +49,7 @@ public class NativeMemoryCacheManager implements Closeable {
4549
private static NativeMemoryCacheManager INSTANCE;
4650

4751
private Cache<String, NativeMemoryAllocation> cache;
52+
private Deque<String> accessRecencyQueue;
4853
private final ExecutorService executor;
4954
private AtomicBoolean cacheCapacityReached;
5055
private long maxWeight;
@@ -97,7 +102,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
97102
}
98103

99104
cacheCapacityReached = new AtomicBoolean(false);
100-
105+
accessRecencyQueue = new ConcurrentLinkedDeque<>();
101106
cache = cacheBuilder.build();
102107
}
103108

@@ -301,7 +306,52 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
301306
);
302307
}
303308

304-
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
309+
if (KNNFeatureFlags.isForceEvictCacheEnabled()) {
310+
// Utilizes a force eviction mechanism to free up memory before the entry can be added to the cache
311+
// In case of a cache hit, the operation just updates the locally maintained recency list
312+
// In case of a cache miss, least recently accessed entries are evicted in a blocking manner
313+
// before the new entry can be added to the cache.
314+
String key = nativeMemoryEntryContext.getKey();
315+
NativeMemoryAllocation result = cache.getIfPresent(key);
316+
317+
// Cache Hit
318+
// In case of a cache hit, moving the item to the end of the recency queue adds
319+
// some overhead to the get operation. This can be optimized further to make this operation
320+
// as lightweight as possible. Multiple approaches and their outcomes were documented
321+
// before moving forward with the current solution.
322+
// The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680
323+
if (result != null) {
324+
accessRecencyQueue.remove(key);
325+
accessRecencyQueue.addLast(key);
326+
return result;
327+
}
328+
329+
// Cache Miss
330+
// Evict before put
331+
synchronized (this) {
332+
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
333+
Iterator<String> lruIterator = accessRecencyQueue.iterator();
334+
while (lruIterator.hasNext()
335+
&& (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight)) {
336+
337+
String keyToRemove = lruIterator.next();
338+
NativeMemoryAllocation allocationToRemove = cache.getIfPresent(keyToRemove);
339+
if (allocationToRemove != null) {
340+
allocationToRemove.close();
341+
cache.invalidate(keyToRemove);
342+
}
343+
lruIterator.remove();
344+
}
345+
}
346+
347+
result = cache.get(key, nativeMemoryEntryContext::load);
348+
accessRecencyQueue.addLast(key);
349+
350+
return result;
351+
}
352+
} else {
353+
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
354+
}
305355
}
306356

307357
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.common.featureflags;
7+
8+
import org.mockito.Mock;
9+
import org.opensearch.common.settings.ClusterSettings;
10+
import org.opensearch.knn.KNNTestCase;
11+
import org.opensearch.knn.index.KNNSettings;
12+
13+
import static org.mockito.Mockito.when;
14+
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING;
15+
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.isForceEvictCacheEnabled;
16+
17+
public class KNNFeatureFlagsTests extends KNNTestCase {
18+
19+
@Mock
20+
ClusterSettings clusterSettings;
21+
22+
public void setUp() throws Exception {
23+
super.setUp();
24+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
25+
KNNSettings.state().setClusterService(clusterService);
26+
}
27+
28+
public void testIsForceEvictCacheEnabled() throws Exception {
29+
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(false);
30+
assertFalse(isForceEvictCacheEnabled());
31+
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true);
32+
assertTrue(isForceEvictCacheEnabled());
33+
}
34+
}

0 commit comments

Comments
 (0)