forked from open-telemetry/opentelemetry-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_processor.rs
223 lines (194 loc) · 7.85 KB
/
log_processor.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
//! # OpenTelemetry Log Processor Interface
//!
//! The `LogProcessor` interface provides hooks for log record processing and
//! exporting. Log processors receive `LogRecord`s emitted by the SDK's
//! `Logger` and determine how these records are handled.
//!
//! Built-in log processors are responsible for converting logs to exportable
//! representations and passing them to configured exporters. They can be
//! registered directly with a `LoggerProvider`.
//!
//! ## Types of Log Processors
//!
//! There are currently two types of log processors available in the SDK:
//! - **SimpleLogProcessor**: Forwards log records to the exporter immediately.
//! - **BatchLogProcessor**: Buffers log records and sends them to the exporter in batches.
//!
//! For more information, see simple_log_processor.rs and batch_log_processor.rs.
//!
//! ## Diagram
//!
//! ```ascii
//! +-----+---------------+ +-----------------------+ +-------------------+
//! | | | | | | |
//! | SDK | Logger.emit() +---> (Simple)LogProcessor +---> LogExporter |
//! | | | | (Batch)LogProcessor +---> (OTLPExporter) |
//! +-----+---------------+ +-----------------------+ +-------------------+
//! ```
use crate::error::OTelSdkResult;
use crate::{logs::SdkLogRecord, Resource};
#[cfg(feature = "spec_unstable_logs_enabled")]
use opentelemetry::logs::Severity;
use opentelemetry::InstrumentationScope;
use std::fmt::Debug;
/// The interface for plugging into a [`SdkLogger`].
///
/// [`SdkLogger`]: crate::logs::SdkLogger
pub trait LogProcessor: Send + Sync + Debug {
/// Called when a log record is ready to processed and exported.
///
/// This method receives a mutable reference to `LogRecord`. If the processor
/// needs to handle the export asynchronously, it should clone the data to
/// ensure it can be safely processed without lifetime issues. Any changes
/// made to the log data in this method will be reflected in the next log
/// processor in the chain.
///
/// # Parameters
/// - `record`: A mutable reference to `LogRecord` representing the log record.
/// - `instrumentation`: The instrumentation scope associated with the log record.
fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
/// Force the logs lying in the cache to be exported.
fn force_flush(&self) -> OTelSdkResult;
/// Shuts down the processor.
/// After shutdown returns the log processor should stop processing any logs.
/// It's up to the implementation on when to drop the LogProcessor.
fn shutdown(&self) -> OTelSdkResult;
#[cfg(feature = "spec_unstable_logs_enabled")]
/// Check if logging is enabled
fn event_enabled(&self, _level: Severity, _target: &str, _name: Option<&str>) -> bool {
// By default, all logs are enabled
true
}
/// Set the resource for the log processor.
fn set_resource(&self, _resource: &Resource) {}
}
#[cfg(all(test, feature = "testing", feature = "logs"))]
pub(crate) mod tests {
use crate::logs::{LogBatch, LogExporter, SdkLogRecord};
use crate::Resource;
use crate::{
error::OTelSdkResult,
logs::{LogProcessor, SdkLoggerProvider},
};
use opentelemetry::logs::AnyValue;
use opentelemetry::logs::LogRecord as _;
use opentelemetry::logs::{Logger, LoggerProvider};
use opentelemetry::{InstrumentationScope, Key};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
pub(crate) struct MockLogExporter {
pub resource: Arc<Mutex<Option<Resource>>>,
}
impl LogExporter for MockLogExporter {
async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
Ok(())
}
fn shutdown(&mut self) -> OTelSdkResult {
Ok(())
}
fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
res_opt.replace(resource.clone());
})
.expect("mock log exporter shouldn't error when setting resource");
}
}
// Implementation specific to the MockLogExporter, not part of the LogExporter trait
impl MockLogExporter {
pub(crate) fn get_resource(&self) -> Option<Resource> {
(*self.resource).lock().unwrap().clone()
}
}
#[derive(Debug)]
struct FirstProcessor {
pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
}
impl LogProcessor for FirstProcessor {
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
// add attribute
record.add_attribute(
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
);
// update body
record.body = Some("Updated by FirstProcessor".into());
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
}
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
}
#[derive(Debug)]
struct SecondProcessor {
pub(crate) logs: Arc<Mutex<Vec<(SdkLogRecord, InstrumentationScope)>>>,
}
impl LogProcessor for SecondProcessor {
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
assert!(record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(
record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
self.logs
.lock()
.unwrap()
.push((record.clone(), instrumentation.clone()));
}
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
}
#[test]
fn test_log_data_modification_by_multiple_processors() {
let first_processor_logs = Arc::new(Mutex::new(Vec::new()));
let second_processor_logs = Arc::new(Mutex::new(Vec::new()));
let first_processor = FirstProcessor {
logs: Arc::clone(&first_processor_logs),
};
let second_processor = SecondProcessor {
logs: Arc::clone(&second_processor_logs),
};
let logger_provider = SdkLoggerProvider::builder()
.with_log_processor(first_processor)
.with_log_processor(second_processor)
.build();
let logger = logger_provider.logger("test-logger");
let mut log_record = logger.create_log_record();
log_record.body = Some(AnyValue::String("Test log".into()));
logger.emit(log_record);
assert_eq!(first_processor_logs.lock().unwrap().len(), 1);
assert_eq!(second_processor_logs.lock().unwrap().len(), 1);
let first_log = &first_processor_logs.lock().unwrap()[0];
let second_log = &second_processor_logs.lock().unwrap()[0];
assert!(first_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(second_log.0.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
));
assert!(
first_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
assert!(
second_log.0.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
}
}