Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory overflow caused by cache behavior #2015

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Add script_fields context to KNNAllowlist [#1917] (https://github.com/opensearch-project/k-NN/pull/1917)
* Fix graph merge stats size calculation [#1844](https://github.com/opensearch-project/k-NN/pull/1844)
* Disallow a vector field to have an invalid character for a physical file name. [#1936](https://github.com/opensearch-project/k-NN/pull/1936)
* Fix memory overflow caused by cache behavior [#2015](https://github.com/opensearch-project/k-NN/pull/2015)
### Infrastructure
* Parallelize make to reduce build time [#2006] (https://github.com/opensearch-project/k-NN/pull/2006)
### Documentation
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.knn.common.featureflags;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import lombok.experimental.UtilityClass;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.Setting;
import org.opensearch.knn.index.KNNSettings;

import java.util.List;

import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.NodeScope;

/**
* Class to manage KNN feature flags
*/
@UtilityClass
public class KNNFeatureFlags {

// Feature flags
private static final String KNN_FORCE_EVICT_CACHE_ENABLED = "knn.feature.cache.force_evict.enabled";

@VisibleForTesting
public static final Setting<Boolean> KNN_FORCE_EVICT_CACHE_ENABLED_SETTING = Setting.boolSetting(
KNN_FORCE_EVICT_CACHE_ENABLED,
false,
NodeScope,
Dynamic
);

public static List<Setting<?>> getFeatureFlags() {
return ImmutableList.of(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING);
}

/**
* Checks if force evict for cache is enabled by executing a check against cluster settings
* @return true if force evict setting is set to true
*/
public static boolean isForceEvictCacheEnabled() {
return Booleans.parseBoolean(KNNSettings.state().getSettingValue(KNN_FORCE_EVICT_CACHE_ENABLED).toString(), false);
}
}
15 changes: 13 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableMap;
import static org.opensearch.common.settings.Setting.Property.Dynamic;
import static org.opensearch.common.settings.Setting.Property.IndexScope;
import static org.opensearch.common.settings.Setting.Property.NodeScope;
import static org.opensearch.common.unit.MemorySizeValue.parseBytesSizeValueOrHeapRatio;
import static org.opensearch.core.common.unit.ByteSizeValue.parseBytesSizeValue;
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.getFeatureFlags;

/**
* This class defines
Expand Down Expand Up @@ -351,6 +354,9 @@ public class KNNSettings {
}
};

private final static Map<String, Setting<?>> FEATURE_FLAGS = getFeatureFlags().stream()
.collect(toUnmodifiableMap(Setting::getKey, Function.identity()));

private ClusterService clusterService;
private Client client;

Expand Down Expand Up @@ -388,7 +394,7 @@ private void setSettingsUpdateConsumers() {
);

NativeMemoryCacheManager.getInstance().rebuildCache(builder.build());
}, dynamicCacheSettings.values().stream().collect(Collectors.toUnmodifiableList()));
}, Stream.concat(dynamicCacheSettings.values().stream(), FEATURE_FLAGS.values().stream()).collect(Collectors.toUnmodifiableList()));
clusterService.getClusterSettings().addSettingsUpdateConsumer(QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING, it -> {
QuantizationStateCache.getInstance().setMaxCacheSizeInKB(it.getKb());
QuantizationStateCache.getInstance().rebuildCache();
Expand All @@ -415,6 +421,10 @@ private Setting<?> getSetting(String key) {
return dynamicCacheSettings.get(key);
}

if (FEATURE_FLAGS.containsKey(key)) {
return FEATURE_FLAGS.get(key);
}

if (KNN_CIRCUIT_BREAKER_TRIGGERED.equals(key)) {
return KNN_CIRCUIT_BREAKER_TRIGGERED_SETTING;
}
Expand Down Expand Up @@ -474,7 +484,8 @@ public List<Setting<?>> getSettings() {
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING
);
return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList());
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
}

public static boolean isKNNPluginEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import lombok.Getter;
import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.query.KNNWeight;
import org.opensearch.knn.jni.JNIService;
Expand Down Expand Up @@ -161,11 +162,19 @@ class IndexAllocation implements NativeMemoryAllocation {

@Override
public void close() {
executor.execute(() -> {
Runnable onClose = () -> {
writeLock();
cleanup();
writeUnlock();
});
};

// The close operation needs to be blocking to prevent overflow
// This blocks any entry until the close has completed, preventing creation before close scenarios
if (KNNFeatureFlags.isForceEvictCacheEnabled()) {
onClose.run();
} else {
executor.execute(onClose);
}
}

private void cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.plugin.stats.StatNames;

import java.io.Closeable;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -45,6 +49,7 @@ public class NativeMemoryCacheManager implements Closeable {
private static NativeMemoryCacheManager INSTANCE;

private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;
Expand Down Expand Up @@ -97,7 +102,7 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
}

cacheCapacityReached = new AtomicBoolean(false);

accessRecencyQueue = new ConcurrentLinkedDeque<>();
cache = cacheBuilder.build();
}

Expand Down Expand Up @@ -301,7 +306,52 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
);
}

return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
if (KNNFeatureFlags.isForceEvictCacheEnabled()) {
// Utilizes a force eviction mechanism to free up memory before the entry can be added to the cache
// In case of a cache hit, the operation just updates the locally maintained recency list
// In case of a cache miss, least recently accessed entries are evicted in a blocking manner
// before the new entry can be added to the cache.
String key = nativeMemoryEntryContext.getKey();
NativeMemoryAllocation result = cache.getIfPresent(key);

// Cache Hit
// In case of a cache hit, moving the item to the end of the recency queue adds
// some overhead to the get operation. This can be optimized further to make this operation
// as lightweight as possible. Multiple approaches and their outcomes were documented
// before moving forward with the current solution.
// The details are outlined here: https://github.com/opensearch-project/k-NN/pull/2015#issuecomment-2327064680
if (result != null) {
accessRecencyQueue.remove(key);
accessRecencyQueue.addLast(key);
return result;
}

// Cache Miss
// Evict before put
synchronized (this) {
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
Iterator<String> lruIterator = accessRecencyQueue.iterator();
while (lruIterator.hasNext()
&& (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight)) {

String keyToRemove = lruIterator.next();
NativeMemoryAllocation allocationToRemove = cache.getIfPresent(keyToRemove);
if (allocationToRemove != null) {
allocationToRemove.close();
cache.invalidate(keyToRemove);
}
lruIterator.remove();
}
}

result = cache.get(key, nativeMemoryEntryContext::load);
accessRecencyQueue.addLast(key);

return result;
}
} else {
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.common.featureflags;

import org.mockito.Mock;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;

import static org.mockito.Mockito.when;
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.KNN_FORCE_EVICT_CACHE_ENABLED_SETTING;
import static org.opensearch.knn.common.featureflags.KNNFeatureFlags.isForceEvictCacheEnabled;

public class KNNFeatureFlagsTests extends KNNTestCase {

@Mock
ClusterSettings clusterSettings;

public void setUp() throws Exception {
super.setUp();
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
KNNSettings.state().setClusterService(clusterService);
}

public void testIsForceEvictCacheEnabled() throws Exception {
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(false);
assertFalse(isForceEvictCacheEnabled());
when(clusterSettings.get(KNN_FORCE_EVICT_CACHE_ENABLED_SETTING)).thenReturn(true);
assertTrue(isForceEvictCacheEnabled());
}
}
Loading
Loading