Skip to content

Commit 5b21773

Browse files
committed
initial implementation of should_calculate_recall
Signed-off-by: Finn Roblin <finnrobl@amazon.com>
1 parent 2532d77 commit 5b21773

File tree

3 files changed

+46
-18
lines changed

3 files changed

+46
-18
lines changed

osbenchmark/worker_coordinator/runner.py

+41-18
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from functools import total_ordering
3838
from io import BytesIO
3939
from os.path import commonprefix
40+
from os import cpu_count as os_cpu_count
4041
from typing import List, Optional
4142

4243
import ijson
@@ -1320,29 +1321,47 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F
13201321

13211322
return correct / min_num_of_results
13221323

1324+
def _set_initial_recall_values(params: dict, result: dict) -> None:
1325+
# Add recall@k and recall@1 to the initial result only if k is present in the params and calculate_recall is true
1326+
if "k" in params:
1327+
result.update({
1328+
"recall@k": 0,
1329+
"recall@1": 0
1330+
})
1331+
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
1332+
elif "max_distance" in params:
1333+
result.update({
1334+
"recall@max_distance": 0,
1335+
"recall@max_distance_1": 0
1336+
})
1337+
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
1338+
elif "min_score" in params:
1339+
result.update({
1340+
"recall@min_score": 0,
1341+
"recall@min_score_1": 0
1342+
})
1343+
1344+
def _get_should_calculate_recall(params: dict) -> bool:
1345+
num_clients = params.get("num_clients", 0)
1346+
if num_clients == 0:
1347+
self.logger.debug("Expected num_clients to be specified but was not.")
1348+
cpu_count = os_cpu_count()
1349+
if cpu_count < num_clients:
1350+
self.logger.warning("Number of clients, %s, specified is greater than the number of CPUs, %s, available."\
1351+
"This will lead to unperformant context switching on load generation host. Performance "\
1352+
"metrics may not be accurate. Skipping recall calculation.", num_clients, cpu_count)
1353+
return False
1354+
return params.get("calculate-recall", True)
1355+
13231356
result = {
13241357
"weight": 1,
13251358
"unit": "ops",
13261359
"success": True,
13271360
}
1328-
# Add recall@k and recall@1 to the initial result only if k is present in the params
1329-
if "k" in params:
1330-
result.update({
1331-
"recall@k": 0,
1332-
"recall@1": 0
1333-
})
1334-
# Add recall@max_distance and recall@max_distance_1 to the initial result only if max_distance is present in the params
1335-
elif "max_distance" in params:
1336-
result.update({
1337-
"recall@max_distance": 0,
1338-
"recall@max_distance_1": 0
1339-
})
1340-
# Add recall@min_score and recall@min_score_1 to the initial result only if min_score is present in the params
1341-
elif "min_score" in params:
1342-
result.update({
1343-
"recall@min_score": 0,
1344-
"recall@min_score_1": 0
1345-
})
1361+
# deal with clients here. Need to get num_clients
1362+
should_calculate_recall = _get_should_calculate_recall(params)
1363+
if should_calculate_recall:
1364+
_set_initial_recall_values(params, result)
13461365

13471366
doc_type = params.get("type")
13481367
response = await self._raw_search(opensearch, doc_type, index, body, request_params, headers=headers)
@@ -1366,6 +1385,10 @@ def calculate_radial_search_recall(predictions, neighbors, enable_top_1_recall=F
13661385
if _is_empty_search_results(response_json):
13671386
self.logger.info("Vector search query returned no results.")
13681387
return result
1388+
1389+
if not should_calculate_recall:
1390+
return result
1391+
13691392
id_field = parse_string_parameter("id-field-name", params, "_id")
13701393
candidates = []
13711394
for hit in response_json['hits']['hits']:

osbenchmark/worker_coordinator/worker_coordinator.py

+3
Original file line numberDiff line numberDiff line change
@@ -1624,6 +1624,9 @@ async def __call__(self, *args, **kwargs):
16241624
processing_start = time.perf_counter()
16251625
self.schedule_handle.before_request(processing_start)
16261626
async with self.opensearch["default"].new_request_context() as request_context:
1627+
# add num_clients to the parameter so that vector search runner can skip calculating recall
1628+
# if num_clients > cpu_count().
1629+
params.update({"num_clients": self.task.clients})
16271630
total_ops, total_ops_unit, request_meta_data = await execute_single(runner, self.opensearch, params, self.on_error)
16281631
request_start = request_context.request_start
16291632
request_end = request_context.request_end

osbenchmark/workload/params.py

+2
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,7 @@ def __init__(self, workload, params, **kwargs):
549549
f"'type' not supported with 'data-stream' for operation '{kwargs.get('operation_name')}'")
550550
request_cache = params.get("cache", None)
551551
detailed_results = params.get("detailed-results", False)
552+
calculate_recall = params.get("calculate-recall", True)
552553
query_body = params.get("body", None)
553554
pages = params.get("pages", None)
554555
results_per_page = params.get("results-per-page", None)
@@ -561,6 +562,7 @@ def __init__(self, workload, params, **kwargs):
561562
"type": type_name,
562563
"cache": request_cache,
563564
"detailed-results": detailed_results,
565+
"calculate-recall": calculate_recall,
564566
"request-params": request_params,
565567
"response-compression-enabled": response_compression_enabled,
566568
"body": query_body

0 commit comments

Comments
 (0)