Skip to content

Commit

Permalink
added async support to colbert and langchain.colbert libraries (#426)
Browse files Browse the repository at this point in the history
* added async support to colbert and langchain.colbert libraries

* fmt

* raise exception on failed add

* limit open requests to cassandra

* updated async put and delete methods

* more async stuff

* updated langchain too

* updated timeout
  • Loading branch information
epinzur authored May 21, 2024
1 parent 9726c84 commit 62329e8
Show file tree
Hide file tree
Showing 25 changed files with 934 additions and 252 deletions.
4 changes: 4 additions & 0 deletions libs/colbert/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ pydantic = "^2.7.1"
[tool.poetry.group.test.dependencies]
ragstack-ai-tests-utils = { path = "../tests-utils", develop = true }


[tool.poetry.group.dev.dependencies]
pytest-asyncio = "^0.23.6"

28 changes: 27 additions & 1 deletion libs/colbert/ragstack_colbert/base_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,33 @@ def delete_chunks(self, doc_ids: List[str]) -> bool:
doc_ids (List[str]): A list of document identifiers specifying the chunks to be deleted.
Returns:
True if the delete was successful.
True if all the deletes were successful.
"""

@abstractmethod
async def aadd_chunks(self, chunks: List[Chunk], concurrent_inserts: Optional[int] = 100) -> List[Tuple[str, int]]:
    """
    Asynchronously stores a list of embedded text chunks in the vector store.

    Parameters:
        chunks (List[Chunk]): A list of `Chunk` instances to be stored.
        concurrent_inserts (Optional[int]): How many concurrent inserts to make to the database. Defaults to 100.

    Returns:
        A list of tuples: (doc_id, chunk_id)
    """

@abstractmethod
async def adelete_chunks(self, doc_ids: List[str], concurrent_deletes: Optional[int] = 100) -> bool:
    """
    Asynchronously deletes chunks from the vector store based on their document id.

    Parameters:
        doc_ids (List[str]): A list of document identifiers specifying the chunks to be deleted.
        concurrent_deletes (Optional[int]): How many concurrent deletes to make to the database. Defaults to 100.

    Returns:
        True if all the deletes were successful.
    """

@abstractmethod
Expand Down
55 changes: 54 additions & 1 deletion libs/colbert/ragstack_colbert/base_vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,60 @@ def delete_chunks(self, doc_ids: List[str]) -> bool:
doc_ids (List[str]): A list of document identifiers specifying the chunks to be deleted.
Returns:
True if the delete was successful.
True if all the deletes were successful.
"""

# handles LlamaIndex add

@abstractmethod
async def aadd_chunks(self, chunks: List[Chunk], concurrent_inserts: Optional[int] = 100) -> List[Tuple[str, int]]:
    """
    Asynchronously stores a list of embedded text chunks in the vector store.

    Parameters:
        chunks (List[Chunk]): A list of `Chunk` instances to be stored.
        concurrent_inserts (Optional[int]): How many concurrent inserts to make to the database. Defaults to 100.

    Returns:
        A list of tuples: (doc_id, chunk_id)
    """

# handles LangChain add
@abstractmethod
async def aadd_texts(
    self,
    texts: List[str],
    metadatas: Optional[List[Metadata]],
    doc_id: Optional[str] = None,
    concurrent_inserts: Optional[int] = 100,
) -> List[Tuple[str, int]]:
    """
    Asynchronously embeds and stores a list of text chunks and optional metadata into the vector store.

    Parameters:
        texts (List[str]): The list of text chunks to be embedded.
        metadatas (Optional[List[Metadata]]): An optional list of Metadata to be stored.
            If provided, these are set 1 to 1 with the texts list.
        doc_id (Optional[str]): The document id associated with the texts. If not provided,
            it is generated.
        concurrent_inserts (Optional[int]): How many concurrent inserts to make to the database. Defaults to 100.

    Returns:
        A list of tuples: (doc_id, chunk_id)
    """

# handles LangChain and LlamaIndex delete
@abstractmethod
async def adelete_chunks(self, doc_ids: List[str], concurrent_deletes: Optional[int] = 100) -> bool:
    """
    Asynchronously deletes chunks from the vector store based on their document id.

    Parameters:
        doc_ids (List[str]): A list of document identifiers specifying the chunks to be deleted.
        concurrent_deletes (Optional[int]): How many concurrent deletes to make to the database. Defaults to 100.

    Returns:
        True if all the deletes were successful.
    """

# handles LangChain as_retriever
Expand Down
Loading

0 comments on commit 62329e8

Please sign in to comment.