@@ -516,9 +516,14 @@ mod tests {
516
516
Resource ,
517
517
} ;
518
518
use async_trait:: async_trait;
519
+ use opentelemetry:: logs:: AnyValue ;
520
+ #[ cfg( feature = "logs_level_enabled" ) ]
521
+ use opentelemetry:: logs:: Severity ;
522
+ use opentelemetry:: logs:: { Logger , LoggerProvider as _} ;
523
+ use opentelemetry:: Key ;
519
524
use opentelemetry:: { logs:: LogResult , KeyValue } ;
520
525
use std:: borrow:: Cow ;
521
- use std:: sync:: Arc ;
526
+ use std:: sync:: { Arc , Mutex } ;
522
527
use std:: time:: Duration ;
523
528
524
529
#[ derive( Debug , Clone ) ]
@@ -787,4 +792,110 @@ mod tests {
787
792
788
793
assert_eq ! ( 1 , exporter. get_emitted_logs( ) . unwrap( ) . len( ) )
789
794
}
795
+
796
+ #[ derive( Debug ) ]
797
+ struct FirstProcessor {
798
+ pub ( crate ) logs : Arc < Mutex < Vec < LogData > > > ,
799
+ }
800
+
801
+ impl LogProcessor for FirstProcessor {
802
+ fn emit ( & self , data : & mut LogData ) {
803
+ data. record . attributes . get_or_insert ( vec ! [ ] ) . push ( (
804
+ Key :: from_static_str ( "processed_by" ) ,
805
+ AnyValue :: String ( "FirstProcessor" . into ( ) ) ,
806
+ ) ) ;
807
+ println ! (
808
+ "Data attributes after modification {:?}" ,
809
+ data. record. attributes
810
+ ) ;
811
+ self . logs . lock ( ) . unwrap ( ) . push ( data. clone ( ) ) ; //clone as the LogProcessor is storing the data.
812
+ }
813
+
814
+ #[ cfg( feature = "logs_level_enabled" ) ]
815
+ fn event_enabled ( & self , _level : Severity , _target : & str , _name : & str ) -> bool {
816
+ true
817
+ }
818
+
819
+ fn force_flush ( & self ) -> LogResult < ( ) > {
820
+ Ok ( ( ) )
821
+ }
822
+
823
+ fn shutdown ( & self ) -> LogResult < ( ) > {
824
+ Ok ( ( ) )
825
+ }
826
+ }
827
+
828
+ #[ derive( Debug ) ]
829
+ struct SecondProcessor {
830
+ pub ( crate ) logs : Arc < Mutex < Vec < LogData > > > ,
831
+ }
832
+
833
+ impl LogProcessor for SecondProcessor {
834
+ fn emit ( & self , data : & mut LogData ) {
835
+ println ! ( "Data attributes: {:?}" , data. record. attributes. as_ref( ) ) ;
836
+ assert ! ( data. record. attributes. as_ref( ) . map_or( false , |attrs| {
837
+ attrs. iter( ) . any( |( key, value) | {
838
+ key. as_str( ) == "processed_by"
839
+ && value == & AnyValue :: String ( "FirstProcessor" . into( ) )
840
+ } )
841
+ } ) ) ;
842
+ self . logs . lock ( ) . unwrap ( ) . push ( data. clone ( ) ) ;
843
+ }
844
+
845
+ #[ cfg( feature = "logs_level_enabled" ) ]
846
+ fn event_enabled ( & self , _level : Severity , _target : & str , _name : & str ) -> bool {
847
+ true
848
+ }
849
+
850
+ fn force_flush ( & self ) -> LogResult < ( ) > {
851
+ Ok ( ( ) )
852
+ }
853
+
854
+ fn shutdown ( & self ) -> LogResult < ( ) > {
855
+ Ok ( ( ) )
856
+ }
857
+ }
858
+ #[ test]
859
+ fn test_log_data_modification_by_multiple_processors ( ) {
860
+ let first_processor_logs = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
861
+ let second_processor_logs = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
862
+
863
+ let first_processor = FirstProcessor {
864
+ logs : Arc :: clone ( & first_processor_logs) ,
865
+ } ;
866
+ let second_processor = SecondProcessor {
867
+ logs : Arc :: clone ( & second_processor_logs) ,
868
+ } ;
869
+
870
+ let logger_provider = LoggerProvider :: builder ( )
871
+ . with_log_processor ( first_processor)
872
+ . with_log_processor ( second_processor)
873
+ . build ( ) ;
874
+
875
+ let logger = logger_provider. logger ( "test-logger" ) ;
876
+ let mut log_record = logger. create_log_record ( ) ;
877
+ log_record. body = Some ( AnyValue :: String ( "Test log" . into ( ) ) ) ;
878
+
879
+ logger. emit ( log_record) ;
880
+
881
+ assert_eq ! ( first_processor_logs. lock( ) . unwrap( ) . len( ) , 1 ) ;
882
+ assert_eq ! ( second_processor_logs. lock( ) . unwrap( ) . len( ) , 1 ) ;
883
+
884
+ let first_log = & first_processor_logs. lock ( ) . unwrap ( ) [ 0 ] ;
885
+ let second_log = & second_processor_logs. lock ( ) . unwrap ( ) [ 0 ] ;
886
+
887
+ assert ! ( first_log. record. attributes. iter( ) . any( |attrs| {
888
+ attrs. iter( ) . any( |( key, value) | {
889
+ key. as_str( ) == "processed_by"
890
+ && value == & AnyValue :: String ( "FirstProcessor" . into( ) )
891
+ } )
892
+ } ) ) ;
893
+
894
+ assert ! ( second_log. record. attributes. iter( ) . any( |attrs| {
895
+ attrs. iter( ) . any( |( key, value) | {
896
+ key. as_str( ) == "processed_by"
897
+ && value == & AnyValue :: String ( "FirstProcessor" . into( ) )
898
+ } )
899
+ } ) ) ;
900
+ }
790
901
}
0 commit comments