@@ -23,7 +23,12 @@ use std::{
23
23
use async_trait:: async_trait;
24
24
use futures:: stream:: StreamExt ;
25
25
use opentelemetry:: {
26
- sdk:: export:: trace:: { ExportResult , SpanData , SpanExporter } ,
26
+ global:: handle_error,
27
+ sdk:: export:: {
28
+ trace:: { ExportResult , SpanData , SpanExporter } ,
29
+ ExportError ,
30
+ } ,
31
+ trace:: TraceError ,
27
32
Value ,
28
33
} ;
29
34
use thiserror:: Error ;
@@ -61,22 +66,6 @@ pub struct StackDriverExporter {
61
66
maximum_shutdown_duration : Duration ,
62
67
}
63
68
64
- impl fmt:: Debug for StackDriverExporter {
65
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
66
- #[ allow( clippy:: unneeded_field_pattern) ]
67
- let Self {
68
- maximum_shutdown_duration,
69
- pending_count,
70
- tx : _,
71
- } = self ;
72
- f. debug_struct ( "StackDriverExporter" )
73
- . field ( "tx" , & "(elided)" )
74
- . field ( "pending_count" , pending_count)
75
- . field ( "maximum_shutdown_duration" , maximum_shutdown_duration)
76
- . finish ( )
77
- }
78
- }
79
-
80
69
impl StackDriverExporter {
81
70
pub fn builder ( ) -> Builder {
82
71
Builder :: default ( )
@@ -91,10 +80,7 @@ impl StackDriverExporter {
91
80
impl SpanExporter for StackDriverExporter {
92
81
async fn export ( & mut self , batch : Vec < SpanData > ) -> ExportResult {
93
82
match self . tx . try_send ( batch) {
94
- Err ( e) => {
95
- log:: error!( "Unable to send to export_inner {:?}" , e) ;
96
- Err ( e. into ( ) )
97
- }
83
+ Err ( e) => Err ( e. into ( ) ) ,
98
84
Ok ( ( ) ) => {
99
85
self . pending_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
100
86
Ok ( ( ) )
@@ -112,6 +98,22 @@ impl SpanExporter for StackDriverExporter {
112
98
}
113
99
}
114
100
101
+ impl fmt:: Debug for StackDriverExporter {
102
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
103
+ #[ allow( clippy:: unneeded_field_pattern) ]
104
+ let Self {
105
+ tx : _,
106
+ pending_count,
107
+ maximum_shutdown_duration,
108
+ } = self ;
109
+ f. debug_struct ( "StackDriverExporter" )
110
+ . field ( "tx" , & "(elided)" )
111
+ . field ( "pending_count" , pending_count)
112
+ . field ( "maximum_shutdown_duration" , maximum_shutdown_duration)
113
+ . finish ( )
114
+ }
115
+ }
116
+
115
117
/// Helper type to build a `StackDriverExporter`.
116
118
#[ derive( Clone , Default ) ]
117
119
pub struct Builder {
@@ -141,10 +143,13 @@ impl Builder {
141
143
self
142
144
}
143
145
144
- pub async fn build (
146
+ pub async fn build < A : Authorizer > (
145
147
self ,
146
- authenticator : impl Authorizer ,
147
- ) -> Result < ( StackDriverExporter , impl Future < Output = ( ) > ) , Error > {
148
+ authenticator : A ,
149
+ ) -> Result < ( StackDriverExporter , impl Future < Output = ( ) > ) , Error >
150
+ where
151
+ Error : From < A :: Error > ,
152
+ {
148
153
let Self {
149
154
maximum_shutdown_duration,
150
155
num_concurrent_requests,
@@ -217,7 +222,10 @@ struct ExporterContext<'a, A> {
217
222
scopes : Arc < Vec < & ' static str > > ,
218
223
}
219
224
220
- impl < A : Authorizer > ExporterContext < ' _ , A > {
225
+ impl < A : Authorizer > ExporterContext < ' _ , A >
226
+ where
227
+ Error : From < A :: Error > ,
228
+ {
221
229
async fn export ( mut self , batch : Vec < SpanData > ) {
222
230
use proto:: devtools:: cloudtrace:: v2:: span:: time_event:: Value ;
223
231
@@ -327,9 +335,9 @@ impl<A: Authorizer> ExporterContext<'_, A> {
327
335
328
336
self . pending_count . fetch_sub ( 1 , Ordering :: Relaxed ) ;
329
337
if let Err ( e) = self . authorizer . authorize ( & mut req, & self . scopes ) . await {
330
- log :: error! ( "StackDriver authentication failed {}" , e ) ;
338
+ handle_error ( TraceError :: from ( Error :: from ( e ) ) ) ;
331
339
} else if let Err ( e) = self . trace_client . batch_write_spans ( req) . await {
332
- log :: error! ( "StackDriver push failed {}" , e ) ;
340
+ handle_error ( TraceError :: from ( Error :: TonicRpc ( e ) ) ) ;
333
341
}
334
342
335
343
let client = match & mut self . log_client {
@@ -351,9 +359,9 @@ impl<A: Authorizer> ExporterContext<'_, A> {
351
359
} ) ;
352
360
353
361
if let Err ( e) = self . authorizer . authorize ( & mut req, & self . scopes ) . await {
354
- log :: error! ( "StackDriver authentication failed {}" , e ) ;
362
+ handle_error ( TraceError :: from ( Error :: from ( e ) ) ) ;
355
363
} else if let Err ( e) = client. client . write_log_entries ( req) . await {
356
- log :: error! ( "StackDriver push failed {}" , e ) ;
364
+ handle_error ( TraceError :: from ( Error :: TonicRpc ( e ) ) ) ;
357
365
}
358
366
}
359
367
}
@@ -498,12 +506,20 @@ pub enum Error {
498
506
#[ error( "{0}" ) ]
499
507
Other ( #[ from] Box < dyn std:: error:: Error + Send + Sync > ) ,
500
508
#[ error( "tonic error: {0}" ) ]
501
- Tonic ( #[ from] tonic:: transport:: Error ) ,
509
+ TonicRpc ( #[ from] tonic:: Status ) ,
510
+ #[ error( "tonic error: {0}" ) ]
511
+ TonicTransport ( #[ from] tonic:: transport:: Error ) ,
502
512
#[ cfg( feature = "yup-oauth2" ) ]
503
513
#[ error( "authorizer error: {0}" ) ]
504
514
Yup ( #[ from] yup_oauth2:: Error ) ,
505
515
}
506
516
517
+ impl ExportError for Error {
518
+ fn exporter_name ( & self ) -> & ' static str {
519
+ "stackdriver"
520
+ }
521
+ }
522
+
507
523
/// As defined in https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.type#google.logging.type.LogSeverity.
508
524
enum LogSeverity {
509
525
Default = 0 ,
0 commit comments