12
12
import io .grpc .ManagedChannelBuilder ;
13
13
import io .opentelemetry .exporter .internal .compression .Compressor ;
14
14
import io .opentelemetry .exporter .internal .grpc .GrpcSender ;
15
+ import io .opentelemetry .exporter .internal .grpc .GrpcSenderConfig ;
15
16
import io .opentelemetry .exporter .internal .grpc .GrpcSenderProvider ;
16
17
import io .opentelemetry .exporter .internal .grpc .MarshalerServiceStub ;
17
18
import io .opentelemetry .exporter .internal .marshal .Marshaler ;
18
- import io .opentelemetry .sdk .common .export .RetryPolicy ;
19
19
import java .io .IOException ;
20
20
import java .io .OutputStream ;
21
21
import java .net .URI ;
22
22
import java .util .List ;
23
23
import java .util .Map ;
24
- import java .util .function .BiFunction ;
25
- import java .util .function .Supplier ;
26
- import javax .annotation .Nullable ;
27
- import javax .net .ssl .SSLContext ;
28
- import javax .net .ssl .X509TrustManager ;
29
24
30
25
/**
31
26
* {@link GrpcSender} SPI implementation for {@link UpstreamGrpcSender}.
36
31
public class UpstreamGrpcSenderProvider implements GrpcSenderProvider {
37
32
38
33
@ Override
39
- public <T extends Marshaler > GrpcSender <T > createSender (
40
- URI endpoint ,
41
- String endpointPath ,
42
- @ Nullable Compressor compressor ,
43
- long timeoutNanos ,
44
- long connectTimeoutNanos ,
45
- Supplier <Map <String , List <String >>> headersSupplier ,
46
- @ Nullable Object managedChannel ,
47
- Supplier <BiFunction <Channel , String , MarshalerServiceStub <T , ?, ?>>> stubFactory ,
48
- @ Nullable RetryPolicy retryPolicy ,
49
- @ Nullable SSLContext sslContext ,
50
- @ Nullable X509TrustManager trustManager ) {
34
+ public <T extends Marshaler > GrpcSender <T > createSender (GrpcSenderConfig <T > grpcSenderConfig ) {
51
35
boolean shutdownChannel = false ;
36
+ Object managedChannel = grpcSenderConfig .getManagedChannel ();
52
37
if (managedChannel == null ) {
53
38
// Shutdown the channel as part of the exporter shutdown sequence if
54
39
shutdownChannel = true ;
55
- managedChannel = minimalFallbackManagedChannel (endpoint );
40
+ managedChannel = minimalFallbackManagedChannel (grpcSenderConfig . getEndpoint () );
56
41
}
57
42
58
43
String authorityOverride = null ;
59
- Map <String , List <String >> headers = headersSupplier .get ();
44
+ Map <String , List <String >> headers = grpcSenderConfig . getHeadersSupplier () .get ();
60
45
if (headers != null ) {
61
46
for (Map .Entry <String , List <String >> entry : headers .entrySet ()) {
62
47
if (entry .getKey ().equals ("host" ) && !entry .getValue ().isEmpty ()) {
@@ -66,6 +51,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
66
51
}
67
52
68
53
String compression = Codec .Identity .NONE .getMessageEncoding ();
54
+ Compressor compressor = grpcSenderConfig .getCompressor ();
69
55
if (compressor != null ) {
70
56
CompressorRegistry .getDefaultInstance ()
71
57
.register (
@@ -84,12 +70,17 @@ public OutputStream compress(OutputStream os) throws IOException {
84
70
}
85
71
86
72
MarshalerServiceStub <T , ?, ?> stub =
87
- stubFactory
73
+ grpcSenderConfig
74
+ .getStubFactory ()
88
75
.get ()
89
76
.apply ((Channel ) managedChannel , authorityOverride )
90
77
.withCompression (compression );
91
78
92
- return new UpstreamGrpcSender <>(stub , shutdownChannel , timeoutNanos , headersSupplier );
79
+ return new UpstreamGrpcSender <>(
80
+ stub ,
81
+ shutdownChannel ,
82
+ grpcSenderConfig .getTimeoutNanos (),
83
+ grpcSenderConfig .getHeadersSupplier ());
93
84
}
94
85
95
86
/**
0 commit comments