Skip to content

Commit

Permalink
Use getbuffer() to create views into vector and doc id blobs, instead of copying with read()
Browse files Browse the repository at this point in the history

Signed-off-by: Rohan Chitale <rchital@amazon.com>
  • Loading branch information
rchitale7 committed Feb 27, 2025
1 parent e5a8aae commit 4262ce7
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,19 @@ def parse(
VectorsDatasetError: If there are any errors during parsing or validation.
"""
try:
np_doc_ids = np.frombuffer(doc_ids.read(), dtype="<i4")
VectorsDataset.check_dimensions(np_doc_ids, doc_count)

# Create a view into the buffer, to prevent additional allocation of memory
vector_view = vectors.getbuffer()
np_vectors = np.frombuffer(
vectors.read(), dtype=VectorsDataset.get_numpy_dtype(vector_dtype)
vector_view, dtype=VectorsDataset.get_numpy_dtype(vector_dtype)
)
VectorsDataset.check_dimensions(np_vectors, doc_count * dimension)
np_vectors = np_vectors.reshape(doc_count, dimension)

# Do the same for doc ids
doc_id_view = doc_ids.getbuffer()
np_doc_ids = np.frombuffer(doc_id_view, dtype="<i4")
VectorsDataset.check_dimensions(np_doc_ids, doc_count)

except (ValueError, TypeError, MemoryError, RuntimeError) as e:
raise VectorsDatasetError(f"Error parsing vectors: {e}") from e
return VectorsDataset(np_vectors, np_doc_ids)
46 changes: 21 additions & 25 deletions remote_vector_index_builder/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@


def create_vectors_dataset(
index_build_params: IndexBuildParameters, object_store_config: Dict[str, Any]
index_build_params: IndexBuildParameters,
object_store_config: Dict[str, Any],
vector_bytes_buffer: BytesIO,
doc_id_bytes_buffer: BytesIO,
) -> VectorsDataset:
"""
Downloads vector and document ID data from object storage and creates a VectorsDataset.
This function performs the first step in the index building process by:
1. Creating an appropriate object store instance
2. Downloading vector data from the specified vector_path
3. Downloading document IDs from the specified doc_id_path
2. Downloading vector data from the specified vector_path, into the vector_bytes_buffer
3. Downloading document IDs from the specified doc_id_path, into the doc_id_bytes_buffer
4. Combining them into a VectorsDataset object
Args:
Expand All @@ -49,6 +52,8 @@ def create_vectors_dataset(
- repository_type: Type of object store to use
object_store_config (Dict[str, Any]): Configuration for the object store
containing connection details
vector_bytes_buffer: Buffer for storing vector binary data
doc_id_bytes_buffer: Buffer for storing doc id binary data
Returns:
VectorsDataset: A dataset object containing:
Expand All @@ -57,6 +62,9 @@ def create_vectors_dataset(
Note:
- Uses BytesIO buffers for memory-efficient data handling
- The caller is responsible for closing each buffer
        - Before closing the buffers, caller must call free_vectors_space on the VectorsDataset object,
        to remove all references to the underlying data.
- Both vector and document ID files must exist in object storage
- The number of vectors must match the number of document IDs
- Memory usage scales with the size of the vector and document ID data
Expand All @@ -72,28 +80,16 @@ def create_vectors_dataset(
index_build_params, object_store_config
)

vector_bytes_buffer = None
doc_id_bytes_buffer = None

try:
vector_bytes_buffer = BytesIO()
doc_id_bytes_buffer = BytesIO()

object_store.read_blob(index_build_params.vector_path, vector_bytes_buffer)
object_store.read_blob(index_build_params.doc_id_path, doc_id_bytes_buffer)

return VectorsDataset.parse(
vector_bytes_buffer,
doc_id_bytes_buffer,
index_build_params.dimension,
index_build_params.doc_count,
index_build_params.data_type,
)
finally:
if vector_bytes_buffer:
vector_bytes_buffer.close()
if doc_id_bytes_buffer:
doc_id_bytes_buffer.close()
object_store.read_blob(index_build_params.vector_path, vector_bytes_buffer)
object_store.read_blob(index_build_params.doc_id_path, doc_id_bytes_buffer)

return VectorsDataset.parse(
vector_bytes_buffer,
doc_id_bytes_buffer,
index_build_params.dimension,
index_build_params.doc_count,
index_build_params.data_type,
)


def upload_index(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def test_parse_valid_data(vector_dtype):
assert np.array_equal(dataset.doc_ids, test_doc_ids)
assert np.array_equal(dataset.vectors, test_vectors)

dataset.free_vectors_space()
vectors_binary.close()
doc_ids_binary.close()

Expand All @@ -119,13 +120,14 @@ def test_parse_invalid_doc_count():
with pytest.raises(VectorsDatasetError):
vectors = BytesIO(np.zeros(6, dtype="<f4").tobytes())
doc_ids = BytesIO(np.array([1], dtype="<i4").tobytes())
VectorsDataset.parse(
dataset = VectorsDataset.parse(
vectors=vectors,
doc_ids=doc_ids,
dimension=2,
doc_count=2,
vector_dtype=DataType.FLOAT32,
)
dataset.free_vectors_space()
vectors.close()
doc_ids.close()

Expand All @@ -134,13 +136,14 @@ def test_parse_invalid_vector_dimensions():
with pytest.raises(VectorsDatasetError):
vectors = BytesIO(np.zeros(4, dtype="<f4").tobytes())
doc_ids = BytesIO(np.array([1, 2], dtype="<i4").tobytes())
VectorsDataset.parse(
dataset = VectorsDataset.parse(
vectors=vectors,
doc_ids=doc_ids,
dimension=3, # Expecting 6 values (2*3), but only provided 4
doc_count=2,
vector_dtype=DataType.FLOAT32,
)
dataset.free_vectors_space()
vectors.close()
doc_ids.close()

Expand All @@ -152,12 +155,13 @@ def test_parse_invalid_data():
with pytest.raises(VectorsDatasetError):
vectors = BytesIO(np.zeros(6, dtype="<f4").tobytes())
doc_ids = BytesIO(np.array([1, 2], dtype="<i4").tobytes())
VectorsDataset.parse(
dataset = VectorsDataset.parse(
vectors=vectors,
doc_ids=doc_ids,
dimension=3,
doc_count=2,
vector_dtype=DataType.FLOAT32,
)
dataset.free_vectors_space()
vectors.close()
doc_ids.close()
30 changes: 19 additions & 11 deletions test_remote_vector_index_builder/test_core/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ def test_successful_creation(
mock_object_store_factory.return_value = mock_object_store
mock_vectors_dataset_parse.return_value = Mock(spec=VectorsDataset)

vectors = BytesIO()
doc_ids = BytesIO()
# Execute
result = create_vectors_dataset(index_build_params, object_store_config)
result = create_vectors_dataset(
index_build_params, object_store_config, vectors, doc_ids
)

vectors.close()
doc_ids.close()

# Verify
mock_object_store_factory.assert_called_once_with(
Expand All @@ -77,10 +84,8 @@ def test_successful_creation(
assert mock_object_store.read_blob.call_count == 2
mock_vectors_dataset_parse.assert_called_once()
assert isinstance(result, VectorsDataset)
calls = mock_object_store.read_blob.call_args_list
for call_args in calls:
assert isinstance(call_args[0][1], BytesIO)
assert call_args[0][1].closed # Verify buffers are closed

result.free_vectors_space()


def test_download_blob_error_handling(
Expand All @@ -93,14 +98,17 @@ def test_download_blob_error_handling(
mock_object_store_factory.return_value = mock_object_store
mock_object_store.read_blob.side_effect = BlobError("Failed to read blob")

vectors = BytesIO()
doc_ids = BytesIO()

# Execute and verify
with pytest.raises(BlobError):
create_vectors_dataset(index_build_params, object_store_config)
# Verify that read_blob was called with BytesIO objects
calls = mock_object_store.read_blob.call_args_list
for call_args in calls:
assert isinstance(call_args[0][1], BytesIO)
assert call_args[0][1].closed # Verify buffers are closed
create_vectors_dataset(
index_build_params, object_store_config, vectors, doc_ids
)

vectors.close()
doc_ids.close()


def test_successful_upload(
Expand Down

0 comments on commit 4262ce7

Please sign in to comment.