1
+ use std:: borrow:: BorrowMut ;
2
+ use std:: net:: ToSocketAddrs ;
3
+ use std:: sync:: Arc ;
4
+ use std:: { env, net} ;
5
+
6
+ use opentelemetry:: trace:: TraceError ;
7
+ use opentelemetry_sdk:: trace:: { BatchSpanProcessor , Tracer } ;
8
+ use opentelemetry_sdk:: {
9
+ self ,
10
+ trace:: { BatchConfig , Config , TracerProvider } ,
11
+ } ;
12
+
1
13
use crate :: exporter:: agent:: { AgentAsyncClientUdp , AgentSyncClientUdp } ;
2
14
use crate :: exporter:: config:: {
3
15
build_config_and_process, install_tracer_provider_and_get_tracer, HasRequiredConfig ,
4
16
TransformationConfig ,
5
17
} ;
6
18
use crate :: exporter:: uploader:: { AsyncUploader , SyncUploader , Uploader } ;
7
19
use crate :: { Error , Exporter , JaegerTraceRuntime } ;
8
- use opentelemetry:: trace:: TraceError ;
9
- use opentelemetry_sdk:: trace:: { BatchSpanProcessor , Tracer } ;
10
- use opentelemetry_sdk:: {
11
- self ,
12
- trace:: { BatchConfig , Config , TracerProvider } ,
13
- } ;
14
- use std:: borrow:: BorrowMut ;
15
- use std:: sync:: Arc ;
16
- use std:: { env, net} ;
17
20
18
21
/// The max size of UDP packet we want to send, synced with jaeger-agent
19
22
const UDP_PACKET_MAX_LENGTH : usize = 65_000 ;
@@ -78,38 +81,23 @@ pub struct AgentPipeline {
78
81
transformation_config : TransformationConfig ,
79
82
trace_config : Option < Config > ,
80
83
batch_config : Option < BatchConfig > ,
81
- agent_endpoint : Result < Vec < net :: SocketAddr > , crate :: Error > ,
84
+ agent_endpoint : Option < String > ,
82
85
max_packet_size : usize ,
83
86
auto_split_batch : bool ,
84
87
}
85
88
86
89
impl Default for AgentPipeline {
87
90
fn default ( ) -> Self {
88
- let mut pipeline = AgentPipeline {
91
+ AgentPipeline {
89
92
transformation_config : Default :: default ( ) ,
90
93
trace_config : Default :: default ( ) ,
91
94
batch_config : Some ( Default :: default ( ) ) ,
92
- agent_endpoint : Ok ( vec ! [ format!(
95
+ agent_endpoint : Some ( format ! (
93
96
"{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
94
- )
95
- . parse( )
96
- . unwrap( ) ] ) ,
97
+ ) ) ,
97
98
max_packet_size : UDP_PACKET_MAX_LENGTH ,
98
99
auto_split_batch : false ,
99
- } ;
100
-
101
- let endpoint = match ( env:: var ( ENV_AGENT_HOST ) , env:: var ( ENV_AGENT_PORT ) ) {
102
- ( Ok ( host) , Ok ( port) ) => Some ( format ! ( "{}:{}" , host. trim( ) , port. trim( ) ) ) ,
103
- ( Ok ( host) , _) => Some ( format ! ( "{}:{DEFAULT_AGENT_ENDPOINT_PORT}" , host. trim( ) ) ) ,
104
- ( _, Ok ( port) ) => Some ( format ! ( "{DEFAULT_AGENT_ENDPOINT_HOST}:{}" , port. trim( ) ) ) ,
105
- ( _, _) => None ,
106
- } ;
107
-
108
- if let Some ( endpoint) = endpoint {
109
- pipeline = pipeline. with_endpoint ( endpoint) ;
110
100
}
111
-
112
- pipeline
113
101
}
114
102
}
115
103
@@ -147,16 +135,9 @@ impl AgentPipeline {
147
135
/// Any valid socket address can be used.
148
136
///
149
137
/// Default to be `127.0.0.1:6831`.
150
- pub fn with_endpoint < T : net :: ToSocketAddrs > ( self , agent_endpoint : T ) -> Self {
138
+ pub fn with_endpoint < T : Into < String > > ( self , agent_endpoint : T ) -> Self {
151
139
AgentPipeline {
152
- agent_endpoint : agent_endpoint
153
- . to_socket_addrs ( )
154
- . map ( |addrs| addrs. collect ( ) )
155
- . map_err ( |io_err| crate :: Error :: ConfigError {
156
- pipeline_name : "agent" ,
157
- config_name : "endpoint" ,
158
- reason : io_err. to_string ( ) ,
159
- } ) ,
140
+ agent_endpoint : Some ( agent_endpoint. into ( ) ) ,
160
141
..self
161
142
}
162
143
}
@@ -391,10 +372,10 @@ impl AgentPipeline {
391
372
R : JaegerTraceRuntime ,
392
373
{
393
374
let agent = AgentAsyncClientUdp :: new (
394
- self . agent_endpoint ?. as_slice ( ) ,
395
375
self . max_packet_size ,
396
376
runtime,
397
377
self . auto_split_batch ,
378
+ self . resolve_endpoint ( ) ?,
398
379
)
399
380
. map_err :: < Error , _ > ( Into :: into) ?;
400
381
Ok ( Arc :: new ( AsyncUploader :: Agent (
@@ -404,13 +385,38 @@ impl AgentPipeline {
404
385
405
386
fn build_sync_agent_uploader ( self ) -> Result < Arc < dyn Uploader > , TraceError > {
406
387
let agent = AgentSyncClientUdp :: new (
407
- self . agent_endpoint ?. as_slice ( ) ,
408
388
self . max_packet_size ,
409
389
self . auto_split_batch ,
390
+ self . resolve_endpoint ( ) ?,
410
391
)
411
392
. map_err :: < Error , _ > ( Into :: into) ?;
412
393
Ok ( Arc :: new ( SyncUploader :: Agent ( std:: sync:: Mutex :: new ( agent) ) ) )
413
394
}
395
+
396
+ // resolve the agent endpoint from the environment variables or the builder
397
+ // if only one of the environment variables is set, the other one will be set to the default value
398
+ // if no environment variable is set, the builder value will be used.
399
+ fn resolve_endpoint ( self ) -> Result < Vec < net:: SocketAddr > , TraceError > {
400
+ let endpoint_str = match ( env:: var ( ENV_AGENT_HOST ) , env:: var ( ENV_AGENT_PORT ) ) {
401
+ ( Ok ( host) , Ok ( port) ) => format ! ( "{}:{}" , host. trim( ) , port. trim( ) ) ,
402
+ ( Ok ( host) , _) => format ! ( "{}:{DEFAULT_AGENT_ENDPOINT_PORT}" , host. trim( ) ) ,
403
+ ( _, Ok ( port) ) => format ! ( "{DEFAULT_AGENT_ENDPOINT_HOST}:{}" , port. trim( ) ) ,
404
+ ( _, _) => self . agent_endpoint . unwrap_or ( format ! (
405
+ "{DEFAULT_AGENT_ENDPOINT_HOST}:{DEFAULT_AGENT_ENDPOINT_PORT}"
406
+ ) ) ,
407
+ } ;
408
+ endpoint_str
409
+ . to_socket_addrs ( )
410
+ . map ( |addrs| addrs. collect ( ) )
411
+ . map_err ( |io_err| {
412
+ Error :: ConfigError {
413
+ pipeline_name : "agent" ,
414
+ config_name : "endpoint" ,
415
+ reason : io_err. to_string ( ) ,
416
+ }
417
+ . into ( )
418
+ } )
419
+ }
414
420
}
415
421
416
422
#[ cfg( test) ]
@@ -429,9 +435,12 @@ mod tests {
429
435
( "127.0.0.1:1001" , true ) ,
430
436
] ;
431
437
for ( socket_str, is_ok) in test_cases. into_iter ( ) {
432
- let pipeline = AgentPipeline :: default ( ) . with_endpoint ( socket_str) ;
438
+ let resolved_endpoint = AgentPipeline :: default ( )
439
+ . with_endpoint ( socket_str)
440
+ . resolve_endpoint ( ) ;
433
441
assert_eq ! (
434
- pipeline. agent_endpoint. is_ok( ) ,
442
+ resolved_endpoint. is_ok( ) ,
443
+ // if is_ok is true, use socket_str, otherwise use the default endpoint
435
444
is_ok,
436
445
"endpoint string {}" ,
437
446
socket_str
0 commit comments