split queries for Astra #141
Conversation
```python
exception: bool = False
...
async def amerge(
```
Utility so that I can merge multiple async iterators into a single async iterator. I couldn't find this in readily available libraries, so I threw it into the utilities.
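Since the diff only shows fragments, here is a minimal sketch of how such a merge utility can work, assuming the queue-plus-sentinel shape the surrounding fragments suggest (`pump`, `pending_count`, and the `_Done` sentinel appear in the diff; everything else is illustrative, and the diff's `exception` flag is omitted):

```python
import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")

_Done = object()  # sentinel marking the completion of one source stream


async def amerge(*async_iterables: AsyncIterator[T]) -> AsyncIterator[T]:
    """Merge several async iterators, yielding items as they become available."""
    queue: asyncio.Queue = asyncio.Queue()

    async def pump(aiter: AsyncIterator[T]) -> None:
        # Copy one source stream into the shared queue, then signal completion.
        async for item in aiter:
            await queue.put(item)
        await queue.put(_Done)

    tasks = [asyncio.create_task(pump(aiter)) for aiter in async_iterables]
    try:
        pending_count = len(async_iterables)
        while pending_count > 0:
            item = await queue.get()
            if item is _Done:
                pending_count -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()
```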
packages/langchain-graph-retriever/src/langchain_graph_retriever/adapters/astra.py
```python
if len(parts) == 1:
    return self.encode_filter(parts[0])

def _queries(
```
This function drives the logic: at a high level, we want `ANY(metadata OR ids) AND user_filters`. Since that may be too big, we end up spitting out a bunch of `some_ids_or_metadata AND user_filters` queries.
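A hedged sketch of that splitting, mirroring the shape under review in this PR (the batch limit, the `$in` on `_id`, and the `$and` combination in `with_user_filters` are assumptions; `codec.encode_filter` and the `$or` batching come from the diff):

```python
from collections.abc import Iterator
from typing import Any

MAX_CONDITIONS = 100  # assumed guardrail on conditions per query


def _queries(
    codec: Any,
    user_filters: dict[str, Any] | None,
    ids: list[str],
    metadata: list[dict[str, Any]],
) -> Iterator[dict[str, Any]]:
    """Yield queries whose union is ANY(metadata OR ids) AND user_filters."""

    def with_user_filters(filter: dict[str, Any]) -> dict[str, Any]:
        # Conjoin the user's filter with each generated condition.
        return {"$and": [filter, user_filters]} if user_filters else filter

    # IDs are yielded in separate batches from the metadata conditions.
    for start in range(0, len(ids), MAX_CONDITIONS):
        batch = ids[start : start + MAX_CONDITIONS]
        yield with_user_filters(codec.encode_filter({"_id": {"$in": batch}}))

    # Metadata conditions are OR-ed together within each batch.
    for start in range(0, len(metadata), MAX_CONDITIONS):
        parts = metadata[start : start + MAX_CONDITIONS]
        if len(parts) == 1:
            yield with_user_filters(codec.encode_filter(parts[0]))
        else:
            yield with_user_filters(codec.encode_filter({"$or": parts}))
```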
Wouldn't it be better encapsulated if this were a method of the retriever (thus using `self.vectorstore.codec`) rather than an external utility function exposing the codec as an argument?
While I see the latter choice helping somewhat with testing, it might expose things unnecessarily. No big deal anyway, and its surface can be reduced later no problem (being a private utility and all) -- especially once the vector store starts offering all the query-encoding methods we just discussed.
I did this for testing -- I want to be able to test it in a unit test (by passing codec directly) without needing to create an Astra connection.
```python
self,
query: dict[str, Any] | None = None,
filters: Iterator[dict[str, Any]],
```
Execution methods changed to take an iterator of filters, run all of them, and combine the results. They only decode hits with previously unseen IDs.
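A hedged sketch of that combining loop, consistent with the fragments below (`_run_query` is a hypothetical query helper, and `Content` stands in for the package's content type):

```python
from collections.abc import Iterator
from typing import Any


def _execute(self, filters: Iterator[dict[str, Any]]) -> list[Content]:
    # Accumulate results keyed by ID so a document matching several of the
    # split queries is decoded at most once.
    results: dict[str, Content] = {}
    for filter in filters:
        for hit in self._run_query(filter=filter):  # hypothetical helper
            id = hit["_id"]
            if id not in results and (content := self._decode_hit(hit)):
                results[id] = content
    return list(results.values())
```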
```python
return [content for hit in hits if (content := self._decode_hit(hit))]
...
results: dict[str, Content] = {}
for filter in filters:
    # TODO: Look at a thread-pool for this.
```
Could parallelize the queries in the synchronous case as well.
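For reference, a sketch of what that parallelization might look like with a thread pool, reusing the hypothetical names from the sketch above (the pool size is an assumption):

```python
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def _execute_parallel(self, filters: Iterator[dict[str, Any]]) -> list[Content]:
    results: dict[str, Content] = {}
    with ThreadPoolExecutor(max_workers=4) as pool:  # assumed pool size
        # Issue every split query concurrently, then combine hits by ID.
        for hits in pool.map(lambda f: list(self._run_query(filter=f)), filters):
            for hit in hits:
                id = hit["_id"]
                if id not in results and (content := self._decode_hit(hit)):
                    results[id] = content
    return list(results.values())
```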
```python
tasks = [asyncio.create_task(pump(aiter)) for aiter in async_iterables]

try:
    pending_count = len(async_iterables)
```
this seems superfluous in view of line 40 (no?)
Hah. Yeah. So, I had GPT generate this and the tests, but GPT didn't pass its own tests, so I had to fix it. I initially tried with locks, but that got really messy, so I switched to having it output `_Done` to indicate the completion of each stream. But then I didn't need the locks and missed it because of the nonlocal. As you've caught, a lot of this goes away.
packages/langchain-graph-retriever/src/langchain_graph_retriever/adapters/astra.py
```python
if len(parts) == 1:
    yield with_user_filters(codec.encode_filter(parts[0]))
elif len(parts) > 0:
    yield with_user_filters(codec.encode_filter({"$or": parts}))
```
Wouldn't this need to be made into a "one yield per part"? It seems that right now it is still preparing a single query with the OR trick to bypass guardrails. I think the whole `if len(parts) == 1 ...` branch can become something like:

```python
for part in parts:
    yield with_user_filters(part)
```
Yes. I was trying to split it into steps. Although, this is not doing the `$or` on `_id` (that is now separate yields). This relates to the "is it 100 in a list that we should split" vs. "100 conditions accumulated" question, etc. I'll likely revisit that, but possibly in a follow-up.
packages/langchain-graph-retriever/src/langchain_graph_retriever/adapters/astra.py
Looks great! I left a few comments; most are just nitpicks/suggestions, but I'd like to draw your attention to a couple of them (esp. the one-big-yield in `_queries`).

I see some of this (in the Astra Adapter) changing further as soon as the underlying store implements its own "run query" method. In particular, this whole layer should be able to forget about codecs entirely (more to come on this...)
Pull Request Test Coverage Report for Build 13250320363

Warning: This coverage report may be inaccurate. This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

💛 - Coveralls