
split queries for Astra #141

Merged
merged 6 commits into from
Feb 12, 2025

Conversation

bjchambers
Collaborator

No description provided.

exception: bool = False


async def amerge(
Collaborator Author

Utility so that I can merge multiple async iterators into a single async iterator. I couldn't find this in readily available libraries, so I added it to the utilities.
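The merge described above can be sketched with an `asyncio.Queue` and a per-stream sentinel marking completion. This is a minimal sketch, not the PR's actual implementation; the names `amerge`, `pump`, and `_DONE` are assumptions based on the snippets quoted later in this thread:

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any, TypeVar

T = TypeVar("T")

_DONE = object()  # sentinel marking the end of one source iterator


async def amerge(*async_iterables: AsyncIterator[T]) -> AsyncIterator[T]:
    """Yield items from several async iterators as each item arrives."""
    queue: asyncio.Queue[Any] = asyncio.Queue()

    async def pump(aiter: AsyncIterator[T]) -> None:
        # Forward every item, then signal that this stream is done.
        async for item in aiter:
            await queue.put(item)
        await queue.put(_DONE)

    tasks = [asyncio.create_task(pump(aiter)) for aiter in async_iterables]
    try:
        pending = len(tasks)
        while pending:
            item = await queue.get()
            if item is _DONE:
                pending -= 1  # one source exhausted
            else:
                yield item
    finally:
        # If the consumer stops early, don't leak the pump tasks.
        for task in tasks:
            task.cancel()
```

Because each source signals completion through the queue itself, no locks or shared counters are needed beyond the `pending` count.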


if len(parts) == 1:
return self.encode_filter(parts[0])
def _queries(
Collaborator Author

This function drives the logic:

At a high level, we want ANY(metadata OR ids) AND user_filters. Since that may be too big, we end up emitting a series of some_ids_or_metadata AND user_filters queries.

hemidactylus

Wouldn't this be better encapsulated as a method of the retriever (thus using self.vectorstore.codec) rather than an external utility function exposing the codec as an argument?
While I see the latter choice helping somewhat with testing, it might expose things unnecessarily. No big deal either way, and its surface can be reduced later without trouble (being a private utility and all), especially once the vector store starts offering all the query-encoding methods we just discussed.

Collaborator Author

I did this for testing -- I want to be able to test it in a unit test (by passing the codec directly) without needing to create an Astra connection.

self,
query: dict[str, Any] | None = None,
filters: Iterator[dict[str, Any]],
Collaborator Author

The execution methods changed to take an iterator of filters, run all of them, and combine the results. They only decode hits whose IDs are new.

return [content for hit in hits if (content := self._decode_hit(hit))]
results: dict[str, Content] = {}
for filter in filters:
# TODO: Look at a thread-pool for this.
Collaborator Author

Could parallelize the queries in the synchronous case as well.
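A thread-pool version of the synchronous path could look something like this. It's a sketch only: `run_query`, the hit shape (dicts keyed by `_id`), and the worker count are all assumptions, not the PR's code:

```python
from collections.abc import Iterable
from concurrent.futures import ThreadPoolExecutor
from typing import Any


def run_queries_parallel(
    run_query,  # hypothetical: filter dict -> list of hit dicts
    filters: Iterable[dict[str, Any]],
    max_workers: int = 8,
) -> list[dict[str, Any]]:
    """Run one blocking query per filter concurrently; merge hits by id."""
    results: dict[str, dict[str, Any]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves filter order; dedup keeps the first hit per id,
        # mirroring the "only decode new IDs" behavior described above.
        for hits in pool.map(run_query, filters):
            for hit in hits:
                results.setdefault(hit["_id"], hit)
    return list(results.values())
```

Since the per-query work is network-bound, threads (rather than processes) are enough to overlap the round trips.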


github-actions bot commented Feb 10, 2025

Test Results

    8 files ±0    8 suites ±0    2m 33s ⏱️ +4s
  406 tests +6    406 ✅ +6    0 💤 ±0    0 ❌ ±0
1,624 runs +24    900 ✅ +24    724 💤 ±0    0 ❌ ±0

Results for commit 675fdf2. ± Comparison against base commit b26aed6.

♻️ This comment has been updated with latest results.

tasks = [asyncio.create_task(pump(aiter)) for aiter in async_iterables]

try:
pending_count = len(async_iterables)

hemidactylus

this seems superfluous in view of line 40 (no?)

Collaborator Author

Hah, yeah. I had GPT generate this and the tests, but GPT didn't pass its own tests, so I had to fix it. I initially tried locks, but that got really messy, so I switched to having each stream output _Done to indicate its completion. At that point I no longer needed the locks, but I missed this because of the nonlocal. As you've caught, a lot of this goes away.

if len(parts) == 1:
yield with_user_filters(codec.encode_filter(parts[0]))
elif len(parts) > 0:
yield with_user_filters(codec.encode_filter({"$or": parts}))
@hemidactylus hemidactylus Feb 11, 2025

Wouldn't this need to become one yield per part? It seems that right now it still prepares a single query with the $or trick to bypass the guardrails.

I think the whole if len(parts) == 1 ... branch can become something like:

    for part in parts:
        yield with_user_filters(part)

Collaborator Author

Yes, I was trying to split it into steps. Although this is no longer doing the $or on _id (those are now separate yields). This relates to the question of whether we should split at "100 items in a list" or at "100 conditions accumulated", etc. I'll likely revisit that, possibly in a follow-up.
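The difference between those two splitting rules can be illustrated with a small condition-budget accumulator. Everything here is hypothetical (the budget of 100 and the cost model are assumptions about the guardrail, not the PR's code): instead of counting parts, it counts the conditions *inside* each part, so a part with a 60-item $in consumes 60 of the budget.

```python
from collections.abc import Iterator
from typing import Any

MAX_CONDITIONS = 100  # assumed guardrail on conditions per query


def accumulate_parts(
    parts: list[dict[str, Any]],
) -> Iterator[list[dict[str, Any]]]:
    """Group filter parts so each group stays under the condition budget."""

    def cost(part: dict[str, Any]) -> int:
        # Hypothetical cost model: a $in list costs one per element,
        # any other condition costs one.
        for value in part.values():
            if isinstance(value, dict) and "$in" in value:
                return len(value["$in"])
        return 1

    group: list[dict[str, Any]] = []
    budget = 0
    for part in parts:
        c = cost(part)
        if group and budget + c > MAX_CONDITIONS:
            yield group  # flush before exceeding the budget
            group, budget = [], 0
        group.append(part)
        budget += c
    if group:
        yield group
```

Each yielded group could then be combined with $or and ANDed with the user filters, as in the branch discussed above.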

@hemidactylus hemidactylus left a comment

Looks great! I left a few comments, most are just nitpicks/suggestions but I'd like to draw your attention to a couple of those (esp. the one-big-yield in _queries).

I can see some of this (in the Astra adapter) changing further as soon as the underlying store implements its own "run query" method. In particular, this whole layer should be able to forget about codecs entirely (more to come on this...)

@bjchambers bjchambers merged commit e0b5f4c into main Feb 12, 2025
9 checks passed
@bjchambers bjchambers deleted the astra-queries branch February 12, 2025 17:48
@coveralls

coveralls commented Feb 13, 2025

Pull Request Test Coverage Report for Build 13250320363

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 99 of 101 (98.02%) changed or added relevant lines in 2 files are covered.
  • 3 unchanged lines in 1 file lost coverage.
  • Overall coverage decreased (-0.05%) to 92.822%

Changes Missing Coverage:
  packages/langchain-graph-retriever/src/langchain_graph_retriever/adapters/astra.py — 68 of 70 changed/added lines covered (97.14%)
Files with Coverage Reduction:
  packages/langchain-graph-retriever/src/langchain_graph_retriever/adapters/astra.py — 3 new missed lines (93.55%)
Totals Coverage Status
Change from base Build 13207928567: -0.05%
Covered Lines: 1515
Relevant Lines: 1604

💛 - Coveralls
