Skip to content

Commit 57d8037

Browse files
GaganjunejaGagan Juneja
authored andcommitted
Add support to use trace propagated from client (opensearch-project#9506)
* Add support to use trace propagated from client Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Add support to use trace propagated from client Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Refactor code Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Add support to use trace propagated from client Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Add support to use trace propagated from client Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Refactor code Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Refactor code Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Merged CHANGELOG Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Empty-Commit Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Empty-Commit Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Empty-Commit Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Empty-Commit Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Empty-Commit Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> * Empty-Commit Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> --------- Signed-off-by: Gagan Juneja <gjjuneja@amazon.com> Signed-off-by: Gagan Juneja <gagandeepjuneja@gmail.com> Co-authored-by: Gagan Juneja <gjjuneja@amazon.com> Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent 720bbd9 commit 57d8037

File tree

16 files changed

+212
-17
lines changed

16 files changed

+212
-17
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
156156
- Improve performance of encoding composite keys in multi-term aggregations ([#9412](https://github.com/opensearch-project/OpenSearch/pull/9412))
157157
- Fix sort related ITs for concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9466)
158158
- Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https://github.com/opensearch-project/OpenSearch/pull/9528)))
159+
- Add support to use trace propagated from client ([#9506](https://github.com/opensearch-project/OpenSearch/pull/9506))
159160
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469))
160161
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
161162
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
import java.io.Closeable;
1414
import java.io.IOException;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.Optional;
1518

1619
/**
1720
*
@@ -44,7 +47,7 @@ public SpanScope startSpan(String spanName) {
4447

4548
@Override
4649
public SpanScope startSpan(String spanName, Attributes attributes) {
47-
return startSpan(spanName, null, attributes);
50+
return startSpan(spanName, (SpanContext) null, attributes);
4851
}
4952

5053
@Override
@@ -97,4 +100,10 @@ protected void addDefaultAttributes(Span span) {
97100
span.addAttribute(THREAD_NAME, Thread.currentThread().getName());
98101
}
99102

103+
@Override
104+
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
105+
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
106+
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
107+
}
108+
100109
}

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.telemetry.tracing;
1010

1111
import org.opensearch.telemetry.tracing.attributes.Attributes;
12+
import org.opensearch.telemetry.tracing.http.HttpTracer;
1213

1314
import java.io.Closeable;
1415

@@ -18,7 +19,7 @@
1819
*
1920
* All methods on the Tracer object are multi-thread safe.
2021
*/
21-
public interface Tracer extends Closeable {
22+
public interface Tracer extends HttpTracer, Closeable {
2223

2324
/**
2425
* Starts the {@link Span} with given name

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
package org.opensearch.telemetry.tracing;
1010

11+
import java.util.List;
1112
import java.util.Map;
13+
import java.util.Optional;
1214
import java.util.function.BiConsumer;
1315

1416
/**
@@ -23,7 +25,15 @@ public interface TracingContextPropagator {
2325
* @param props properties
2426
* @return current span
2527
*/
26-
Span extract(Map<String, String> props);
28+
Optional<Span> extract(Map<String, String> props);
29+
30+
/**
31+
* Extracts current span from HTTP headers.
32+
*
33+
* @param headers request headers to extract the context from
34+
* @return current span
35+
*/
36+
Optional<Span> extractFromHeaders(Map<String, List<String>> headers);
2737

2838
/**
2939
* Injects tracing context
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.telemetry.tracing.http;
10+
11+
import org.opensearch.telemetry.tracing.Span;
12+
import org.opensearch.telemetry.tracing.SpanScope;
13+
import org.opensearch.telemetry.tracing.attributes.Attributes;
14+
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
/**
19+
* HttpTracer helps in creating a {@link Span} which reads the incoming tracing information
20+
* from the HttpRequest header and propagate the span accordingly.
21+
*
22+
* All methods on the Tracer object are multi-thread safe.
23+
*/
24+
public interface HttpTracer {
25+
/**
26+
* Start the span with propagating the tracing info from the HttpRequest header.
27+
*
28+
* @param spanName span name.
29+
* @param header http request header.
30+
* @param attributes span attributes.
31+
* @return scope of the span, must be closed with explicit close or with try-with-resource
32+
*/
33+
SpanScope startSpan(String spanName, Map<String, List<String>> header, Attributes attributes);
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/**
10+
* Contains No-op implementations
11+
*/
12+
package org.opensearch.telemetry.tracing.http;

libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java

+8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import org.opensearch.telemetry.tracing.Tracer;
1414
import org.opensearch.telemetry.tracing.attributes.Attributes;
1515

16+
import java.util.List;
17+
import java.util.Map;
18+
1619
/**
1720
* No-op implementation of Tracer
1821
*
@@ -51,4 +54,9 @@ public SpanContext getCurrentSpan() {
5154
public void close() {
5255

5356
}
57+
58+
@Override
59+
public SpanScope startSpan(String spanName, Map<String, List<String>> header, Attributes attributes) {
60+
return SpanScope.NO_OP;
61+
}
5462
}

libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
import org.junit.Assert;
1818

1919
import java.io.IOException;
20+
import java.util.Arrays;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
2024

2125
import static org.mockito.ArgumentMatchers.any;
2226
import static org.mockito.ArgumentMatchers.eq;
@@ -104,14 +108,36 @@ public void testCreateSpanWithParent() {
104108
Assert.assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan());
105109
}
106110

111+
public void testHttpTracer() {
112+
String traceId = "trace_id";
113+
String spanId = "span_id";
114+
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
115+
116+
DefaultTracer defaultTracer = new DefaultTracer(
117+
tracingTelemetry,
118+
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
119+
);
120+
121+
Map<String, List<String>> requestHeaders = new HashMap<>();
122+
requestHeaders.put("traceparent", Arrays.asList(traceId + "~" + spanId));
123+
124+
SpanScope spanScope = defaultTracer.startSpan("test_span", requestHeaders, Attributes.EMPTY);
125+
SpanContext currentSpan = defaultTracer.getCurrentSpan();
126+
assertNotNull(currentSpan);
127+
assertEquals(traceId, currentSpan.getSpan().getTraceId());
128+
assertEquals(traceId, currentSpan.getSpan().getParentSpan().getTraceId());
129+
assertEquals(spanId, currentSpan.getSpan().getParentSpan().getSpanId());
130+
spanScope.close();
131+
}
132+
107133
public void testCreateSpanWithNullParent() {
108134
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
109135
DefaultTracer defaultTracer = new DefaultTracer(
110136
tracingTelemetry,
111137
new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry)
112138
);
113139

114-
defaultTracer.startSpan("span_name", null, Attributes.EMPTY);
140+
defaultTracer.startSpan("span_name");
115141

116142
Assert.assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName());
117143
Assert.assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan());

libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,16 @@ public void testRunnableWithParent() throws Exception {
5454
DefaultTracer defaultTracer = new DefaultTracer(new MockTracingTelemetry(), contextStorage);
5555
defaultTracer.startSpan(parentSpanName);
5656
SpanContext parentSpan = defaultTracer.getCurrentSpan();
57-
AtomicReference<SpanContext> currrntSpan = new AtomicReference<>(new SpanContext(null));
57+
AtomicReference<SpanContext> currentSpan = new AtomicReference<>();
5858
final AtomicBoolean isRunnableCompleted = new AtomicBoolean(false);
5959
TraceableRunnable traceableRunnable = new TraceableRunnable(defaultTracer, spanName, parentSpan, Attributes.EMPTY, () -> {
6060
isRunnableCompleted.set(true);
61-
currrntSpan.set(defaultTracer.getCurrentSpan());
61+
currentSpan.set(defaultTracer.getCurrentSpan());
6262
});
6363
traceableRunnable.run();
6464
assertTrue(isRunnableCompleted.get());
65-
assertEquals(spanName, currrntSpan.get().getSpan().getSpanName());
66-
assertEquals(parentSpan.getSpan(), currrntSpan.get().getSpan().getParentSpan());
65+
assertEquals(spanName, currentSpan.get().getSpan().getSpanName());
66+
assertEquals(parentSpan.getSpan(), currentSpan.get().getSpan().getParentSpan());
6767
assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan());
6868
}
6969
}

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@
88

99
package org.opensearch.telemetry.tracing;
1010

11+
import org.opensearch.core.common.Strings;
12+
13+
import java.util.Collections;
14+
import java.util.List;
1115
import java.util.Map;
16+
import java.util.Optional;
1217
import java.util.function.BiConsumer;
1318

1419
import io.opentelemetry.api.OpenTelemetry;
@@ -32,15 +37,25 @@ public OTelTracingContextPropagator(OpenTelemetry openTelemetry) {
3237
}
3338

3439
@Override
35-
public Span extract(Map<String, String> props) {
40+
public Optional<Span> extract(Map<String, String> props) {
3641
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), props, TEXT_MAP_GETTER);
42+
return Optional.ofNullable(getPropagatedSpan(context));
43+
}
44+
45+
private static OTelPropagatedSpan getPropagatedSpan(Context context) {
3746
if (context != null) {
3847
io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.fromContext(context);
3948
return new OTelPropagatedSpan(span);
4049
}
4150
return null;
4251
}
4352

53+
@Override
54+
public Optional<Span> extractFromHeaders(Map<String, List<String>> headers) {
55+
Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER);
56+
return Optional.ofNullable(getPropagatedSpan(context));
57+
}
58+
4459
@Override
4560
public void inject(Span currentSpan, BiConsumer<String, String> setter) {
4661
openTelemetry.getPropagators().getTextMapPropagator().inject(context((OTelSpan) currentSpan), setter, TEXT_MAP_SETTER);
@@ -72,4 +87,23 @@ public String get(Map<String, String> headers, String key) {
7287
}
7388
};
7489

90+
private static final TextMapGetter<Map<String, List<String>>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() {
91+
@Override
92+
public Iterable<String> keys(Map<String, List<String>> headers) {
93+
if (headers != null) {
94+
return headers.keySet();
95+
} else {
96+
return Collections.emptySet();
97+
}
98+
}
99+
100+
@Override
101+
public String get(Map<String, List<String>> headers, String key) {
102+
if (headers != null && headers.containsKey(key)) {
103+
return Strings.collectionToCommaDelimitedString(headers.get(key));
104+
}
105+
return null;
106+
}
107+
};
108+
75109
}

plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
import org.opensearch.test.OpenSearchTestCase;
1212

13+
import java.util.Arrays;
1314
import java.util.HashMap;
15+
import java.util.List;
1416
import java.util.Map;
1517

1618
import io.opentelemetry.api.OpenTelemetry;
@@ -19,6 +21,7 @@
1921
import io.opentelemetry.api.trace.TraceFlags;
2022
import io.opentelemetry.api.trace.TraceState;
2123
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
24+
import io.opentelemetry.context.Context;
2225
import io.opentelemetry.context.propagation.ContextPropagators;
2326

2427
import static org.mockito.Mockito.mock;
@@ -48,8 +51,39 @@ public void testExtractTracerContextFromHeader() {
4851
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
4952
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
5053
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
51-
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders);
54+
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders).orElse(null);
5255
assertEquals(TRACE_ID, span.getTraceId());
5356
assertEquals(SPAN_ID, span.getSpanId());
5457
}
58+
59+
public void testExtractTracerContextFromHttpHeader() {
60+
Map<String, List<String>> requestHeaders = new HashMap<>();
61+
requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00"));
62+
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
63+
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
64+
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
65+
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(requestHeaders).get();
66+
assertEquals(TRACE_ID, span.getTraceId());
67+
assertEquals(SPAN_ID, span.getSpanId());
68+
}
69+
70+
public void testExtractTracerContextFromHttpHeaderNull() {
71+
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
72+
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
73+
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
74+
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(null).get();
75+
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
76+
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
77+
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
78+
}
79+
80+
public void testExtractTracerContextFromHttpHeaderEmpty() {
81+
OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class);
82+
when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance()));
83+
TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry);
84+
org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(new HashMap<>()).get();
85+
org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root()));
86+
assertEquals(propagatedSpan.getTraceId(), span.getTraceId());
87+
assertEquals(propagatedSpan.getSpanId(), span.getSpanId());
88+
}
5589
}

server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ private Optional<Span> spanFromThreadContext(String key) {
9090
}
9191

9292
private Span spanFromHeader() {
93-
return tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders());
93+
Optional<Span> span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders());
94+
return span.orElse(null);
9495
}
9596
}

server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.opensearch.telemetry.tracing.noop.NoopTracer;
1414

1515
import java.io.IOException;
16+
import java.util.List;
17+
import java.util.Map;
1618

1719
/**
1820
* Wrapper implementation of Tracer. This delegates call to right tracer based on the tracer settings
@@ -42,7 +44,7 @@ public SpanScope startSpan(String spanName) {
4244

4345
@Override
4446
public SpanScope startSpan(String spanName, Attributes attributes) {
45-
return startSpan(spanName, null, attributes);
47+
return startSpan(spanName, (SpanContext) null, attributes);
4648
}
4749

4850
@Override
@@ -66,4 +68,9 @@ public void close() throws IOException {
6668
Tracer getDelegateTracer() {
6769
return telemetrySettings.isTracingEnabled() ? defaultTracer : NoopTracer.INSTANCE;
6870
}
71+
72+
@Override
73+
public SpanScope startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
74+
return defaultTracer.startSpan(spanName, headers, attributes);
75+
}
6976
}

server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracer() throws Excepti
5151
wrappedTracer.startSpan("foo");
5252

5353
assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer);
54-
verify(mockDefaultTracer).startSpan(eq("foo"), eq(null), any(Attributes.class));
54+
verify(mockDefaultTracer).startSpan(eq("foo"), eq((SpanContext) null), any(Attributes.class));
5555
}
5656
}
5757

@@ -64,7 +64,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracerWithAttr() throws
6464
wrappedTracer.startSpan("foo", attributes);
6565

6666
assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer);
67-
verify(mockDefaultTracer).startSpan("foo", null, attributes);
67+
verify(mockDefaultTracer).startSpan("foo", (SpanContext) null, attributes);
6868
}
6969
}
7070

0 commit comments

Comments
 (0)