@@ -33,7 +33,7 @@
import time
from abc import ABC, abstractmethod
from enum import Enum
-from typing import List, Dict, Any
+from typing import List, Dict, Any, Optional, Tuple


import numpy as np
@@ -880,10 +880,12 @@ class VectorDataSetPartitionParamSource(ParamSource):
        offset: Offset into the data set to start at. Relevant when there are
            multiple partitions
    """
+    NESTED_FIELD_SEPARATOR = "."

    def __init__(self, workload, params, context: Context, **kwargs):
        super().__init__(workload, params, **kwargs)
        self.field_name: str = parse_string_parameter("field", params)
+        self.is_nested = self.NESTED_FIELD_SEPARATOR in self.field_name  # in the base class because it is used for both bulk ingest and queries
        self.context = context
        self.data_set_format = parse_string_parameter("data_set_format", params)
        self.data_set_path = parse_string_parameter("data_set_path", params, "")
@@ -979,6 +981,18 @@ def partition(self, partition_index, total_partitions):
        partition_x.current = partition_x.offset
        return partition_x

+    def get_split_fields(self) -> Tuple[str, str]:
+        fields_as_array = self.field_name.split(self.NESTED_FIELD_SEPARATOR)
+
+        # TODO: Add support for multiple levels of nesting if a future benchmark requires it.
+
+        if len(fields_as_array) != 2:
+            raise ValueError(
+                f"Field name {self.field_name} is not a nested field name. Currently we support only one level of nesting."
+            )
+        return fields_as_array[0], fields_as_array[1]
+
+
    @abstractmethod
    def params(self):
        """
@@ -1219,12 +1233,24 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict
            query.update({
                "filter": efficient_filter,
            })
-        return {
+
+        knn_search_query = {
            "knn": {
                self.field_name: query,
            },
        }

+        if self.is_nested:
+            outer_field_name, _ = self.get_split_fields()
+            return {
+                "nested": {
+                    "path": outer_field_name,
+                    "query": knn_search_query
+                }
+            }
+
+        return knn_search_query
+

class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
    """ Create bulk index requests from a data set of vectors.
@@ -1241,13 +1267,74 @@ class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
    def __init__(self, workload, params, **kwargs):
        super().__init__(workload, params, Context.INDEX, **kwargs)
        self.bulk_size: int = parse_int_parameter("bulk_size", params)
-        self.retries: int = parse_int_parameter("retries", params,
-                                                self.DEFAULT_RETRIES)
+        self.retries: int = parse_int_parameter("retries", params, self.DEFAULT_RETRIES)
        self.index_name: str = parse_string_parameter("index", params)
        self.id_field_name: str = parse_string_parameter(
-            self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME)
+            self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME
+        )

-    def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
+        self.action_buffer = None
+        self.num_nested_vectors = 10
+
+        self.parent_data_set_path = parse_string_parameter(
+            "parents_data_set_path", params, self.data_set_path
+        )
+
+        self.parent_data_set_format = self.data_set_format
+
+        self.parent_data_set_corpus = self.data_set_corpus
+
+        self.logger = logging.getLogger(__name__)
+
+    def partition(self, partition_index, total_partitions):
+        partition = super().partition(partition_index, total_partitions)
+        if self.parent_data_set_corpus and not self.parent_data_set_path:
+            parent_data_set_path = self._get_corpora_file_paths(
+                self.parent_data_set_corpus, self.parent_data_set_format
+            )
+            self._validate_data_set_corpus(parent_data_set_path)
+            self.parent_data_set_path = parent_data_set_path[0]
+        if not self.parent_data_set_path:
+            self.parent_data_set_path = self.data_set_path
+        # attach a parent-id data set instance to the partition
+        if self.is_nested:
+            partition.parent_data_set = get_data_set(
+                self.parent_data_set_format, self.parent_data_set_path, Context.PARENTS
+            )
+            partition.parent_data_set.seek(partition.offset)
+
+        return partition
+
+    def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
+        """
+        Create bulk ingest actions for data with a non-nested field.
+        """
+        actions = []
+
+        _ = [
+            actions.extend([action(self.id_field_name, i + self.current), None])
+            for i in range(len(partition))
+        ]
+        bulk_contents = []
+
+        add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
+        for vec, identifier in zip(
+            partition.tolist(), range(self.current, self.current + len(partition))
+        ):
+            row = {self.field_name: vec}
+            if add_id_field_to_body:
+                row.update({self.id_field_name: identifier})
+            bulk_contents.append(row)
+
+        actions[1::2] = bulk_contents
+
+        self.logger.debug("Actions: %s", actions)
+        return actions
+
+    def bulk_transform(
+        self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray]
+    ) -> List[Dict[str, Any]]:
        """Partitions and transforms a list of vectors into OpenSearch's bulk
        ingestion format.
@@ -1257,19 +1344,63 @@ def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
        Returns:
            An array of transformed vectors in bulk format.
        """
+
+        if not self.is_nested:
+            return self.bulk_transform_non_nested(partition, action)
+
        actions = []
-        _ = [
-            actions.extend([action(self.id_field_name, i + self.current), None])
-            for i in range(len(partition))
-        ]
-        bulk_contents = []
+
+        outer_field_name, inner_field_name = self.get_split_fields()
+
        add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
-        for vec, identifier in zip(partition.tolist(), range(self.current, self.current + len(partition))):
-            row = {self.field_name: vec}
+
+        if self.action_buffer is None:
+            first_index_of_parent_ids = 0
+            self.action_buffer = {outer_field_name: []}
+            self.action_parent_id = parents_ids[first_index_of_parent_ids]
            if add_id_field_to_body:
-                row.update({self.id_field_name: identifier})
-            bulk_contents.append(row)
-        actions[1::2] = bulk_contents
+                self.action_buffer.update({self.id_field_name: self.action_parent_id})
+
+        part_list = partition.tolist()
+        for i in range(len(partition)):
+            nested = {inner_field_name: part_list[i]}
+            current_parent_id = parents_ids[i]
+
+            if self.action_parent_id == current_parent_id:
+                self.action_buffer[outer_field_name].append(nested)
+            else:
+                # flush the buffered document for the previous parent id
+                actions.extend(
+                    [
+                        action(self.id_field_name, self.action_parent_id),
+                        self.action_buffer,
+                    ]
+                )
+                self.current += len(self.action_buffer[outer_field_name])
+
+                # start a new buffer for the current parent id
+                self.action_buffer = {outer_field_name: []}
+                if add_id_field_to_body:
+                    self.action_buffer.update({self.id_field_name: current_parent_id})
+                self.action_buffer[outer_field_name].append(nested)
+
+            self.action_parent_id = current_parent_id
+
+        max_position = self.offset + self.num_vectors
+        if (
+            self.current + len(self.action_buffer[outer_field_name]) + self.bulk_size
+            >= max_position
+        ):
+            # final flush of the remaining vectors in the last partition (for the last client)
+            self.current += len(self.action_buffer[outer_field_name])
+            actions.extend(
+                [action(self.id_field_name, self.action_parent_id), self.action_buffer]
+            )
+
        return actions

    def params(self):
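To make the buffering above concrete: consecutive vectors that share a parent id are collapsed into a single document whose outer field holds a list of inner-field objects, and each flush emits one metadata line plus one document line. A sketch with hypothetical parent ids [7, 7, 8], field "train_docs.vector", and index "target_index":

    # Vectors v0 and v1 share parent 7, so they become one document;
    # parent 8 stays in the action buffer until its own flush (or the final one).
    actions = [
        {"index": {"_index": "target_index", "_id": 7}},
        {"train_docs": [{"vector": [0.1, 0.2]}, {"vector": [0.3, 0.4]}]},
    ]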
@@ -1281,29 +1412,34 @@ def params(self):

        def action(id_field_name, doc_id):
            # support only the index operation
-            bulk_action = 'index'
-            metadata = {
-                '_index': self.index_name
-            }
+            bulk_action = "index"
+            metadata = {"_index": self.index_name}
            # Add the id field to metadata only if it is _id
            if id_field_name == self.DEFAULT_ID_FIELD_NAME:
                metadata.update({id_field_name: doc_id})
            return {bulk_action: metadata}

        remaining_vectors_in_partition = self.num_vectors + self.offset - self.current
-        # update bulk size if number of vectors to read is less than actual bulk size
+
        bulk_size = min(self.bulk_size, remaining_vectors_in_partition)
+
        partition = self.data_set.read(bulk_size)
-        body = self.bulk_transform(partition, action)
+
+        if self.is_nested:
+            parent_ids = self.parent_data_set.read(bulk_size)
+        else:
+            parent_ids = None
+
+        body = self.bulk_transform(partition, action, parent_ids)
        size = len(body) // 2
-        self.current += size
+
+        if not self.is_nested:
+            # in the nested case the number of vectors ingested per bulk may be irregular,
+            # so self.current is updated inside bulk_transform instead
+            self.current += size
        self.percent_completed = self.current / self.total

-        return {
-            "body": body,
-            "retries": self.retries,
-            "size": size
-        }
+        return {"body": body, "retries": self.retries, "size": size}


def get_target(workload, params):
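Each call to params() yields one dict per bulk request. A sketch of its shape, with illustrative values:

    # "actions" is the alternating metadata/document list built by bulk_transform.
    request_params = {
        "body": actions,
        "retries": 10,              # from the "retries" workload parameter
        "size": len(actions) // 2,  # documents per bulk, not lines
    }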