chore(weave): Trace server binding processor improvements (#3817)
andrewtruong authored Mar 3, 2025
1 parent 3bc2090 commit 60b0926
Showing 6 changed files with 567 additions and 95 deletions.
17 changes: 13 additions & 4 deletions tests/conftest.py
@@ -409,14 +409,14 @@ def emit(self, record):
             self.log_records[curr_test] = []
         self.log_records[curr_test].append(record)
 
-    def get_error_logs(self):
+    def _get_logs(self, levelname: str):
         curr_test = get_test_name()
         logs = self.log_records.get(curr_test, [])
 
         return [
             record
             for record in logs
-            if record.levelname == "ERROR"
+            if record.levelname == levelname
             and record.name.startswith("weave")
             # (Tim) For some reason that i cannot figure out, there is some test that
             # a) is trying to connect to the PROD trace server
@@ -433,13 +433,22 @@ def get_error_logs(self):
             and not "legacy" in record.name
         ]
 
+    def get_error_logs(self):
+        return self._get_logs("ERROR")
+
+    def get_warning_logs(self):
+        return self._get_logs("WARNING")
+
 
 @pytest.fixture
-def log_collector():
+def log_collector(request):
     handler = InMemoryWeaveLogCollector()
     logger = logging.getLogger()  # Get your specific logger here if needed
     logger.addHandler(handler)
-    logger.setLevel(logging.ERROR)  # Set the level to capture all logs
+    if hasattr(request, "param") and request.param == "warning":
+        logger.setLevel(logging.WARNING)
+    else:
+        logger.setLevel(logging.ERROR)
     yield handler
     logger.removeHandler(handler)  # Clean up after the test
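
With this change, a test can opt into capturing WARNING-level logs by parametrizing the log_collector fixture indirectly, which is what the request.param check above supports. A minimal sketch, assuming hypothetical test names and a hypothetical weave-namespaced logger call (and ignoring the handler's other filters):

import logging

import pytest


# Hypothetical test: indirect parametrization passes "warning" to the fixture
# as request.param, so the root logger is set to WARNING for this test.
@pytest.mark.parametrize("log_collector", ["warning"], indirect=True)
def test_captures_weave_warnings(log_collector):
    logging.getLogger("weave.example").warning("something looks off")
    assert any(
        "something looks off" in record.getMessage()
        for record in log_collector.get_warning_logs()
    )


# Without parametrization the fixture keeps its ERROR-level default,
# so warnings never reach the collector.
def test_no_weave_errors(log_collector):
    logging.getLogger("weave.example").warning("below the ERROR threshold")
    assert log_collector.get_error_logs() == []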

62 changes: 62 additions & 0 deletions tests/trace_server_bindings/test_async_batch_processor.py
@@ -0,0 +1,62 @@
from __future__ import annotations

import time
from unittest.mock import MagicMock, call

from weave.trace_server_bindings.async_batch_processor import AsyncBatchProcessor


def test_max_batch_size():
    processor_fn = MagicMock()
    processor = AsyncBatchProcessor(processor_fn, max_batch_size=2)

    # Queue up 2 batches of 3 items
    processor.enqueue([1, 2, 3])
    processor.stop_accepting_new_work_and_flush_queue()

    # But the max batch size is 2, so the batch is split apart
    processor_fn.assert_has_calls(
        [
            call([1, 2]),
            call([3]),
        ]
    )


def test_min_batch_interval():
    processor_fn = MagicMock()
    processor = AsyncBatchProcessor(
        processor_fn, max_batch_size=100, min_batch_interval=1
    )

    # Queue up batches of 3 items within the min_batch_interval
    processor.enqueue([1, 2, 3])
    time.sleep(0.1)
    processor.enqueue([4, 5, 6])
    time.sleep(0.1)
    processor.enqueue([7, 8, 9])
    processor.stop_accepting_new_work_and_flush_queue()

    # Processor should batch them all together
    processor_fn.assert_called_once_with([1, 2, 3, 4, 5, 6, 7, 8, 9])


def test_wait_until_all_processed():
    processor_fn = MagicMock()
    processor = AsyncBatchProcessor(
        processor_fn, max_batch_size=100, min_batch_interval=0.01
    )

    processor.enqueue([1, 2, 3])
    processor.stop_accepting_new_work_and_flush_queue()

    # Despite queueing extra items, they will never get flushed because the processor is
    # already shut down.
    processor.enqueue([4, 5, 6])
    processor.stop_accepting_new_work_and_flush_queue()
    processor.enqueue([7, 8, 9])
    processor.stop_accepting_new_work_and_flush_queue()

    # We should only see the first batch. Everything else is stuck in the queue.
    processor_fn.assert_has_calls([call([1, 2, 3])])
    assert processor.queue.qsize() == 6
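
For orientation, here is a minimal sketch of the kind of queue-plus-worker batcher these tests exercise. It is not the actual AsyncBatchProcessor from weave.trace_server_bindings; it only mirrors the observable behavior asserted above: items that arrive within min_batch_interval are grouped, flushes are capped at max_batch_size, and anything enqueued after shutdown stays in the queue.

# Illustrative sketch only, not the actual AsyncBatchProcessor implementation.
from __future__ import annotations

import threading
import time
from queue import Empty, Queue
from typing import Any, Callable


class SketchBatchProcessor:
    def __init__(
        self,
        processor_fn: Callable[[list[Any]], None],
        max_batch_size: int = 100,
        min_batch_interval: float = 1.0,
    ) -> None:
        self.processor_fn = processor_fn
        self.max_batch_size = max_batch_size
        self.min_batch_interval = min_batch_interval
        self.queue: Queue[Any] = Queue()
        self.stop_event = threading.Event()
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def enqueue(self, items: list[Any]) -> None:
        # Items are accepted even after shutdown; they just sit in the queue.
        for item in items:
            self.queue.put(item)

    def stop_accepting_new_work_and_flush_queue(self) -> None:
        # Tell the worker to drain what it currently has and exit.
        # Calling this again after the worker has exited is a no-op.
        self.stop_event.set()
        self.worker.join()

    def _next_batch(self) -> list[Any]:
        # Pull at most max_batch_size items without blocking.
        batch: list[Any] = []
        while len(batch) < self.max_batch_size:
            try:
                batch.append(self.queue.get_nowait())
            except Empty:
                break
        return batch

    def _run(self) -> None:
        while not self.stop_event.is_set():
            # Sleeping groups items that arrive close together into one batch.
            time.sleep(self.min_batch_interval)
            if batch := self._next_batch():
                self.processor_fn(batch)
        # Final flush on shutdown, still respecting max_batch_size.
        while batch := self._next_batch():
            self.processor_fn(batch)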