Skip to content

Commit 523ab42

Browse files
committed
Merge remote-tracking branch 'refs/remotes/origin/search-idle-metrics' into search-idle-metrics
2 parents 2ae676f + 0fa5f0f commit 523ab42

File tree

67 files changed

+4278
-603
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+4278
-603
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
112112
- [Tiered caching] Make IndicesRequestCache implementation configurable [EXPERIMENTAL] ([#12533](https://github.com/opensearch-project/OpenSearch/pull/12533))
113113
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
114114
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
115+
- [Metrics Framework] Adds support for asynchronous gauge metric type. ([#12642](https://github.com/opensearch-project/OpenSearch/issues/12642))
115116
- Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
116117
- [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542))
117118
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
@@ -145,6 +146,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
145146
### Changed
146147
- Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499))
147148
- Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643))
149+
- Mark fuzzy filter GA and remove experimental setting ([12631](https://github.com/opensearch-project/OpenSearch/pull/12631))
148150

149151
### Deprecated
150152

libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java

+9
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88

99
package org.opensearch.telemetry.metrics;
1010

11+
import org.opensearch.telemetry.metrics.tags.Tags;
12+
13+
import java.io.Closeable;
1114
import java.io.IOException;
15+
import java.util.function.Supplier;
1216

1317
/**
1418
* Default implementation for {@link MetricsRegistry}
@@ -39,6 +43,11 @@ public Histogram createHistogram(String name, String description, String unit) {
3943
return metricsTelemetry.createHistogram(name, description, unit);
4044
}
4145

46+
@Override
47+
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
48+
return metricsTelemetry.createGauge(name, description, unit, valueProvider, tags);
49+
}
50+
4251
@Override
4352
public void close() throws IOException {
4453
metricsTelemetry.close();

libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java

+16
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99
package org.opensearch.telemetry.metrics;
1010

1111
import org.opensearch.common.annotation.ExperimentalApi;
12+
import org.opensearch.telemetry.metrics.tags.Tags;
1213

1314
import java.io.Closeable;
15+
import java.util.function.Supplier;
1416

1517
/**
1618
* MetricsRegistry helps in creating the metric instruments.
@@ -47,4 +49,18 @@ public interface MetricsRegistry extends Closeable {
4749
* @return histogram.
4850
*/
4951
Histogram createHistogram(String name, String description, String unit);
52+
53+
/**
54+
* Creates the Observable Gauge type of Metric. Where the value provider will be called at a certain frequency
55+
* to capture the value.
56+
*
57+
* @param name name of the observable gauge.
58+
* @param description any description about the metric.
59+
* @param unit unit of the metric.
60+
* @param valueProvider value provider.
61+
* @param tags attributes/dimensions of the metric.
62+
* @return closeable to dispose/close the Gauge metric.
63+
*/
64+
Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags);
65+
5066
}

libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java

+8
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
import org.opensearch.telemetry.metrics.Counter;
1313
import org.opensearch.telemetry.metrics.Histogram;
1414
import org.opensearch.telemetry.metrics.MetricsRegistry;
15+
import org.opensearch.telemetry.metrics.tags.Tags;
1516

17+
import java.io.Closeable;
1618
import java.io.IOException;
19+
import java.util.function.Supplier;
1720

