chore(weave): add rich progress bar to client flush #3828

Merged
11 commits merged on Mar 4, 2025
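For context, a minimal hypothetical sketch of the user-facing behavior (not part of the diff; it assumes the usual weave.init / @weave.op entry points, and the project name is a placeholder):

import weave

client = weave.init("my-team/my-project")  # placeholder project name

@weave.op()
def add(a: int, b: int) -> int:
    return a + b

add(1, 2)  # enqueues trace uploads on background executors

# After this PR, flush() blocks until pending tasks drain and renders a rich
# progress bar instead of flushing silently.
client.flush()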
204 changes: 204 additions & 0 deletions weave/trace/client_progress_bar.py
@@ -0,0 +1,204 @@
"""Progress bar utilities for WeaveClient.

This module provides functionality for displaying progress bars when flushing
tasks in the WeaveClient.
"""

import time
from typing import Optional

from rich.console import Console
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
)

from weave.trace.concurrent.futures import FutureExecutor


class TaskProgressBar:
"""A class to manage and display progress for task execution.

This class provides a rich progress bar interface for tracking the execution
of tasks in the WeaveClient.
"""

def __init__(
self,
main_executor: FutureExecutor,
fastlane_executor: Optional[FutureExecutor] = None,
):
"""Initialize the TaskProgressBar.

Args:
main_executor: The main executor for regular tasks.
fastlane_executor: The fastlane executor for file upload tasks, if any.
"""
self.main_executor = main_executor
self.fastlane_executor = fastlane_executor
self.console = Console()

# Initialize tracking variables
self.initial_main_jobs = main_executor.num_outstanding_futures
self.initial_fastlane_jobs = 0
if fastlane_executor:
self.initial_fastlane_jobs = fastlane_executor.num_outstanding_futures

self.total_initial_jobs = self.initial_main_jobs + self.initial_fastlane_jobs

# Progress tracking state
self.prev_main_jobs = self.initial_main_jobs
self.prev_fastlane_jobs = self.initial_fastlane_jobs
self.max_total_jobs = self.total_initial_jobs
self.total_completed = 0

def _create_progress_instance(self) -> Progress:
"""Create and configure a Rich Progress instance.

Returns:
A configured Rich Progress instance.
"""
return Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(bar_width=None, complete_style="magenta"),
TaskProgressColumn(),
TimeElapsedColumn(),
console=self.console,
refresh_per_second=10,
expand=True,
transient=False,
)

def _get_current_job_counts(self) -> tuple[int, int, int]:
"""Get the current counts of outstanding jobs.

Returns:
A tuple containing (current_main_jobs, current_fastlane_jobs, current_total_jobs)
"""
current_main_jobs = self.main_executor.num_outstanding_futures
current_fastlane_jobs = 0
if self.fastlane_executor:
current_fastlane_jobs = self.fastlane_executor.num_outstanding_futures

current_total_jobs = current_main_jobs + current_fastlane_jobs
return current_main_jobs, current_fastlane_jobs, current_total_jobs

def _calculate_completed_jobs(
self, current_main_jobs: int, current_fastlane_jobs: int
) -> int:
"""Calculate the number of jobs completed since the last update.

Args:
current_main_jobs: Current count of main jobs.
current_fastlane_jobs: Current count of fastlane jobs.

Returns:
Number of jobs completed in this iteration.
"""
main_completed = max(0, self.prev_main_jobs - current_main_jobs)
fastlane_completed = max(0, self.prev_fastlane_jobs - current_fastlane_jobs)
return main_completed + fastlane_completed

def _format_job_details(
self, current_main_jobs: int, current_fastlane_jobs: int
) -> str:
"""Format job details for the progress bar description.

Args:
current_main_jobs: Current count of main jobs.
current_fastlane_jobs: Current count of fastlane jobs.

Returns:
Formatted string describing job details.
"""
job_details = []
if current_main_jobs > 0:
job_details.append(f"{current_main_jobs} main")
if current_fastlane_jobs > 0:
job_details.append(f"{current_fastlane_jobs} file-upload")

return ", ".join(job_details) if job_details else "none"

def _has_pending_jobs(self) -> bool:
"""Check if there are any pending jobs.

Returns:
True if there are pending jobs, False otherwise.
"""
if self.main_executor.num_outstanding_futures > 0:
return True
if (
self.fastlane_executor
and self.fastlane_executor.num_outstanding_futures > 0
):
return True
return False

def run(self) -> None:
"""Run the progress bar to track task execution until completion."""
if self.total_initial_jobs == 0:
return

print(f"Flushing {self.total_initial_jobs} pending tasks...")

with self._create_progress_instance() as progress:
# Create a task for tracking progress
task_id = progress.add_task("Flushing tasks", total=self.total_initial_jobs)

while self._has_pending_jobs():
current_main_jobs, current_fastlane_jobs, current_total_jobs = (
self._get_current_job_counts()
)

# If new jobs were added, update the total
if current_total_jobs > self.max_total_jobs - self.total_completed:
new_jobs = current_total_jobs - (
self.max_total_jobs - self.total_completed
)
self.max_total_jobs += new_jobs
progress.update(task_id, total=self.max_total_jobs)

# Calculate completed jobs since last update
completed_this_iteration = self._calculate_completed_jobs(
current_main_jobs, current_fastlane_jobs
)

# Update progress bar
if completed_this_iteration > 0:
progress.update(task_id, advance=completed_this_iteration)
self.total_completed += completed_this_iteration

# Update progress bar description
job_details_str = self._format_job_details(
current_main_jobs, current_fastlane_jobs
)
progress.update(
task_id,
description=f"Flushing tasks ({current_total_jobs} remaining: {job_details_str})",
)

# Store current counts for next iteration
self.prev_main_jobs = current_main_jobs
self.prev_fastlane_jobs = current_fastlane_jobs

# Sleep briefly to allow background threads to make progress
time.sleep(0.1)


def flush_with_progress_bar(
main_executor: FutureExecutor,
fastlane_executor: Optional[FutureExecutor] = None,
) -> None:
"""Flush tasks with a progress bar.

Args:
main_executor: The main executor.
fastlane_executor: The fastlane executor, if any.
"""
progress_bar = TaskProgressBar(main_executor, fastlane_executor)
progress_bar.run()
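As a rough illustration of how the module above is meant to be driven, the following hypothetical script (not part of the PR) feeds a FutureExecutor and then drains it with flush_with_progress_bar; constructing FutureExecutor() with no arguments is an assumption.

import time

from weave.trace.client_progress_bar import flush_with_progress_bar
from weave.trace.concurrent.futures import FutureExecutor

executor = FutureExecutor()  # assumed: default constructor is sufficient

def slow_task(i: int) -> int:
    time.sleep(0.5)
    return i

for i in range(20):
    executor.defer(slow_task, i)

# Polls num_outstanding_futures and renders the progress bar until it reaches zero.
flush_with_progress_bar(main_executor=executor)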
5 changes: 5 additions & 0 deletions weave/trace/concurrent/futures.py
@@ -79,6 +79,11 @@ def __init__(
        self._in_thread_context = ContextVar("in_deferred_context", default=False)
        atexit.register(self._shutdown)

    @property
    def num_outstanding_futures(self) -> int:
        """Return the number of submitted futures that have not yet completed."""
        with self._active_futures_lock:
            return len(self._active_futures)

    def defer(self, f: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """
        Defer a function to be executed in a thread pool.
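The new num_outstanding_futures property reads the executor's set of in-flight futures under its lock. Below is a self-contained sketch of that bookkeeping pattern; MiniExecutor and its fields are invented for illustration and do not mirror FutureExecutor's real internals.

import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class MiniExecutor:
    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=4)
        self._active: set[Future] = set()
        self._lock = threading.Lock()

    def defer(self, f: Callable[..., Any], *args: Any) -> Future:
        fut = self._pool.submit(f, *args)
        with self._lock:
            self._active.add(fut)
        # Remove the future from the active set as soon as it completes.
        fut.add_done_callback(self._discard)
        return fut

    def _discard(self, fut: Future) -> None:
        with self._lock:
            self._active.discard(fut)

    @property
    def num_outstanding_futures(self) -> int:
        # Read under the same lock used for add/discard so the count is consistent.
        with self._lock:
            return len(self._active)


ex = MiniExecutor()
ex.defer(print, "hello")
print(ex.num_outstanding_futures)  # 0 or 1 depending on timing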
41 changes: 37 additions & 4 deletions weave/trace/weave_client.py
@@ -28,6 +28,7 @@

from weave import version
from weave.trace import trace_sentry, urls
from weave.trace.client_progress_bar import flush_with_progress_bar
from weave.trace.concurrent.futures import FutureExecutor
from weave.trace.context import call_context
from weave.trace.context import weave_client_context as weave_client_context
@@ -1972,15 +1973,27 @@ def _ref_uri(self, name: str, version: str, path: str) -> str:

    def flush(self) -> None:
        """
        An optional flushing method for the client.
        Forces all background tasks to be processed, which ensures parallel processing
        during main thread execution. Can improve performance when user code completes
        before data has been uploaded to the server.
        Flushes all background tasks to ensure they are processed.

        This method blocks until all currently enqueued jobs are processed,
        displaying a progress bar to show the status of the pending tasks.
        It ensures parallel processing during main thread execution and can
        improve performance when user code completes before data has been
        uploaded to the server.
        """
        # Use the progress bar utility
        flush_with_progress_bar(
            main_executor=self.future_executor,
            fastlane_executor=self.future_executor_fastlane,
        )

        # Make sure all jobs are processed, including `call_processor` jobs which
        # are not handled by the progress bar utility
        self._flush()

    def _flush(self) -> None:
        # Used to wait until all currently enqueued jobs are processed
        # This is basic flush implementation without progress bar
        if not self.future_executor._in_thread_context.get():
            self.future_executor.flush()
        if self.future_executor_fastlane:
@@ -1999,6 +2012,26 @@ def _send_file_create(self, req: FileCreateReq) -> Future[FileCreateRes]:
            return self.future_executor_fastlane.defer(self.server.file_create, req)
        return self.future_executor.defer(self.server.file_create, req)

    @property
    def num_outstanding_jobs(self) -> int:
        """
        Returns the total number of pending jobs across all executors and the server.

        This property can be used to check the progress of background tasks
        without blocking the main thread.

        Returns:
            int: The total number of pending jobs
        """
        total = self.future_executor.num_outstanding_futures
        if self.future_executor_fastlane:
            total += self.future_executor_fastlane.num_outstanding_futures

        # Add call batch uploads if available
        if self._server_is_flushable:
            total += self.server.call_processor.num_outstanding_jobs  # type: ignore
        return total


def get_parallelism_settings() -> tuple[int | None, int | None]:
    total_parallelism = client_parallelism()
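Combined with the progress-bar flush, the new num_outstanding_jobs property allows lightweight monitoring from user code. A hypothetical sketch follows (placeholder project name; assumes a client obtained from weave.init):

import time
import weave

client = weave.init("my-project")  # placeholder project name

# ... user code that logs calls and uploads files ...

# Non-blocking check: report how much background work remains.
while client.num_outstanding_jobs > 0:
    print(f"{client.num_outstanding_jobs} background jobs still pending")
    time.sleep(0.5)

# Or block until everything is uploaded, with the rich progress bar from this PR.
client.flush()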
6 changes: 6 additions & 0 deletions weave/trace_server_bindings/async_batch_processor.py
@@ -50,6 +50,12 @@ def __init__(

        atexit.register(self.stop_accepting_new_work_and_flush_queue)

    @property
    def num_outstanding_jobs(self) -> int:
        """Returns the number of items currently in the queue."""
        with self.lock:
            return self.queue.qsize()

    def enqueue(self, items: list[T]) -> None:
        """
        Enqueues a list of items to be processed.