Skip to content

Commit 4cf10b8

Browse files
committed
Remove CB transport action
Signed-off-by: John Mazanec <jmazane@amazon.com>
1 parent 0d753df commit 4cf10b8

17 files changed

+190
-431
lines changed

qa/rolling-upgrade/src/test/java/org/opensearch/knn/bwc/StatsIT.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import org.apache.hc.core5.http.io.entity.EntityUtils;
99
import org.junit.Before;
10-
import org.opensearch.Version;
1110
import org.opensearch.client.Response;
1211
import org.opensearch.client.ResponseException;
1312
import org.opensearch.knn.plugin.stats.KNNStats;
@@ -22,7 +21,7 @@ public class StatsIT extends AbstractRollingUpgradeTestCase {
2221
@Before
2322
public void setUp() throws Exception {
2423
super.setUp();
25-
this.knnStats = new KNNStats(null, () -> Version.CURRENT);
24+
this.knnStats = new KNNStats();
2625
}
2726

2827
// Validate if all the KNN Stats metrics from old version are present in new version

src/main/java/org/opensearch/knn/plugin/KNNPlugin.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import org.opensearch.knn.plugin.transport.DeleteModelTransportAction;
5959
import org.opensearch.knn.plugin.transport.GetModelAction;
6060
import org.opensearch.knn.plugin.transport.GetModelTransportAction;
61-
import org.opensearch.knn.plugin.transport.KNNCircuitBreakerTrippedAction;
62-
import org.opensearch.knn.plugin.transport.KNNCircuitBreakerTrippedTransportAction;
6361
import org.opensearch.knn.plugin.transport.KNNStatsAction;
6462
import org.opensearch.knn.plugin.transport.KNNStatsTransportAction;
6563
import org.opensearch.knn.plugin.transport.KNNWarmupAction;
@@ -215,7 +213,7 @@ public Collection<Object> createComponents(
215213

216214
clusterService.addListener(TrainingJobClusterStateListener.getInstance());
217215

218-
KNNStats knnStats = new KNNStats(client, () -> clusterService.getClusterManagerService().getMinNodeVersion());
216+
KNNStats knnStats = new KNNStats();
219217
return ImmutableList.of(knnStats);
220218
}
221219

@@ -275,8 +273,7 @@ public List<RestHandler> getRestHandlers(
275273
new ActionHandler<>(RemoveModelFromCacheAction.INSTANCE, RemoveModelFromCacheTransportAction.class),
276274
new ActionHandler<>(SearchModelAction.INSTANCE, SearchModelTransportAction.class),
277275
new ActionHandler<>(UpdateModelGraveyardAction.INSTANCE, UpdateModelGraveyardTransportAction.class),
278-
new ActionHandler<>(ClearCacheAction.INSTANCE, ClearCacheTransportAction.class),
279-
new ActionHandler<>(KNNCircuitBreakerTrippedAction.INSTANCE, KNNCircuitBreakerTrippedTransportAction.class)
276+
new ActionHandler<>(ClearCacheAction.INSTANCE, ClearCacheTransportAction.class)
280277
);
281278
}
282279

src/main/java/org/opensearch/knn/plugin/stats/CircuitBreakerStat.java

+13-47
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,7 @@
55

66
package org.opensearch.knn.plugin.stats;
77

8-
import lombok.Getter;
9-
import org.opensearch.Version;
10-
import org.opensearch.core.action.ActionListener;
11-
import org.opensearch.knn.index.KNNSettings;
12-
import org.opensearch.knn.plugin.transport.KNNCircuitBreakerTrippedAction;
13-
import org.opensearch.knn.plugin.transport.KNNCircuitBreakerTrippedRequest;
14-
import org.opensearch.transport.client.Client;
15-
16-
import java.util.function.Supplier;
17-
18-
import static org.opensearch.knn.index.KNNSettings.KNN_CIRCUIT_BREAKER_TRIGGERED;
8+
import java.util.List;
199