1821
/**
1922
*No-op {@link MetricsRegistry}
@@ -44,6 +47,11 @@ public Histogram createHistogram(String name, String description, String unit) {
4447
return NoopHistogram.INSTANCE;
4548
}
4649

50+
@Override
51+
public Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags) {
52+
return () -> {};
53+
}
54+
4755
@Override
4856
public void close() throws IOException {
4957

libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88

99
package org.opensearch.telemetry.metrics;
1010

11+
import org.opensearch.telemetry.metrics.tags.Tags;
1112
import org.opensearch.test.OpenSearchTestCase;
1213

14+
import java.io.Closeable;
15+
import java.util.function.Supplier;
16+
1317
import static org.mockito.ArgumentMatchers.any;
1418
import static org.mockito.Mockito.mock;
1519
import static org.mockito.Mockito.when;
@@ -59,4 +63,20 @@ public void testHistogram() {
5963
assertSame(mockHistogram, histogram);
6064
}
6165

66+
@SuppressWarnings("unchecked")
67+
public void testGauge() {
68+
Closeable mockCloseable = mock(Closeable.class);
69+
when(
70+
defaultMeterRegistry.createGauge(any(String.class), any(String.class), any(String.class), any(Supplier.class), any(Tags.class))
71+
).thenReturn(mockCloseable);
72+
Closeable closeable = defaultMeterRegistry.createGauge(
73+
"org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testObservableGauge",
74+
"test observable gauge",
75+
"ms",
76+
() -> 1.0,
77+
Tags.EMPTY
78+
);
79+
assertSame(mockCloseable, closeable);
80+
}
81+
6282
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/*
10+
* Modifications Copyright OpenSearch Contributors. See
11+
* GitHub history for details.
12+
*/
13+
14+
package org.opensearch.cache.common.policy;
15+
16+
import org.opensearch.common.cache.policy.CachedQueryResult;
17+
import org.opensearch.common.unit.TimeValue;
18+
19+
import java.util.function.Function;
20+
import java.util.function.Predicate;
21+
22+
/**
23+
* A cache tier policy which accepts queries whose took time is greater than some threshold.
24+
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
25+
* The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed
26+
* to decide whether to admit the value.
27+
* @param <V> The type of data consumed by test().
28+
*/
29+
public class TookTimePolicy<V> implements Predicate<V> {
30+
/**
31+
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
32+
*/
33+
private final TimeValue threshold;
34+
35+
/**
36+
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
37+
*/
38+
private final Function<V, CachedQueryResult.PolicyValues> cachedResultParser;
39+
40+
/**
41+
* Constructs a took time policy.
42+
* @param threshold the threshold
43+
* @param cachedResultParser the function providing policy values
44+
*/
45+
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
46+
if (threshold.compareTo(TimeValue.ZERO) < 0) {
47+
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
48+
}
49+
this.threshold = threshold;
50+
this.cachedResultParser = cachedResultParser;
51+
}
52+
53+
/**
54+
* Check whether to admit data.
55+
* @param data the input argument
56+
* @return whether to admit the data
57+
*/
58+
public boolean test(V data) {
59+
long tookTimeNanos;
60+
try {
61+
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();
62+
} catch (Exception e) {
63+
// If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data
64+
return false;
65+
}
66+
67+
TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
68+
return tookTime.compareTo(threshold) >= 0;
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** A package for policies controlling what can enter caches. */
10+
package org.opensearch.cache.common.policy;

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

+53-1
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,31 @@
88

99
package org.opensearch.cache.common.tier;
1010

11+
import org.opensearch.cache.common.policy.TookTimePolicy;
1112
import org.opensearch.common.annotation.ExperimentalApi;
1213
import org.opensearch.common.cache.CacheType;
1314
import org.opensearch.common.cache.ICache;
1415
import org.opensearch.common.cache.LoadAwareCacheLoader;
1516
import org.opensearch.common.cache.RemovalListener;
1617
import org.opensearch.common.cache.RemovalNotification;
18+
import org.opensearch.common.cache.policy.CachedQueryResult;
1719
import org.opensearch.common.cache.store.config.CacheConfig;
1820
import org.opensearch.common.settings.Setting;
1921
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.common.unit.TimeValue;
2023
import org.opensearch.common.util.concurrent.ReleasableLock;
2124
import org.opensearch.common.util.iterable.Iterables;
2225

2326
import java.io.IOException;
27+
import java.util.ArrayList;
2428
import java.util.Arrays;
2529
import java.util.List;
2630
import java.util.Map;
2731
import java.util.Objects;
2832
import java.util.concurrent.locks.ReadWriteLock;
2933
import java.util.concurrent.locks.ReentrantReadWriteLock;
3034
import java.util.function.Function;
35+
import java.util.function.Predicate;
3136

3237
/**
3338
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
@@ -52,6 +57,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
5257
* Maintains caching tiers in ascending order of cache latency.
5358
*/
5459
private final List<ICache<K, V>> cacheList;
60+
private final List<Predicate<V>> policies;
5561

5662
TieredSpilloverCache(Builder<K, V> builder) {
5763
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
@@ -63,21 +69,27 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
6369
@Override
6470
public void onRemoval(RemovalNotification<K, V> notification) {
6571
try (ReleasableLock ignore = writeLock.acquire()) {
66-
diskCache.put(notification.getKey(), notification.getValue());
72+
if (evaluatePolicies(notification.getValue())) {
73+
diskCache.put(notification.getKey(), notification.getValue());
74+
}
6775
}
6876
}
6977
})
7078
.setKeyType(builder.cacheConfig.getKeyType())
7179
.setValueType(builder.cacheConfig.getValueType())
7280
.setSettings(builder.cacheConfig.getSettings())
7381
.setWeigher(builder.cacheConfig.getWeigher())
82+
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix
83+
// coming soon
7484
.build(),
7585
builder.cacheType,
7686
builder.cacheFactories
7787

