Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
gtarpenning committed Mar 4, 2025
1 parent 2244543 commit 5f624ee
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
4 changes: 2 additions & 2 deletions tests/trace/test_evaluation_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def test_evaluation_resilience(
with pytest.raises(DummyTestException):
res = await evaluation.evaluate(predict)

client_with_throwing_server.flush()
client_with_throwing_server.finish()

logs = log_collector.get_error_logs()
ag_res = Counter([k.split(", req:")[0] for k in {l.msg for l in logs}])
Expand All @@ -172,7 +172,7 @@ async def test_evaluation_resilience(
res = await evaluation.evaluate(predict)
assert res["score"]["true_count"] == 1

client_with_throwing_server.flush()
client_with_throwing_server.finish()

logs = log_collector.get_error_logs()
ag_res = Counter([k.split(", req:")[0] for k in {l.msg for l in logs}])
Expand Down
23 changes: 18 additions & 5 deletions weave/trace/weave_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1994,10 +1994,11 @@ def num_outstanding_jobs(self) -> int:

# Add call batch uploads if available
if self._server_is_flushable:
total += self.server.call_processor.num_outstanding_jobs # type: ignore
server = cast(RemoteHTTPTraceServer, self.server)
total += server.call_processor.num_outstanding_jobs
return total

def flush(
def finish(
self,
use_progress_bar: bool = True,
callback: Callable[[FlushStatus], None] | None = None,
Expand Down Expand Up @@ -2028,6 +2029,11 @@ def flush(
else:
self._flush()

@deprecated(new_name="finish")
def flush(self) -> None:
"""Renamed 'finish' for clarity."""
self.finish()

def _flush_with_callback(
self,
callback: Callable[[FlushStatus], None],
Expand Down Expand Up @@ -2111,13 +2117,19 @@ def _flush_with_callback(
callback(final_status)

def _flush(self) -> None:
"""Used to wait until all currently enqueued jobs are processed."""
# Used to wait until all currently enqueued jobs are processed
if not self.future_executor._in_thread_context.get():
self.future_executor.flush()
if self.future_executor_fastlane:
self.future_executor_fastlane.flush()
if self._server_is_flushable:
self.server.call_processor.stop_accepting_new_work_and_flush_queue() # type: ignore
# We don't want to do an instance check here because it could
# be susceptible to shutdown race conditions. So we save a boolean
# _server_is_flushable and only call this if we know the server is
# flushable. The # type: ignore is safe because we check the type
# first.
server = cast(RemoteHTTPTraceServer, self.server)
server.call_processor.stop_accepting_new_work_and_flush_queue()

def _get_pending_jobs(self) -> PendingJobCounts:
"""Get the current number of pending jobs for each type.
Expand All @@ -2135,7 +2147,8 @@ def _get_pending_jobs(self) -> PendingJobCounts:
fastlane_jobs = self.future_executor_fastlane.num_outstanding_futures
call_processor_jobs = 0
if self._server_is_flushable:
call_processor_jobs = self.server.call_processor.num_outstanding_jobs # type: ignore
server = cast(RemoteHTTPTraceServer, self.server)
call_processor_jobs = server.call_processor.num_outstanding_jobs

return PendingJobCounts(
main_jobs=main_jobs,
Expand Down

0 comments on commit 5f624ee

Please sign in to comment.