feat(py/veneer): implemented generate_stream veneer (#2230)
pavelgj authored Mar 4, 2025
1 parent e0aadca commit 54d439e
Showing 6 changed files with 641 additions and 61 deletions.
8 changes: 7 additions & 1 deletion py/packages/genkit/src/genkit/ai/generate.py
@@ -168,7 +168,12 @@ def message_parser(msg: MessageWrapper):
     # if the loop will continue, stream out the tool response message...
     if on_chunk:
         on_chunk(
-            make_chunk('tool', GenerateResponseChunk(content=tool_msg.content))
+            make_chunk(
+                'tool',
+                GenerateResponseChunk(
+                    role=tool_msg.role, content=tool_msg.content
+                ),
+            )
         )
 
     next_request = copy.copy(raw_request)
@@ -185,6 +190,7 @@ def message_parser(msg: MessageWrapper):
         # middleware: middleware,
         current_turn=current_turn + 1,
         message_index=message_index + 1,
+        on_chunk=on_chunk,
     )
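
For context, here is a hypothetical consumer-side `on_chunk` callback; after this change it also receives tool-response chunks alongside model output chunks. The `role` and `text` accessors on the chunk are assumptions for illustration, not confirmed by this diff.

# Hypothetical streaming callback; with this change it is also invoked for
# tool-response messages (role 'tool'), not only model output chunks.
def print_chunk(chunk) -> None:
    role = getattr(chunk, 'role', 'model')  # `role` is an assumed attribute
    print(f'[{role}] {chunk.text}')  # `.text` is an assumed accessor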


107 changes: 107 additions & 0 deletions py/packages/genkit/src/genkit/core/aio.py
@@ -0,0 +1,107 @@
# Copyright 2025 Google LLC
# SPDX-License-Identifier: Apache-2.0

"""Asyncio helpers."""

from asyncio import FIRST_COMPLETED, Future, Queue, ensure_future, wait
from typing import Any, AsyncIterator


class Channel[T]:
    """An asynchronous channel for sending and receiving values.

    This class provides an asynchronous queue-like interface, allowing
    values to be sent and received between different parts of an
    asynchronous program. It also supports closing the channel,
    which signals to any receivers that no more values will be sent.
    """

    def __init__(self) -> None:
        """Initializes a new Channel.

        The channel is initialized with an internal queue to store values,
        a future that resolves when the channel is closed, and an optional
        close future, which remains `None` until `set_close_future` is
        called.
        """
        self.queue = Queue()
        self.closed = Future()
        self.__close_future: Future | None = None

    def __aiter__(self) -> AsyncIterator[T]:
        """Returns the asynchronous iterator for the channel."""
        return self

    async def __anext__(self) -> T:
        """Retrieves the next value from the channel.

        If the queue is not empty, the value is returned immediately.
        Otherwise, it waits until a value is available or the channel
        is closed.

        Raises:
            StopAsyncIteration: If the channel is closed and no more
                values are available.

        Returns:
            T: The next value from the channel.
        """
        if not self.queue.empty():
            return self.queue.get_nowait()
        pop = ensure_future(self.__pop())
        if not self.__close_future:
            return await pop
        finished, _ = await wait(
            [pop, self.__close_future], return_when=FIRST_COMPLETED
        )
        if pop in finished:
            return pop.result()
        if self.__close_future in finished:
            raise StopAsyncIteration()
        return await pop

    def send(self, value: T):
        """Sends a value into the channel.

        The value is added to the internal queue.

        Args:
            value: The value to send.
        """
        return self.queue.put_nowait(value)

    def set_close_future(self, future: Future):
        """Sets a future that, when completed, will close the channel.

        Args:
            future (Future): The future to watch; its result is propagated
                to `closed`.
        """
        self.__close_future = ensure_future(future)
        self.__close_future.add_done_callback(
            lambda v: self.closed.set_result(v.result())
        )

    async def __pop(self) -> T:
        """Asynchronously retrieves a value from the queue.

        This method waits until a value is available in the queue.

        Raises:
            StopAsyncIteration: If a falsy value (the close sentinel) is
                retrieved, indicating the channel is closed.

        Returns:
            T: The retrieved value.
        """
        r = await self.queue.get()
        self.queue.task_done()
        if not r:
            raise StopAsyncIteration

        return r
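
The file above adds `Channel`, the primitive `generate_stream` is built on. Below is a minimal producer/consumer sketch of how it composes with `set_close_future`; it is illustrative only, not part of the commit, and assumes Python 3.12+ for the `class Channel[T]` syntax.

import asyncio

from genkit.core.aio import Channel


async def main() -> None:
    channel = Channel()
    done = asyncio.get_running_loop().create_future()

    async def produce() -> None:
        for value in ('a', 'b', 'c'):
            channel.send(value)
            await asyncio.sleep(0)  # let the consumer run
        done.set_result('final')  # resolving this closes the channel

    channel.set_close_future(done)
    producer = asyncio.ensure_future(produce())

    # Iteration drains queued values and stops once `done` resolves.
    async for value in channel:
        print('received', value)

    await producer
    # `closed` resolves with the close future's result.
    print('closed with', await channel.closed)


asyncio.run(main())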
93 changes: 91 additions & 2 deletions py/packages/genkit/src/genkit/veneer/veneer.py
@@ -6,15 +6,20 @@
 import logging
 import os
 import threading
+from asyncio import Future
 from http.server import HTTPServer
-from typing import Any
+from typing import Any, AsyncIterator
 
 from genkit.ai.embedding import EmbedRequest, EmbedResponse
 from genkit.ai.formats import built_in_formats
+from genkit.ai.generate import StreamingCallback as ModelStreamingCallback
 from genkit.ai.generate import generate_action
-from genkit.ai.model import GenerateResponseWrapper
+from genkit.ai.model import (
+    GenerateResponseChunkWrapper,
+    GenerateResponseWrapper,
+)
 from genkit.core.action import ActionKind
+from genkit.core.aio import Channel
 from genkit.core.environment import is_dev_environment
 from genkit.core.reflection import make_reflection_server
 from genkit.core.schema import to_json_schema
@@ -225,6 +230,90 @@ async def generate(
             on_chunk=on_chunk,
         )
 
+    def generate_stream(
+        self,
+        model: str | None = None,
+        prompt: str | Part | list[Part] | None = None,
+        system: str | Part | list[Part] | None = None,
+        messages: list[Message] | None = None,
+        tools: list[str] | None = None,
+        return_tool_requests: bool | None = None,
+        tool_choice: ToolChoice | None = None,
+        config: GenerationCommonConfig | dict[str, Any] | None = None,
+        max_turns: int | None = None,
+        context: dict[str, Any] | None = None,
+        output_format: str | None = None,
+        output_content_type: str | None = None,
+        output_instructions: bool | str | None = None,
+        output_schema: type | dict[str, Any] | None = None,
+        output_constrained: bool | None = None,
+    ) -> tuple[
+        AsyncIterator[GenerateResponseChunkWrapper],
+        Future[GenerateResponseWrapper],
+    ]:
+        """Generates text or structured data using a language model and streams the response.
+
+        This method provides a flexible interface for interacting with various
+        language models, supporting both simple text generation and more complex
+        interactions involving tools and structured conversations.
+
+        Args:
+            model: Optional. The name of the model to use for generation. If not
+                provided, a default model may be used.
+            prompt: Optional. A single prompt string, a `Part` object, or a list
+                of `Part` objects to provide as input to the model. This is used
+                for simple text generation.
+            system: Optional. A system message string, a `Part` object, or a list
+                of `Part` objects to provide context or instructions to the model,
+                especially for chat-based models.
+            messages: Optional. A list of `Message` objects representing a
+                conversation history. This is used for chat-based models to
+                maintain context.
+            tools: Optional. A list of tool names (strings) that the model can use.
+            return_tool_requests: Optional. If `True`, the model returns tool
+                requests instead of executing them directly.
+            tool_choice: Optional. A `ToolChoice` value specifying how the model
+                should choose which tool to use.
+            config: Optional. A `GenerationCommonConfig` object or a dictionary
+                containing configuration parameters for the generation process.
+                This allows fine-tuning the model's behavior.
+            max_turns: Optional. The maximum number of turns in a conversation.
+            context: Optional. A dictionary containing additional context
+                information that can be used during generation.
+
+        Returns:
+            A tuple of an `AsyncIterator` yielding `GenerateResponseChunkWrapper`
+            objects as they become available, and a `Future` that resolves to the
+            final `GenerateResponseWrapper` once generation completes.
+
+        Note:
+            - The `tools`, `return_tool_requests`, and `tool_choice` arguments are
+              used for models that support tool usage.
+            - Consume the returned iterator to process generated content as it
+              becomes available; await the returned future for the complete
+              response.
+        """
+        stream = Channel()
+
+        resp = self.generate(
+            model=model,
+            prompt=prompt,
+            system=system,
+            messages=messages,
+            tools=tools,
+            return_tool_requests=return_tool_requests,
+            tool_choice=tool_choice,
+            config=config,
+            max_turns=max_turns,
+            context=context,
+            output_format=output_format,
+            output_content_type=output_content_type,
+            output_instructions=output_instructions,
+            output_schema=output_schema,
+            output_constrained=output_constrained,
+            on_chunk=lambda c: stream.send(c),
+        )
+        stream.set_close_future(resp)
+
+        return (stream, stream.closed)
+
     async def embed(
         self, model: str | None = None, documents: list[str] | None = None
     ) -> EmbedResponse:
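
Finally, a minimal sketch of how application code might consume the new veneer. The `Genkit` constructor arguments, model identifier, and `.text` accessors are assumptions for illustration, not confirmed by this diff.

import asyncio

from genkit.veneer.veneer import Genkit  # class name assumed from this module

ai = Genkit()  # plugin and model registration omitted


async def main() -> None:
    stream, response = ai.generate_stream(
        model='googleai/gemini-2.0-flash',  # hypothetical model identifier
        prompt='Tell me a short story about a robot.',
    )

    # Chunks arrive as the model produces them...
    async for chunk in stream:
        print(chunk.text, end='', flush=True)  # `.text` is assumed

    # ...and the future resolves to the final aggregated response.
    final = await response
    print()
    print(final.text)  # `.text` is assumed


asyncio.run(main())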