Skip to content

Commit 3eaf8b3

Browse files
committed
Remove usage of cluster level setting for circuit breaker
Simplification of circuit breaker management. Before, we were periodically updating the circuit breaker cluster setting in order to have a global circuit breaker. However, this is inconvenient and not actually useful. This change removes that logic and trips only based on the node-level circuit breaker. For k-NN stats, in order to fetch the cluster-level circuit breaker state, a transport call is made to check all of the nodes. This isn't especially efficient, but it's only made for stats calls, so it's not on the critical path. Signed-off-by: John Mazanec <jmazane@amazon.com>
1 parent 5873add commit 3eaf8b3

20 files changed

+442
-179
lines changed

qa/rolling-upgrade/src/test/java/org/opensearch/knn/bwc/StatsIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class StatsIT extends AbstractRollingUpgradeTestCase {
2121
@Before
2222
public void setUp() throws Exception {
2323
super.setUp();
24-
this.knnStats = new KNNStats();
24+
this.knnStats = new KNNStats(null);
2525
}
2626

2727
// Validate if all the KNN Stats metrics from old version are present in new version

src/main/java/org/opensearch/knn/index/KNNCircuitBreaker.java

+21-67
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,22 @@
55

66
package org.opensearch.knn.index;
77

8+
import lombok.Getter;
89
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
9-
import org.opensearch.knn.plugin.stats.StatNames;
10-
import org.opensearch.knn.plugin.transport.KNNStatsAction;
11-
import org.opensearch.knn.plugin.transport.KNNStatsNodeResponse;
12-
import org.opensearch.knn.plugin.transport.KNNStatsRequest;
13-
import org.opensearch.knn.plugin.transport.KNNStatsResponse;
14-
import org.apache.logging.log4j.LogManager;
15-
import org.apache.logging.log4j.Logger;
16-
import org.opensearch.transport.client.Client;
17-
import org.opensearch.cluster.service.ClusterService;
1810
import org.opensearch.common.unit.TimeValue;
1911
import org.opensearch.threadpool.ThreadPool;
2012

21-
import java.util.ArrayList;
22-
import java.util.List;
23-
2413
/**
2514
* Runs the circuit breaker logic and updates the settings
2615
*/
16+
@Getter
2717
public class KNNCircuitBreaker {
28-
private static Logger logger = LogManager.getLogger(KNNCircuitBreaker.class);
2918
public static final String KNN_CIRCUIT_BREAKER_TIER = "knn_cb_tier";
3019
public static int CB_TIME_INTERVAL = 2 * 60; // seconds
3120

3221
private static KNNCircuitBreaker INSTANCE;
33-
private ThreadPool threadPool;
34-
private ClusterService clusterService;
35-
private Client client;
22+
23+
private boolean isTripped = false;
3624

3725
private KNNCircuitBreaker() {}
3826

@@ -44,68 +32,34 @@ public static synchronized KNNCircuitBreaker getInstance() {
4432
}
4533

4634
/**
47-
* SetInstance of Circuit Breaker
35+
* Initialize the circuit breaker
4836
*
49-
* @param instance KNNCircuitBreaker instance
37+
* @param threadPool ThreadPool instance
5038
*/
51-
public static synchronized void setInstance(KNNCircuitBreaker instance) {
52-
INSTANCE = instance;
53-
}
54-
55-
public void initialize(ThreadPool threadPool, ClusterService clusterService, Client client) {
56-
this.threadPool = threadPool;
57-
this.clusterService = clusterService;
58-
this.client = client;
39+
public void initialize(ThreadPool threadPool) {
5940
NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
6041
Runnable runnable = () -> {
61-
if (nativeMemoryCacheManager.isCacheCapacityReached() && clusterService.localNode().isDataNode()) {
42+
if (isTripped) {
6243
long currentSizeKiloBytes = nativeMemoryCacheManager.getCacheSizeInKilobytes();
6344
long circuitBreakerLimitSizeKiloBytes = KNNSettings.state().getCircuitBreakerLimit().getKb();
6445
long circuitBreakerUnsetSizeKiloBytes = (long) ((KNNSettings.getCircuitBreakerUnsetPercentage() / 100)
6546
* circuitBreakerLimitSizeKiloBytes);
66-
/**
67-
* Unset capacityReached flag if currentSizeBytes is less than circuitBreakerUnsetSizeBytes
68-
*/
69-
if (currentSizeKiloBytes <= circuitBreakerUnsetSizeKiloBytes) {
70-
nativeMemoryCacheManager.setCacheCapacityReached(false);
71-
}
72-
}
73-
74-
// Leader node untriggers CB if all nodes have not reached their max capacity
75-
if (KNNSettings.isCircuitBreakerTriggered() && clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
76-
KNNStatsRequest knnStatsRequest = new KNNStatsRequest();
77-
knnStatsRequest.addStat(StatNames.CACHE_CAPACITY_REACHED.getName());
78-
knnStatsRequest.timeout(new TimeValue(1000 * 10)); // 10 second timeout
7947

80-
try {
81-
KNNStatsResponse knnStatsResponse = client.execute(KNNStatsAction.INSTANCE, knnStatsRequest).get();
82-
List<KNNStatsNodeResponse> nodeResponses = knnStatsResponse.getNodes();
83-
84-
List<String> nodesAtMaxCapacity = new ArrayList<>();
85-
for (KNNStatsNodeResponse nodeResponse : nodeResponses) {
86-
if ((Boolean) nodeResponse.getStatsMap().get(StatNames.CACHE_CAPACITY_REACHED.getName())) {
87-
nodesAtMaxCapacity.add(nodeResponse.getNode().getId());
88-
}
89-
}
90-
91-
if (!nodesAtMaxCapacity.isEmpty()) {
92-
logger.info(
93-
"[KNN] knn.circuit_breaker.triggered stays set. Nodes at max cache capacity: "
94-
+ String.join(",", nodesAtMaxCapacity)
95-
+ "."
96-
);
97-
} else {
98-
logger.info(
99-
"[KNN] Cache capacity below 75% of the circuit breaker limit for all nodes."
100-
+ " Unsetting knn.circuit_breaker.triggered flag."
101-
);
102-
KNNSettings.state().updateCircuitBreakerSettings(false);
103-
}
104-
} catch (Exception e) {
105-
logger.error("[KNN] Exception getting stats: " + e);
48+
// Unset capacityReached flag if currentSizeBytes is less than circuitBreakerUnsetSizeBytes
49+
if (currentSizeKiloBytes <= circuitBreakerUnsetSizeKiloBytes) {
50+
setTripped(false);
10651
}
10752
}
10853
};
109-
this.threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);
54+
threadPool.scheduleWithFixedDelay(runnable, TimeValue.timeValueSeconds(CB_TIME_INTERVAL), ThreadPool.Names.GENERIC);
55+
}
56+
57+
/**
58+
* Set the circuit breaker flag
59+
*
60+
* @param isTripped true if circuit breaker is tripped, false otherwise
61+
*/
62+
public synchronized void setTripped(boolean isTripped) {
63+
this.isTripped = isTripped;
11064
}
11165
}

src/main/java/org/opensearch/knn/index/KNNSettings.java

+4-45
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.opensearch.OpenSearchParseException;
13-
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
14-
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
1513
import org.opensearch.cluster.metadata.IndexMetadata;
1614
import org.opensearch.cluster.service.ClusterService;
1715
import org.opensearch.common.Booleans;
1816
import org.opensearch.common.settings.Setting;
1917
import org.opensearch.common.settings.Settings;
2018
import org.opensearch.common.unit.TimeValue;
21-
import org.opensearch.core.action.ActionListener;
2219
import org.opensearch.core.common.unit.ByteSizeUnit;
2320
import org.opensearch.core.common.unit.ByteSizeValue;
2421
import org.opensearch.index.IndexModule;
@@ -28,7 +25,6 @@
2825
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
2926
import org.opensearch.monitor.jvm.JvmInfo;
3027
import org.opensearch.monitor.os.OsProbe;
31-
import org.opensearch.transport.client.Client;
3228

3329
import java.security.InvalidParameterException;
3430
import java.util.Arrays;
@@ -317,7 +313,8 @@ public class KNNSettings {
317313
KNN_CIRCUIT_BREAKER_TRIGGERED,
318314
false,
319315
NodeScope,
320-
Dynamic
316+
Dynamic,
317+
Setting.Property.Deprecated
321318
);
322319

323320
public static final Setting<Double> KNN_CIRCUIT_BREAKER_UNSET_PERCENTAGE_SETTING = Setting.doubleSetting(
@@ -473,8 +470,8 @@ public class KNNSettings {
473470
private final static Map<String, Setting<?>> FEATURE_FLAGS = getFeatureFlags().stream()
474471
.collect(toUnmodifiableMap(Setting::getKey, Function.identity()));
475472

473+
@Setter
476474
private ClusterService clusterService;
477-
private Client client;
478475
@Setter
479476
private Optional<String> nodeCbAttribute;
480477

@@ -638,10 +635,6 @@ public static boolean isKNNPluginEnabled() {
638635
return KNNSettings.state().getSettingValue(KNNSettings.KNN_PLUGIN_ENABLED);
639636
}
640637

641-
public static boolean isCircuitBreakerTriggered() {
642-
return KNNSettings.state().getSettingValue(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED);
643-
}
644-
645638
/**
646639
* Retrieves the node-specific circuit breaker limit based on the existing settings.
647640
*
@@ -806,8 +799,7 @@ public static boolean isShardLevelRescoringDisabledForDiskBasedVector(String ind
806799
.getAsBoolean(KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED, false);
807800
}
808801

809-
public void initialize(Client client, ClusterService clusterService) {
810-
this.client = client;
802+
public void initialize(ClusterService clusterService) {
811803
this.clusterService = clusterService;
812804
this.nodeCbAttribute = Optional.empty();
813805
setSettingsUpdateConsumers();
@@ -841,35 +833,6 @@ public static ByteSizeValue parseknnMemoryCircuitBreakerValue(String sValue, Byt
841833
}
842834
}
843835

844-
/**
845-
* Updates knn.circuit_breaker.triggered setting to true/false
846-
* @param flag true/false
847-
*/
848-
public synchronized void updateCircuitBreakerSettings(boolean flag) {
849-
ClusterUpdateSettingsRequest clusterUpdateSettingsRequest = new ClusterUpdateSettingsRequest();
850-
Settings circuitBreakerSettings = Settings.builder().put(KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED, flag).build();
851-
clusterUpdateSettingsRequest.persistentSettings(circuitBreakerSettings);
852-
client.admin().cluster().updateSettings(clusterUpdateSettingsRequest, new ActionListener<ClusterUpdateSettingsResponse>() {
853-
@Override
854-
public void onResponse(ClusterUpdateSettingsResponse clusterUpdateSettingsResponse) {
855-
logger.debug(
856-
"Cluster setting {}, acknowledged: {} ",
857-
clusterUpdateSettingsRequest.persistentSettings(),
858-
clusterUpdateSettingsResponse.isAcknowledged()
859-
);
860-
}
861-
862-
@Override
863-
public void onFailure(Exception e) {
864-
logger.info(
865-
"Exception while updating circuit breaker setting {} to {}",
866-
clusterUpdateSettingsRequest.persistentSettings(),
867-
e.getMessage()
868-
);
869-
}
870-
});
871-
}
872-
873836
public static ByteSizeValue getVectorStreamingMemoryLimit() {
874837
return KNNSettings.state().getSettingValue(KNN_VECTOR_STREAMING_MEMORY_LIMIT_IN_MB);
875838
}
@@ -888,10 +851,6 @@ public static int getEfSearchParam(String index) {
888851
);
889852
}
890853

891-
public void setClusterService(ClusterService clusterService) {
892-
this.clusterService = clusterService;
893-
}
894-
895854
static class SpaceTypeValidator implements Setting.Validator<String> {
896855

897856
@Override

src/main/java/org/opensearch/knn/index/mapper/KNNVectorFieldMapperUtil.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.lucene.util.BytesRef;
2121
import org.opensearch.Version;
2222
import org.opensearch.common.settings.Settings;
23+
import org.opensearch.knn.index.KNNCircuitBreaker;
2324
import org.opensearch.knn.index.KNNSettings;
2425
import org.opensearch.knn.index.KnnCircuitBreakerException;
2526
import org.opensearch.knn.index.SpaceType;
@@ -114,9 +115,9 @@ public static int getExpectedVectorLength(final KNNVectorFieldType knnVectorFiel
114115
* Validate if the circuit breaker is triggered
115116
*/
116117
static void validateIfCircuitBreakerIsNotTriggered() {
117-
if (KNNSettings.isCircuitBreakerTriggered()) {
118+
if (KNNCircuitBreaker.getInstance().isTripped()) {
118119
throw new KnnCircuitBreakerException(
119-
"Parsing the created knn vector fields prior to indexing has failed as the circuit breaker triggered. This indicates that the cluster is low on memory resources and cannot index more documents at the moment. Check _plugins/_knn/stats for the circuit breaker status."
120+
"Parsing the created knn vector fields prior to indexing has failed as the circuit breaker triggered. This indicates that the node is low on memory resources and cannot index more documents at the moment. Check _plugins/_knn/stats for the circuit breaker status."
120121
);
121122
}
122123
}

src/main/java/org/opensearch/knn/index/memory/NativeMemoryCacheManager.java

+2-20
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.common.unit.TimeValue;
2525
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
2626
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
27+
import org.opensearch.knn.index.KNNCircuitBreaker;
2728
import org.opensearch.knn.index.KNNSettings;
2829
import org.opensearch.knn.plugin.stats.StatNames;
2930
import org.opensearch.threadpool.ThreadPool;
@@ -413,24 +414,6 @@ public void invalidateAll() {
413414
cache.invalidateAll();
414415
}
415416

416-
/**
417-
* Returns whether or not the capacity of the cache has been reached
418-
*
419-
* @return Boolean of whether cache limit has been reached
420-
*/
421-
public Boolean isCacheCapacityReached() {
422-
return cacheCapacityReached.get();
423-
}
424-
425-
/**
426-
* Sets cache capacity reached
427-
*
428-
* @param value Boolean value to set cache Capacity Reached to
429-
*/
430-
public void setCacheCapacityReached(Boolean value) {
431-
cacheCapacityReached.set(value);
432-
}
433-
434417
/**
435418
* Get the stats of all of the OpenSearch indices currently loaded into the cache
436419
*
@@ -461,8 +444,7 @@ private void onRemoval(RemovalNotification<String, NativeMemoryAllocation> remov
461444
nativeMemoryAllocation.close();
462445

463446
if (RemovalCause.SIZE == removalNotification.getCause()) {
464-
KNNSettings.state().updateCircuitBreakerSettings(true);
465-
setCacheCapacityReached(true);
447+
KNNCircuitBreaker.getInstance().setTripped(true);
466448
}
467449

468450
logger.debug("[KNN] Cache evicted. Key {}, Reason: {}", removalNotification.getKey(), removalNotification.getCause());

src/main/java/org/opensearch/knn/plugin/KNNPlugin.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@
6060
import org.opensearch.knn.plugin.transport.DeleteModelTransportAction;
6161
import org.opensearch.knn.plugin.transport.GetModelAction;
6262
import org.opensearch.knn.plugin.transport.GetModelTransportAction;
63+
import org.opensearch.knn.plugin.transport.KNNCircuitBreakerTrippedAction;
64+
import org.opensearch.knn.plugin.transport.KNNCircuitBreakerTrippedTransportAction;
6365
import org.opensearch.knn.plugin.transport.KNNStatsAction;
6466
import org.opensearch.knn.plugin.transport.KNNStatsTransportAction;
6567
import org.opensearch.knn.plugin.transport.KNNWarmupAction;
@@ -202,22 +204,22 @@ public Collection<Object> createComponents(
202204
VectorReader vectorReader = new VectorReader(client);
203205
NativeMemoryLoadStrategy.TrainingLoadStrategy.initialize(vectorReader);
204206

205-
KNNSettings.state().initialize(client, clusterService);
207+
KNNSettings.state().initialize(clusterService);
206208
KNNClusterUtil.instance().initialize(clusterService);
207209
ModelDao.OpenSearchKNNModelDao.initialize(client, clusterService, environment.settings());
208210
ModelCache.initialize(ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService);
209211
TrainingJobRunner.initialize(threadPool, ModelDao.OpenSearchKNNModelDao.getInstance());
210212
TrainingJobClusterStateListener.initialize(threadPool, ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService);
211213
QuantizationStateCache.setThreadPool(threadPool);
212214
NativeMemoryCacheManager.setThreadPool(threadPool);
213-
KNNCircuitBreaker.getInstance().initialize(threadPool, clusterService, client);
215+
KNNCircuitBreaker.getInstance().initialize(threadPool);
214216
KNNQueryBuilder.initialize(ModelDao.OpenSearchKNNModelDao.getInstance());
215217
KNNWeight.initialize(ModelDao.OpenSearchKNNModelDao.getInstance());
216218
TrainingModelRequest.initialize(ModelDao.OpenSearchKNNModelDao.getInstance(), clusterService);
217219

218220
clusterService.addListener(TrainingJobClusterStateListener.getInstance());
219221

220-
knnStats = new KNNStats();
222+
knnStats = new KNNStats(client);
221223
return ImmutableList.of(knnStats);
222224
}
223225

@@ -277,7 +279,8 @@ public List<RestHandler> getRestHandlers(
277279
new ActionHandler<>(RemoveModelFromCacheAction.INSTANCE, RemoveModelFromCacheTransportAction.class),
278280
new ActionHandler<>(SearchModelAction.INSTANCE, SearchModelTransportAction.class),
279281
new ActionHandler<>(UpdateModelGraveyardAction.INSTANCE, UpdateModelGraveyardTransportAction.class),
280-
new ActionHandler<>(ClearCacheAction.INSTANCE, ClearCacheTransportAction.class)
282+
new ActionHandler<>(ClearCacheAction.INSTANCE, ClearCacheTransportAction.class),
283+
new ActionHandler<>(KNNCircuitBreakerTrippedAction.INSTANCE, KNNCircuitBreakerTrippedTransportAction.class)
281284
);
282285
}
283286

0 commit comments

Comments
 (0)