Commit 888d5a3

utpilla, lalitb, and cijothomas authored
Refactor BatchLogProcessor (#2494)
Co-authored-by: Lalit Kumar Bhasin <lalit_fin@yahoo.com>
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
1 parent 73859c3 commit 888d5a3

1 file changed: +120 -32 lines changed


opentelemetry-sdk/src/logs/log_processor.rs (+120 -32)
@@ -199,8 +199,8 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum BatchMessage {
-    /// Export logs, called when the log is emitted.
-    ExportLog(Box<(LogRecord, InstrumentationScope)>),
+    /// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
+    ExportLog(Arc<AtomicBool>),
     /// ForceFlush flushes the current buffer to the exporter.
     ForceFlush(mpsc::SyncSender<ExportResult>),
     /// Shut down the worker thread, push all logs in buffer to the exporter.
@@ -209,6 +209,8 @@ enum BatchMessage {
     SetResource(Arc<Resource>),
 }

+type LogsData = Box<(LogRecord, InstrumentationScope)>;
+
 /// The `BatchLogProcessor` collects finished logs in a buffer and exports them
 /// in batches to the configured `LogExporter`. This processor is ideal for
 /// high-throughput environments, as it minimizes the overhead of exporting logs
@@ -246,11 +248,15 @@ enum BatchMessage {
 /// .build();
 ///
 pub struct BatchLogProcessor {
-    message_sender: SyncSender<BatchMessage>,
+    logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
+    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
     is_shutdown: AtomicBool,
+    export_log_message_sent: Arc<AtomicBool>,
+    current_batch_size: Arc<AtomicUsize>,
+    max_export_batch_size: usize,

     // Track dropped logs - we'll log this at shutdown
     dropped_logs_count: AtomicUsize,
@@ -279,11 +285,8 @@ impl LogProcessor for BatchLogProcessor {
         }

         let result = self
-            .message_sender
-            .try_send(BatchMessage::ExportLog(Box::new((
-                record.clone(),
-                instrumentation.clone(),
-            ))));
+            .logs_sender
+            .try_send(Box::new((record.clone(), instrumentation.clone())));

         if result.is_err() {
             // Increment dropped logs count. The first time we have to drop a log,
@@ -292,6 +295,37 @@ impl LogProcessor for BatchLogProcessor {
                otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                    message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
            }
+            return;
+        }
+
+        // At this point, sending the log record to the data channel was successful.
+        // Increment the current batch size and check if it has reached the max export batch size.
+        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
+        {
+            // Check if a control message for exporting logs is already sent to the worker thread.
+            // If not, send a control message to export logs.
+            // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
+
+            if !self.export_log_message_sent.load(Ordering::Relaxed) {
+                // This is a cost-efficient check as atomic load operations do not require exclusive access to the cache line.
+                // Perform an atomic swap on `export_log_message_sent` ONLY when the atomic load operation above returns false.
+                // Atomic swap/compare_exchange operations require exclusive access to the cache line on most processor architectures.
+                // We could have used compare_exchange as well here, but it's more verbose than swap.
+                if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
+                    match self.message_sender.try_send(BatchMessage::ExportLog(
+                        self.export_log_message_sent.clone(),
+                    )) {
+                        Ok(_) => {
+                            // Control message sent successfully.
+                        }
+                        Err(_err) => {
+                            // TODO: Log error
+                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
+                            self.export_log_message_sent.store(false, Ordering::Relaxed);
+                        }
+                    }
+                }
+            }
+        }
     }
 }

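The emit() path above uses a load-then-swap gate so that each time the batch threshold is crossed, at most one ExportLog control message is queued for the worker. Below is a minimal, self-contained sketch of that notify-once pattern; notify_once and its signature are illustrative, not part of the SDK.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};

// Send a "please export" signal at most once until the consumer resets the flag.
// Returns true if this call actually enqueued the control message.
fn notify_once(flag: &Arc<AtomicBool>, sender: &mpsc::SyncSender<Arc<AtomicBool>>) -> bool {
    // Cheap check first: a relaxed load does not need exclusive access to the cache line.
    if flag.load(Ordering::Relaxed) {
        return false;
    }
    // Only one caller wins the swap; the rest observe `true` and back off.
    if flag.swap(true, Ordering::Relaxed) {
        return false;
    }
    // The winner sends the flag itself so the consumer can reset it when done.
    if sender.try_send(Arc::clone(flag)).is_err() {
        // Could not enqueue the signal; clear the flag so a later caller retries.
        flag.store(false, Ordering::Relaxed);
        return false;
    }
    true
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<Arc<AtomicBool>>(8);
    let flag = Arc::new(AtomicBool::new(false));

    assert!(notify_once(&flag, &tx)); // first caller enqueues the signal
    assert!(!notify_once(&flag, &tx)); // duplicates are suppressed until the flag is reset

    // Consumer side: process the batch, then re-arm the signal.
    rx.recv().unwrap().store(false, Ordering::Relaxed);
    assert!(notify_once(&flag, &tx)); // a new signal can be sent again
}

The relaxed load is the cheap common-case check; only a caller that observed false pays for the swap's exclusive cache-line access, and the consumer re-arms the signal by storing false once it has drained a batch.
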
@@ -388,8 +422,12 @@ impl BatchLogProcessor {
     where
         E: LogExporter + Send + Sync + 'static,
     {
-        let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
+        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
+        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
         let max_queue_size = config.max_queue_size;
+        let max_export_batch_size = config.max_export_batch_size;
+        let current_batch_size = Arc::new(AtomicUsize::new(0));
+        let current_batch_size_for_thread = current_batch_size.clone();

         let handle = thread::Builder::new()
             .name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -402,6 +440,42 @@ impl BatchLogProcessor {
                 );
                 let mut last_export_time = Instant::now();
                 let mut logs = Vec::with_capacity(config.max_export_batch_size);
+                let current_batch_size = current_batch_size_for_thread;
+
+                // This function gets up to `max_export_batch_size` log records from the channel and exports them.
+                // It returns the result of the export operation.
+                // It expects the logs vec to be empty when it's called.
+                #[inline]
+                fn get_logs_and_export<E>(
+                    logs_receiver: &mpsc::Receiver<LogsData>,
+                    exporter: &E,
+                    logs: &mut Vec<LogsData>,
+                    last_export_time: &mut Instant,
+                    current_batch_size: &AtomicUsize,
+                    config: &BatchConfig,
+                ) -> ExportResult
+                where
+                    E: LogExporter + Send + Sync + 'static,
+                {
+                    // Get up to `max_export_batch_size` log records from the channel and push them to the logs vec
+                    while let Ok(log) = logs_receiver.try_recv() {
+                        logs.push(log);
+                        if logs.len() == config.max_export_batch_size {
+                            break;
+                        }
+                    }
+
+                    let count_of_logs = logs.len(); // Count of logs that will be exported
+                    let result = export_with_timeout_sync(
+                        config.max_export_timeout,
+                        exporter,
+                        logs,
+                        last_export_time,
+                    ); // This method clears the logs vec after exporting
+
+                    current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+                    result
+                }

                 loop {
                     let remaining_time = config
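
get_logs_and_export above drains the data channel with try_recv, stopping at max_export_batch_size or as soon as the channel is momentarily empty, and then subtracts the exported count from the shared batch counter. Here is a standalone sketch of just the draining step (drain_batch is a hypothetical helper, not the SDK's function):

use std::sync::mpsc;

// Drain up to `max_batch` items from a receiver without blocking: stop at the
// batch limit or as soon as the channel is momentarily empty/disconnected.
fn drain_batch<T>(rx: &mpsc::Receiver<T>, max_batch: usize) -> Vec<T> {
    let mut batch = Vec::with_capacity(max_batch);
    while batch.len() < max_batch {
        match rx.try_recv() {
            Ok(item) => batch.push(item),
            Err(_) => break, // nothing more to read right now: export what we have
        }
    }
    batch
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(16);
    for i in 0..10 {
        tx.try_send(i).unwrap();
    }
    assert_eq!(drain_batch(&rx, 4).len(), 4); // capped at the batch size
    assert_eq!(drain_batch(&rx, 8).len(), 6); // drains whatever is left
}
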
@@ -410,37 +484,44 @@ impl BatchLogProcessor {
                         .unwrap_or(config.scheduled_delay);

                     match message_receiver.recv_timeout(remaining_time) {
-                        Ok(BatchMessage::ExportLog(log)) => {
-                            logs.push(log);
-                            if logs.len() == config.max_export_batch_size {
-                                otel_debug!(
-                                    name: "BatchLogProcessor.ExportingDueToBatchSize",
-                                );
-                                let _ = export_with_timeout_sync(
-                                    config.max_export_timeout,
-                                    &mut exporter,
-                                    &mut logs,
-                                    &mut last_export_time,
-                                );
-                            }
+                        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
+                            otel_debug!(
+                                name: "BatchLogProcessor.ExportingDueToBatchSize",
+                            );
+
+                            let _ = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
+                                &mut logs,
+                                &mut last_export_time,
+                                &current_batch_size,
+                                &config,
+                            );
+
+                            // Reset the export log message sent flag now that it has been processed.
+                            export_log_message_sent.store(false, Ordering::Relaxed);
                         }
                         Ok(BatchMessage::ForceFlush(sender)) => {
                             otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
-                            let result = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+                            let result = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                             let _ = sender.send(result);
                         }
                         Ok(BatchMessage::Shutdown(sender)) => {
                             otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
-                            let result = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+                            let result = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                             let _ = sender.send(result);

@@ -460,11 +541,14 @@ impl BatchLogProcessor {
                             otel_debug!(
                                 name: "BatchLogProcessor.ExportingDueToTimer",
                             );
-                            let _ = export_with_timeout_sync(
-                                config.max_export_timeout,
-                                &mut exporter,
+
+                            let _ = get_logs_and_export(
+                                &logs_receiver,
+                                &exporter,
                                 &mut logs,
                                 &mut last_export_time,
+                                &current_batch_size,
+                                &config,
                             );
                         }
                         Err(RecvTimeoutError::Disconnected) => {
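
These match arms run inside the worker loop, which waits on recv_timeout for only the remainder of the scheduled delay, so handling a control message does not push back the periodic export. A simplified, self-contained sketch of that loop shape (Ctrl and worker are illustrative names; the real worker also handles ForceFlush, SetResource, and flag resets):

use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

// Illustrative control messages; the SDK's BatchMessage carries more variants.
enum Ctrl {
    Export,
    Shutdown,
}

fn worker(ctrl_rx: mpsc::Receiver<Ctrl>, scheduled_delay: Duration) {
    let mut last_export_time = Instant::now();
    loop {
        // Wait only for the remainder of the current interval, so handling a
        // control message does not postpone the periodic timer-based export.
        let remaining_time = scheduled_delay
            .checked_sub(last_export_time.elapsed())
            .unwrap_or(scheduled_delay);

        match ctrl_rx.recv_timeout(remaining_time) {
            Ok(Ctrl::Export) => {
                // drain and export a batch here, then restart the interval
                last_export_time = Instant::now();
            }
            Ok(Ctrl::Shutdown) | Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {
                // interval elapsed without a control message: export on the timer
                last_export_time = Instant::now();
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(8);
    let handle = thread::spawn(move || worker(rx, Duration::from_millis(50)));
    tx.send(Ctrl::Export).unwrap();
    tx.send(Ctrl::Shutdown).unwrap();
    handle.join().unwrap();
}
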
@@ -486,13 +570,17 @@ impl BatchLogProcessor {

         // Return batch processor with link to worker
         BatchLogProcessor {
+            logs_sender,
             message_sender,
             handle: Mutex::new(Some(handle)),
             forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
             shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
             is_shutdown: AtomicBool::new(false),
             dropped_logs_count: AtomicUsize::new(0),
             max_queue_size,
+            export_log_message_sent: Arc::new(AtomicBool::new(false)),
+            current_batch_size,
+            max_export_batch_size,
         }
     }

@@ -511,7 +599,7 @@ impl BatchLogProcessor {
 #[allow(clippy::vec_box)]
 fn export_with_timeout_sync<E>(
     _: Duration, // TODO, enforcing timeout in exporter.
-    exporter: &mut E,
+    exporter: &E,
     batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
     last_export_time: &mut Instant,
 ) -> ExportResult