7888
);
7989
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
8090
this.cacheList = Arrays.asList(onHeapCache, diskCache);
91+
92+
this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
8193
}
8294

8395
// Package private for testing
@@ -192,6 +204,15 @@ private Function<K, V> getValueFromTieredCache() {
192204
};
193205
}
194206

207+
boolean evaluatePolicies(V value) {
208+
for (Predicate<V> policy : policies) {
209+
if (!policy.test(value)) {
210+
return false;
211+
}
212+
}
213+
return true;
214+
}
215+
195216
/**
196217
* Factory to create TieredSpilloverCache objects.
197218
*/
@@ -231,11 +252,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
231252
);
232253
}
233254
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
255+
256+
TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
257+
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
258+
.get(settings);
259+
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
260+
config.getCachedResultParser(),
261+
"Cached result parser fn can't be null"
262+
);
263+
234264
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
235265
.setOnHeapCacheFactory(onHeapCacheFactory)
236266
.setRemovalListener(config.getRemovalListener())
237267
.setCacheConfig(config)
238268
.setCacheType(cacheType)
269+
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
239270
.build();
240271
}
241272

@@ -257,6 +288,7 @@ public static class Builder<K, V> {
257288
private CacheConfig<K, V> cacheConfig;
258289
private CacheType cacheType;
259290
private Map<String, ICache.Factory> cacheFactories;
291+
private final ArrayList<Predicate<V>> policies = new ArrayList<>();
260292

261293
/**
262294
* Default constructor
@@ -323,6 +355,26 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
323355
return this;
324356
}
325357

358+
/**
359+
* Set a cache policy to be used to limit access to this cache's disk tier.
360+
* @param policy the policy
361+
* @return builder
362+
*/
363+
public Builder<K, V> addPolicy(Predicate<V> policy) {
364+
this.policies.add(policy);
365+
return this;
366+
}
367+
368+
/**
369+
* Set multiple policies to be used to limit access to this cache's disk tier.
370+
* @param policies the policies
371+
* @return builder
372+
*/
373+
public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
374+
this.policies.addAll(policies);
375+
return this;
376+
}
377+
326378
/**
327379
* Build tiered spillover cache.
328380
* @return TieredSpilloverCache

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java

+5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ public List<Setting<?>> getSettings() {
5151
settingList.add(
5252
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
5353
);
54+
settingList.add(
55+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
56+
cacheType.getSettingPrefix()
57+
)
58+
);
5459
}
5560
return settingList;
5661
}

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java

+18
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
package org.opensearch.cache.common.tier;
1010

1111
import org.opensearch.common.settings.Setting;
12+
import org.opensearch.common.unit.TimeValue;
13+
14+
import java.util.concurrent.TimeUnit;
1215

1316
import static org.opensearch.common.settings.Setting.Property.NodeScope;
1417

@@ -36,6 +39,21 @@ public class TieredSpilloverCacheSettings {
3639
(key) -> Setting.simpleString(key, "", NodeScope)
3740
);
3841

42+
/**
43+
* Setting defining the minimum took time for a query to be allowed into the disk cache.
44+
*/
45+
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
46+
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
47+
(key) -> Setting.timeSetting(
48+
key,
49+
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
50+
TimeValue.ZERO, // Minimum value for this setting
51+
NodeScope
52+
)
53+
);
54+
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
55+
// Will be tuned further with future benchmarks.
56+
3957
/**
4058
* Default constructor
4159
*/

0 commit comments

Comments
 (0)