2010
/**
2111
* Cluster stat that checks if the circuit breaker is enabled. For clusters on or after version 3.0, this stat will
@@ -24,49 +14,25 @@
2414
*/
2515
public class CircuitBreakerStat extends KNNStat<Boolean> {
2616

27-
private final Client client;
28-
private final Supplier<Version> minVersionSupplier;
29-
@Getter
30-
private Boolean cbTripped;
31-
32-
/**
33-
*
34-
* @param client Client used to execute transport call to get CB values of nodes
35-
* @param minVersionSupplier Minimum version supplier to provide minimum node version in the cluster
36-
*/
37-
public CircuitBreakerStat(Client client, Supplier<Version> minVersionSupplier) {
17+
public CircuitBreakerStat() {
3818
super(true, null);
39-
this.client = client;
40-
this.minVersionSupplier = minVersionSupplier;
41-
this.cbTripped = null;
4219
}
4320

4421
@Override
45-
public ActionListener<Void> setupContext(ActionListener<Void> actionListener) {
46-
// If there are any nodes in the cluster before 3.0, then we need to fall back to checking the CB via cluster
47-
// setting
48-
if (minVersionSupplier.get().before(Version.V_3_0_0)) {
49-
return ActionListener.wrap(voidResponse -> {
50-
cbTripped = KNNSettings.state().getSettingValue(KNN_CIRCUIT_BREAKER_TRIGGERED);
51-
actionListener.onResponse(voidResponse);
52-
}, actionListener::onFailure);
53-
}
54-
return ActionListener.wrap(
55-
voidResponse -> client.execute(
56-
KNNCircuitBreakerTrippedAction.INSTANCE,
57-
new KNNCircuitBreakerTrippedRequest(),
58-
ActionListener.wrap(knnCircuitBreakerTrippedResponse -> {
59-
cbTripped = knnCircuitBreakerTrippedResponse.isTripped();
60-
actionListener.onResponse(voidResponse);
61-
}, actionListener::onFailure)
62-
),
63-
actionListener::onFailure
64-
);
22+
public List<String> dependentNodeStats() {
23+
return List.of(StatNames.CACHE_CAPACITY_REACHED.getName());
6524
}
6625

6726
@Override
6827
public Boolean getValue() {
69-
assert cbTripped != null;
70-
return cbTripped;
28+
return false;
29+
}
30+
31+
@Override
32+
public Boolean getValue(KNNNodeStatAggregation aggregation) {
33+
if (aggregation == null) {
34+
return false;
35+
}
36+
return aggregation.isClusterLevelCircuitBreakerTripped();
7137
}
7238
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.plugin.stats;
7+
8+
import lombok.Getter;
9+
import org.opensearch.knn.plugin.transport.KNNStatsNodeResponse;
10+
11+
import java.util.List;
12+
13+
/**
14+
* Class contains aggregations of node stats that can be used as cluster stats. For instance, for the circuit breaker
15+
* stat, we want to check if any nodes in the cluster have their circuit breaker tripped. This is reported as the
16+
* cache_capacity_reached stat. We need to perform an aggregation on this stat to check if the circuit breaker is
17+
* tripped for the cluster. We cannot rely on the node stats returned for this because some nodes may be filtered out
18+
*/
19+
public class KNNNodeStatAggregation {
20+
@Getter
21+
private final boolean isClusterLevelCircuitBreakerTripped;
22+
23+
/**
24+
* From a set of node responses, create the aggregate stats
25+
*
26+
* @param nodeResponses
27+
*/
28+
public KNNNodeStatAggregation(List<KNNStatsNodeResponse> nodeResponses) {
29+
this.isClusterLevelCircuitBreakerTripped = nodeResponses.stream().anyMatch(r -> {
30+
if (r == null) {
31+
return false;
32+
}
33+
34+
if (r.getStatsMap() == null) {
35+
return false;
36+
}
37+
38+
if (r.getStatsMap().containsKey(StatNames.CACHE_CAPACITY_REACHED.getName()) == false) {
39+
return false;
40+
}
41+
42+
return (boolean) r.getStatsMap().get(StatNames.CACHE_CAPACITY_REACHED.getName());
43+
});
44+
}
45+
}

src/main/java/org/opensearch/knn/plugin/stats/KNNStat.java

+17-6
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
package org.opensearch.knn.plugin.stats;
77

88
import lombok.Getter;
9-
import org.opensearch.core.action.ActionListener;
109

10+
import java.util.Collections;
11+
import java.util.List;
1112
import java.util.function.Supplier;
1213

1314
/**
@@ -30,13 +31,13 @@ public KNNStat(Boolean isClusterLevel, Supplier<T> supplier) {
3031
}
3132

3233
/**
33-
* Allows stats to set context via asynchronous calls. This should only be used for cluster level stats.
34+
* Allows a cluster stat to depend on node stats. This should only be set for cluster stats and should only return
35+
* node stats.
3436
*
35-
* @param actionListener listener to call once the context is setup. If no async calls are made, then do nothing.
36-
* @return listener that will execute setup on response.
37+
* @return list of dependent node stat names. Null if none
3738
*/
38-
public ActionListener<Void> setupContext(ActionListener<Void> actionListener) {
39-
return actionListener;
39+
public List<String> dependentNodeStats() {
40+
return Collections.emptyList();
4041
}
4142

4243
/**
@@ -47,4 +48,14 @@ public ActionListener<Void> setupContext(ActionListener<Void> actionListener) {
4748
public T getValue() {
4849
return supplier.get();
4950
}
51+
52+
/**
53+
* Get the value of the statistic potentially using the {@link KNNNodeStatAggregation}
54+
*
55+
* @param aggregation that can be used for cluster stats
56+
* @return value of the stat
57+
*/
58+
public T getValue(KNNNodeStatAggregation aggregation) {
59+
return supplier.get();
60+
}
5061
}

src/main/java/org/opensearch/knn/plugin/stats/KNNStats.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import com.google.common.cache.CacheStats;
99
import com.google.common.collect.ImmutableMap;
10-
import org.opensearch.Version;
1110
import org.opensearch.knn.common.KNNConstants;
1211
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
1312
import org.opensearch.knn.index.engine.KNNEngine;
@@ -21,7 +20,6 @@
2120
import org.opensearch.knn.plugin.stats.suppliers.ModelIndexStatusSupplier;
2221
import org.opensearch.knn.plugin.stats.suppliers.ModelIndexingDegradingSupplier;
2322
import org.opensearch.knn.plugin.stats.suppliers.NativeMemoryCacheManagerSupplier;
24-
import org.opensearch.transport.client.Client;
2523

2624
import java.time.temporal.ChronoUnit;
2725
import java.util.HashMap;
@@ -33,16 +31,12 @@
3331
*/
3432
public class KNNStats {
3533

36-
private final Client client;
37-
private final Supplier<Version> minVersionSupplier;
3834
private final Map<String, KNNStat<?>> knnStats;
3935

4036
/**
4137
* Constructor
4238
*/
43-
public KNNStats(Client client, Supplier<Version> minVersionSupplier) {
44-
this.client = client;
45-
this.minVersionSupplier = minVersionSupplier;
39+
public KNNStats() {
4640
this.knnStats = buildStatsMap();
4741
}
4842

@@ -151,7 +145,7 @@ private void addNativeMemoryStats(ImmutableMap.Builder<String, KNNStat<?>> build
151145
.put(StatNames.GRAPH_QUERY_REQUESTS.getName(), new KNNStat<>(false, new KNNCounterSupplier(KNNCounter.GRAPH_QUERY_REQUESTS)))
152146
.put(StatNames.GRAPH_INDEX_ERRORS.getName(), new KNNStat<>(false, new KNNCounterSupplier(KNNCounter.GRAPH_INDEX_ERRORS)))
153147
.put(StatNames.GRAPH_INDEX_REQUESTS.getName(), new KNNStat<>(false, new KNNCounterSupplier(KNNCounter.GRAPH_INDEX_REQUESTS)))
154-
.put(StatNames.CIRCUIT_BREAKER_TRIGGERED.getName(), new CircuitBreakerStat(client, minVersionSupplier));
148+
.put(StatNames.CIRCUIT_BREAKER_TRIGGERED.getName(), new CircuitBreakerStat());
155149
}
156150

157151
private void addEngineStats(ImmutableMap.Builder<String, KNNStat<?>> builder) {

src/main/java/org/opensearch/knn/plugin/transport/KNNCircuitBreakerTrippedAction.java

-37
This file was deleted.

src/main/java/org/opensearch/knn/plugin/transport/KNNCircuitBreakerTrippedNodeRequest.java

-46
This file was deleted.

0 commit comments

Comments
 (0)