Skip to content

Commit f040407

Browse files
committed
Refactor BatchLogProcessor
1 parent 42b4f2f commit f040407

File tree

1 file changed

+100
-32
lines changed

1 file changed

+100
-32
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

+100-32
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
199199
#[allow(clippy::large_enum_variant)]
200200
#[derive(Debug)]
201201
enum BatchMessage {
202-
/// Export logs, called when the log is emitted.
203-
ExportLog(Box<(LogRecord, InstrumentationScope)>),
202+
/// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
203+
ExportLog(Arc<AtomicBool>),
204204
/// ForceFlush flushes the current buffer to the exporter.
205205
ForceFlush(mpsc::SyncSender<ExportResult>),
206206
/// Shut down the worker thread, push all logs in buffer to the exporter.
@@ -209,6 +209,8 @@ enum BatchMessage {
209209
SetResource(Arc<Resource>),
210210
}
211211

212+
type LogsData = Box<(LogRecord, InstrumentationScope)>;
213+
212214
/// The `BatchLogProcessor` collects finished logs in a buffer and exports them
213215
/// in batches to the configured `LogExporter`. This processor is ideal for
214216
/// high-throughput environments, as it minimizes the overhead of exporting logs
@@ -246,11 +248,15 @@ enum BatchMessage {
246248
/// .build();
247249
///
248250
pub struct BatchLogProcessor {
249-
message_sender: SyncSender<BatchMessage>,
251+
logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
252+
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
250253
handle: Mutex<Option<thread::JoinHandle<()>>>,
251254
forceflush_timeout: Duration,
252255
shutdown_timeout: Duration,
253256
is_shutdown: AtomicBool,
257+
export_log_message_sent: Arc<AtomicBool>,
258+
current_batch_size: Arc<AtomicUsize>,
259+
max_export_batch_size: usize,
254260

255261
// Track dropped logs - we'll log this at shutdown
256262
dropped_logs_count: AtomicUsize,
@@ -279,11 +285,19 @@ impl LogProcessor for BatchLogProcessor {
279285
}
280286

281287
let result = self
282-
.message_sender
283-
.try_send(BatchMessage::ExportLog(Box::new((
284-
record.clone(),
285-
instrumentation.clone(),
286-
))));
288+
.logs_sender
289+
.try_send(Box::new((record.clone(), instrumentation.clone())));
290+
291+
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) >= self.max_export_batch_size {
292+
// Check if a control message for exporting logs has already been sent to the worker thread.
293+
// If not, send a control message to export logs.
294+
// `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
295+
if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
296+
let _ = self.message_sender.try_send(BatchMessage::ExportLog(
297+
self.export_log_message_sent.clone(),
298+
)); // TODO: Handle error
299+
}
300+
}
287301

288302
if result.is_err() {
289303
// Increment dropped logs count. The first time we have to drop a log,
@@ -388,8 +402,12 @@ impl BatchLogProcessor {
388402
where
389403
E: LogExporter + Send + Sync + 'static,
390404
{
391-
let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
405+
let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
406+
let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
392407
let max_queue_size = config.max_queue_size;
408+
let max_export_batch_size = config.max_export_batch_size;
409+
let current_batch_size = Arc::new(AtomicUsize::new(0));
410+
let current_batch_size_for_thread = current_batch_size.clone();
393411

394412
let handle = thread::Builder::new()
395413
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -402,6 +420,42 @@ impl BatchLogProcessor {
402420
);
403421
let mut last_export_time = Instant::now();
404422
let mut logs = Vec::with_capacity(config.max_export_batch_size);
423+
let current_batch_size = current_batch_size_for_thread;
424+
425+
// This function gets up to `max_export_batch_size` logs from the channel and exports them.
426+
// It returns the result of the export operation.
427+
// It expects the logs vec to be empty when it's called.
428+
#[inline]
429+
fn get_logs_and_export<E>(
430+
logs_receiver: &mpsc::Receiver<LogsData>,
431+
exporter: &E,
432+
logs: &mut Vec<LogsData>,
433+
last_export_time: &mut Instant,
434+
current_batch_size: &AtomicUsize,
435+
config: &BatchConfig,
436+
) -> ExportResult
437+
where
438+
E: LogExporter + Send + Sync + 'static,
439+
{
440+
// Get up to `max_export_batch_size` log records from the channel and push them to the logs vec
441+
while let Ok(log) = logs_receiver.try_recv() {
442+
logs.push(log);
443+
if logs.len() == config.max_export_batch_size {
444+
break;
445+
}
446+
}
447+
448+
let count_of_logs = logs.len(); // Count of logs that will be exported
449+
let result = export_with_timeout_sync(
450+
config.max_export_timeout,
451+
exporter,
452+
logs,
453+
last_export_time,
454+
); // This method clears the logs vec after exporting
455+
456+
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
457+
result
458+
}
405459

406460
loop {
407461
let remaining_time = config
@@ -410,37 +464,44 @@ impl BatchLogProcessor {
410464
.unwrap_or(config.scheduled_delay);
411465

412466
match message_receiver.recv_timeout(remaining_time) {
413-
Ok(BatchMessage::ExportLog(log)) => {
414-
logs.push(log);
415-
if logs.len() == config.max_export_batch_size {
416-
otel_debug!(
417-
name: "BatchLogProcessor.ExportingDueToBatchSize",
418-
);
419-
let _ = export_with_timeout_sync(
420-
config.max_export_timeout,
421-
&mut exporter,
422-
&mut logs,
423-
&mut last_export_time,
424-
);
425-
}
467+
Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
468+
otel_debug!(
469+
name: "BatchLogProcessor.ExportingDueToBatchSize",
470+
);
471+
472+
let _ = get_logs_and_export(
473+
&logs_receiver,
474+
&exporter,
475+
&mut logs,
476+
&mut last_export_time,
477+
&current_batch_size,
478+
&config,
479+
);
480+
481+
// Reset the export log message sent flag now that it has been processed.
482+
export_log_message_sent.store(false, Ordering::Relaxed);
426483
}
427484
Ok(BatchMessage::ForceFlush(sender)) => {
428485
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
429-
let result = export_with_timeout_sync(
430-
config.max_export_timeout,
431-
&mut exporter,
486+
let result = get_logs_and_export(
487+
&logs_receiver,
488+
&exporter,
432489
&mut logs,
433490
&mut last_export_time,
491+
&current_batch_size,
492+
&config,
434493
);
435494
let _ = sender.send(result);
436495
}
437496
Ok(BatchMessage::Shutdown(sender)) => {
438497
otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
439-
let result = export_with_timeout_sync(
440-
config.max_export_timeout,
441-
&mut exporter,
498+
let result = get_logs_and_export(
499+
&logs_receiver,
500+
&exporter,
442501
&mut logs,
443502
&mut last_export_time,
503+
&current_batch_size,
504+
&config,
444505
);
445506
let _ = sender.send(result);
446507

@@ -460,11 +521,14 @@ impl BatchLogProcessor {
460521
otel_debug!(
461522
name: "BatchLogProcessor.ExportingDueToTimer",
462523
);
463-
let _ = export_with_timeout_sync(
464-
config.max_export_timeout,
465-
&mut exporter,
524+
525+
let _ = get_logs_and_export(
526+
&logs_receiver,
527+
&exporter,
466528
&mut logs,
467529
&mut last_export_time,
530+
&current_batch_size,
531+
&config,
468532
);
469533
}
470534
Err(RecvTimeoutError::Disconnected) => {
@@ -486,13 +550,17 @@ impl BatchLogProcessor {
486550

487551
// Return batch processor with link to worker
488552
BatchLogProcessor {
553+
logs_sender,
489554
message_sender,
490555
handle: Mutex::new(Some(handle)),
491556
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
492557
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
493558
is_shutdown: AtomicBool::new(false),
494559
dropped_logs_count: AtomicUsize::new(0),
495560
max_queue_size,
561+
export_log_message_sent: Arc::new(AtomicBool::new(false)),
562+
current_batch_size,
563+
max_export_batch_size,
496564
}
497565
}
498566

@@ -511,7 +579,7 @@ impl BatchLogProcessor {
511579
#[allow(clippy::vec_box)]
512580
fn export_with_timeout_sync<E>(
513581
_: Duration, // TODO, enforcing timeout in exporter.
514-
exporter: &mut E,
582+
exporter: &E,
515583
batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
516584
last_export_time: &mut Instant,
517585
) -> ExportResult

0 commit comments

Comments
 (0)