Skip to content

Commit cf20a61

Browse files
opensearch-trigger-bot[bot]github-actions[bot]Gagan Juneja
authored
Adds support to inject telemetry instances to plugins (opensearch-project#13636) (opensearch-project#13736)
* Adds support to inject telemetry instances to plugins * Adds test * incorporate pr comments --------- (cherry picked from commit 6ba6f59) Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Gagan Juneja <gjjuneja@amazon.com>
1 parent e85f02e commit cf20a61

File tree

4 files changed

+213
-0
lines changed

4 files changed

+213
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2020
### Changed
2121
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
2222
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
23+
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
2324

2425
### Deprecated
2526

server/src/main/java/org/opensearch/node/Node.java

+38
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@
206206
import org.opensearch.plugins.SearchPlugin;
207207
import org.opensearch.plugins.SecureSettingsFactory;
208208
import org.opensearch.plugins.SystemIndexPlugin;
209+
import org.opensearch.plugins.TelemetryAwarePlugin;
209210
import org.opensearch.plugins.TelemetryPlugin;
210211
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
211212
import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor;
@@ -273,6 +274,7 @@
273274
import java.util.HashSet;
274275
import java.util.LinkedHashSet;
275276
import java.util.List;
277+
import java.util.Locale;
276278
import java.util.Map;
277279
import java.util.Optional;
278280
import java.util.Set;
@@ -619,6 +621,18 @@ protected Node(
619621
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
620622
if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) {
621623
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
624+
List<TelemetryPlugin> telemetryPluginsImplementingTelemetryAware = telemetryPlugins.stream()
625+
.filter(a -> TelemetryAwarePlugin.class.isAssignableFrom(a.getClass()))
626+
.collect(toList());
627+
if (telemetryPluginsImplementingTelemetryAware.isEmpty() == false) {
628+
throw new IllegalStateException(
629+
String.format(
630+
Locale.ROOT,
631+
"Telemetry plugins %s should not implement TelemetryAwarePlugin interface",
632+
telemetryPluginsImplementingTelemetryAware
633+
)
634+
);
635+
}
622636
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
623637
if (telemetrySettings.isTracingFeatureEnabled()) {
624638
tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext());
@@ -906,6 +920,30 @@ protected Node(
906920
)
907921
.collect(Collectors.toList());
908922

923+
Collection<Object> telemetryAwarePluginComponents = pluginsService.filterPlugins(TelemetryAwarePlugin.class)
924+
.stream()
925+
.flatMap(
926+
p -> p.createComponents(
927+
client,
928+
clusterService,
929+
threadPool,
930+
resourceWatcherService,
931+
scriptService,
932+
xContentRegistry,
933+
environment,
934+
nodeEnvironment,
935+
namedWriteableRegistry,
936+
clusterModule.getIndexNameExpressionResolver(),
937+
repositoriesServiceReference::get,
938+
tracer,
939+
metricsRegistry
940+
).stream()
941+
)
942+
.collect(Collectors.toList());
943+
944+
// Add the telemetryAwarePlugin components to the existing pluginComponents collection.
945+
pluginComponents.addAll(telemetryAwarePluginComponents);
946+
909947
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
910948
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
911949
new SearchRequestOperationsCompositeListenerFactory(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugins;
10+
11+
import org.opensearch.client.Client;
12+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
13+
import org.opensearch.cluster.service.ClusterService;
14+
import org.opensearch.common.annotation.ExperimentalApi;
15+
import org.opensearch.common.lifecycle.LifecycleComponent;
16+
import org.opensearch.core.common.io.stream.NamedWriteable;
17+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
18+
import org.opensearch.core.xcontent.NamedXContentRegistry;
19+
import org.opensearch.env.Environment;
20+
import org.opensearch.env.NodeEnvironment;
21+
import org.opensearch.repositories.RepositoriesService;
22+
import org.opensearch.script.ScriptService;
23+
import org.opensearch.telemetry.metrics.MetricsRegistry;
24+
import org.opensearch.telemetry.tracing.Tracer;
25+
import org.opensearch.threadpool.ThreadPool;
26+
import org.opensearch.watcher.ResourceWatcherService;
27+
28+
import java.util.Collection;
29+
import java.util.Collections;
30+
import java.util.function.Supplier;
31+
32+
/**
33+
* Plugin that provides the telemetry registries to build component with telemetry and also provide a way to
34+
* pass telemetry registries to the implementing plugins for adding instrumentation in the code.
35+
*
36+
* @opensearch.experimental
37+
*/
38+
@ExperimentalApi
39+
public interface TelemetryAwarePlugin {
40+
41+
/**
42+
* Returns components added by this plugin.
43+
* <p>
44+
* Any components returned that implement {@link LifecycleComponent} will have their lifecycle managed.
45+
* Note: To aid in the migration away from guice, all objects returned as components will be bound in guice
46+
* to themselves.
47+
*
48+
* @param client A client to make requests to the system
49+
* @param clusterService A service to allow watching and updating cluster state
50+
* @param threadPool A service to allow retrieving an executor to run an async action
51+
* @param resourceWatcherService A service to watch for changes to node local files
52+
* @param scriptService A service to allow running scripts on the local node
53+
* @param xContentRegistry the registry for extensible xContent parsing
54+
* @param environment the environment for path and setting configurations
55+
* @param nodeEnvironment the node environment used coordinate access to the data paths
56+
* @param namedWriteableRegistry the registry for {@link NamedWriteable} object parsing
57+
* @param indexNameExpressionResolver A service that resolves expression to index and alias names
58+
* @param repositoriesServiceSupplier A supplier for the service that manages snapshot repositories; will return null when this method
59+
* is called, but will return the repositories service once the node is initialized.
60+
* @param tracer the tracer to add tracing instrumentation.
61+
* @param metricsRegistry the registry for metrics instrumentation.
62+
*/
63+
default Collection<Object> createComponents(
64+
Client client,
65+
ClusterService clusterService,
66+
ThreadPool threadPool,
67+
ResourceWatcherService resourceWatcherService,
68+
ScriptService scriptService,
69+
NamedXContentRegistry xContentRegistry,
70+
Environment environment,
71+
NodeEnvironment nodeEnvironment,
72+
NamedWriteableRegistry namedWriteableRegistry,
73+
IndexNameExpressionResolver indexNameExpressionResolver,
74+
Supplier<RepositoriesService> repositoriesServiceSupplier,
75+
Tracer tracer,
76+
MetricsRegistry metricsRegistry
77+
) {
78+
return Collections.emptyList();
79+
}
80+
}

server/src/test/java/org/opensearch/node/NodeTests.java

+94
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,23 @@
3434
import org.apache.lucene.tests.util.LuceneTestCase;
3535
import org.opensearch.bootstrap.BootstrapCheck;
3636
import org.opensearch.bootstrap.BootstrapContext;
37+
import org.opensearch.client.Client;
3738
import org.opensearch.cluster.ClusterName;
39+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
3840
import org.opensearch.cluster.node.DiscoveryNodeRole;
41+
import org.opensearch.cluster.service.ClusterService;
3942
import org.opensearch.common.SetOnce;
4043
import org.opensearch.common.network.NetworkModule;
4144
import org.opensearch.common.settings.Settings;
4245
import org.opensearch.common.settings.SettingsException;
46+
import org.opensearch.common.util.FeatureFlags;
4347
import org.opensearch.core.common.breaker.CircuitBreaker;
48+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
4449
import org.opensearch.core.common.transport.BoundTransportAddress;
4550
import org.opensearch.core.common.unit.ByteSizeUnit;
4651
import org.opensearch.core.common.unit.ByteSizeValue;
4752
import org.opensearch.core.indices.breaker.CircuitBreakerService;
53+
import org.opensearch.core.xcontent.NamedXContentRegistry;
4854
import org.opensearch.env.Environment;
4955
import org.opensearch.env.NodeEnvironment;
5056
import org.opensearch.index.IndexService;
@@ -56,22 +62,35 @@
5662
import org.opensearch.monitor.fs.FsProbe;
5763
import org.opensearch.plugins.CircuitBreakerPlugin;
5864
import org.opensearch.plugins.Plugin;
65+
import org.opensearch.plugins.TelemetryAwarePlugin;
66+
import org.opensearch.plugins.TelemetryPlugin;
67+
import org.opensearch.repositories.RepositoriesService;
68+
import org.opensearch.script.ScriptService;
69+
import org.opensearch.telemetry.Telemetry;
70+
import org.opensearch.telemetry.TelemetrySettings;
71+
import org.opensearch.telemetry.metrics.MetricsRegistry;
72+
import org.opensearch.telemetry.tracing.Tracer;
73+
import org.opensearch.test.FeatureFlagSetter;
5974
import org.opensearch.test.InternalTestCluster;
6075
import org.opensearch.test.MockHttpTransport;
6176
import org.opensearch.test.NodeRoles;
6277
import org.opensearch.test.OpenSearchTestCase;
6378
import org.opensearch.threadpool.ThreadPool;
79+
import org.opensearch.watcher.ResourceWatcherService;
6480

6581
import java.io.IOException;
6682
import java.nio.file.Path;
6783
import java.util.ArrayList;
84+
import java.util.Collection;
6885
import java.util.Collections;
6986
import java.util.List;
87+
import java.util.Optional;
7088
import java.util.Set;
7189
import java.util.concurrent.CountDownLatch;
7290
import java.util.concurrent.RejectedExecutionException;
7391
import java.util.concurrent.TimeUnit;
7492
import java.util.concurrent.atomic.AtomicBoolean;
93+
import java.util.function.Supplier;
7594

7695
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
7796
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -404,6 +423,81 @@ public void testCreateWithFileCache() throws Exception {
404423
}
405424
}
406425

426+
public void testTelemetryAwarePlugins() throws IOException {
427+
Settings.Builder settings = baseSettings();
428+
List<Class<? extends Plugin>> plugins = basePlugins();
429+
plugins.add(MockTelemetryAwarePlugin.class);
430+
try (Node node = new MockNode(settings.build(), plugins)) {
431+
MockTelemetryAwareComponent mockTelemetryAwareComponent = node.injector().getInstance(MockTelemetryAwareComponent.class);
432+
assertNotNull(mockTelemetryAwareComponent.getTracer());
433+
assertNotNull(mockTelemetryAwareComponent.getMetricsRegistry());
434+
TelemetryAwarePlugin telemetryAwarePlugin = node.getPluginsService().filterPlugins(TelemetryAwarePlugin.class).get(0);
435+
assertTrue(telemetryAwarePlugin instanceof MockTelemetryAwarePlugin);
436+
}
437+
}
438+
439+
public void testTelemetryPluginShouldNOTImplementTelemetryAwarePlugin() throws IOException {
440+
Settings.Builder settings = baseSettings();
441+
List<Class<? extends Plugin>> plugins = basePlugins();
442+
plugins.add(MockTelemetryPlugin.class);
443+
FeatureFlagSetter.set(FeatureFlags.TELEMETRY);
444+
settings.put(TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING.getKey(), true);
445+
assertThrows(IllegalStateException.class, () -> new MockNode(settings.build(), plugins));
446+
}
447+
448+
private static class MockTelemetryAwareComponent {
449+
private final Tracer tracer;
450+
private final MetricsRegistry metricsRegistry;
451+
452+
public MockTelemetryAwareComponent(Tracer tracer, MetricsRegistry metricsRegistry) {
453+
this.tracer = tracer;
454+
this.metricsRegistry = metricsRegistry;
455+
}
456+
457+
public Tracer getTracer() {
458+
return tracer;
459+
}
460+
461+
public MetricsRegistry getMetricsRegistry() {
462+
return metricsRegistry;
463+
}
464+
}
465+
466+
public static class MockTelemetryAwarePlugin extends Plugin implements TelemetryAwarePlugin {
467+
@Override
468+
public Collection<Object> createComponents(
469+
Client client,
470+
ClusterService clusterService,
471+
ThreadPool threadPool,
472+
ResourceWatcherService resourceWatcherService,
473+
ScriptService scriptService,
474+
NamedXContentRegistry xContentRegistry,
475+
Environment environment,
476+
NodeEnvironment nodeEnvironment,
477+
NamedWriteableRegistry namedWriteableRegistry,
478+
IndexNameExpressionResolver indexNameExpressionResolver,
479+
Supplier<RepositoriesService> repositoriesServiceSupplier,
480+
Tracer tracer,
481+
MetricsRegistry metricsRegistry
482+
) {
483+
return List.of(new MockTelemetryAwareComponent(tracer, metricsRegistry));
484+
}
485+
486+
}
487+
488+
public static class MockTelemetryPlugin extends Plugin implements TelemetryPlugin, TelemetryAwarePlugin {
489+
490+
@Override
491+
public Optional<Telemetry> getTelemetry(TelemetrySettings telemetrySettings) {
492+
return Optional.empty();
493+
}
494+
495+
@Override
496+
public String getName() {
497+
return null;
498+
}
499+
}
500+
407501
public static class MockCircuitBreakerPlugin extends Plugin implements CircuitBreakerPlugin {
408502

409503
private SetOnce<CircuitBreaker> myCircuitBreaker = new SetOnce<>();

0 commit comments

Comments
 (0)