Skip to content

Commit 822f31d

Browse files
committed
Added support for a data corpora source-url field to specify a file directly.
Signed-off-by: Govind Kamat <govkamat@amazon.com>
1 parent 38cf15e commit 822f31d

File tree

4 files changed

+127
-38
lines changed

4 files changed

+127
-38
lines changed

it/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import random
3030
import socket
3131
import time
32+
import datetime
3233

3334
import pytest
3435

@@ -87,7 +88,7 @@ def osbenchmark(cfg, command_line):
8788
These commands may have different CLI options than test_execution.
8889
"""
8990
cmd = osbenchmark_command_line_for(cfg, command_line)
90-
print("\nInvoking OSB:", cmd)
91+
print(f'\n{datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")} Invoking OSB: {cmd}')
9192
err, retcode = process.run_subprocess_with_stderr(cmd)
9293
if retcode != 0:
9394
print(err)

osbenchmark/workload/loader.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -464,20 +464,23 @@ def __init__(self, offline, test_mode):
464464
self.test_mode = test_mode
465465
self.logger = logging.getLogger(__name__)
466466

467-
def download(self, base_url, target_path, size_in_bytes):
467+
def download(self, base_url, source_url, target_path, size_in_bytes):
468468
file_name = os.path.basename(target_path)
469469

470470
if not base_url:
471471
raise exceptions.DataError("Cannot download data because no base URL is provided.")
472472
if self.offline:
473473
raise exceptions.SystemSetupError(f"Cannot find [{target_path}]. Please disable offline mode and retry.")
474474

475-
if base_url.endswith("/"):
476-
separator = ""
475+
if source_url:
476+
data_url = source_url
477477
else:
478-
separator = "/"
479-
# join manually as `urllib.parse.urljoin` does not work with S3 or GS URL schemes.
480-
data_url = f"{base_url}{separator}{file_name}"
478+
if base_url.endswith("/"):
479+
separator = ""
480+
else:
481+
separator = "/"
482+
# join manually as `urllib.parse.urljoin` does not work with S3 or GS URL schemes.
483+
data_url = f"{base_url}{separator}{file_name}"
481484
try:
482485
io.ensure_dir(os.path.dirname(target_path))
483486
if size_in_bytes:
@@ -573,7 +576,7 @@ def prepare_document_set(self, document_set, data_root):
573576
raise exceptions.BenchmarkAssertionError(f"Workload {self.workload_name} specifies documents but no corpus")
574577

575578
try:
576-
self.downloader.download(document_set.base_url, target_path, expected_size)
579+
self.downloader.download(document_set.base_url, document_set.source_url, target_path, expected_size)
577580
except exceptions.DataError as e:
578581
if e.message == "Cannot download data because no base URL is provided." and \
579582
self.is_locally_available(target_path):
@@ -1489,6 +1492,7 @@ def _create_corpora(self, corpora_specs, indices, data_streams):
14891492
source_format = self._r(doc_spec, "source-format", mandatory=False, default_value=default_source_format)
14901493

14911494
if source_format in workload.Documents.SUPPORTED_SOURCE_FORMAT:
1495+
source_url = self._r(doc_spec, "source-url", mandatory=False)
14921496
docs = self._r(doc_spec, "source-file")
14931497
if io.is_archive(docs):
14941498
document_archive = docs
@@ -1541,6 +1545,7 @@ def _create_corpora(self, corpora_specs, indices, data_streams):
15411545
document_file=document_file,
15421546
document_archive=document_archive,
15431547
base_url=base_url,
1548+
source_url=source_url,
15441549
includes_action_and_meta_data=includes_action_and_meta_data,
15451550
number_of_documents=num_docs,
15461551
compressed_size_in_bytes=compressed_bytes,

osbenchmark/workload/workload.py

+13-11
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ class Documents:
190190
SOURCE_FORMAT_BIG_ANN = "big-ann"
191191
SUPPORTED_SOURCE_FORMAT = [SOURCE_FORMAT_BULK, SOURCE_FORMAT_HDF5, SOURCE_FORMAT_BIG_ANN]
192192

193-
def __init__(self, source_format, document_file=None, document_archive=None, base_url=None,
193+
def __init__(self, source_format, document_file=None, document_archive=None, base_url=None, source_url=None,
194194
includes_action_and_meta_data=False,
195195
number_of_documents=0, compressed_size_in_bytes=0, uncompressed_size_in_bytes=0, target_index=None,
196196
target_data_stream=None, target_type=None, meta_data=None):
@@ -201,7 +201,8 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas
201201
just need a mapping but no documents)
202202
:param document_archive: The file name of the compressed benchmark document name on the remote server. Optional (e.g. for
203203
percolation we just need a mapping but no documents)
204-
:param base_url: The URL from which to load data if they are not available locally. Optional.
204+
:param base_url: The URL from which to load data if they are not available locally. Excludes the file or object name. Optional.
205+
:param source_url: The full URL to the file or object from which to load data if not available locally. Optional.
205206
:param includes_action_and_meta_data: True, if the source file already includes the action and meta-data line. False, if it only
206207
contains documents.
207208
:param number_of_documents: The number of documents
@@ -224,6 +225,7 @@ def __init__(self, source_format, document_file=None, document_archive=None, bas
224225
self.document_file = document_file
225226
self.document_archive = document_archive
226227
self.base_url = base_url
228+
self.source_url = source_url
227229
self.includes_action_and_meta_data = includes_action_and_meta_data
228230
self._number_of_documents = number_of_documents
229231
self._compressed_size_in_bytes = compressed_size_in_bytes
@@ -295,18 +297,18 @@ def __repr__(self):
295297

296298
def __hash__(self):
297299
return hash(self.source_format) ^ hash(self.document_file) ^ hash(self.document_archive) ^ hash(self.base_url) ^ \
298-
hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ hash(self.compressed_size_in_bytes) ^ \
299-
hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ hash(self.target_data_stream) ^ hash(self.target_type) ^ \
300-
hash(frozenset(self.meta_data.items()))
300+
hash(self.source_url) ^ hash(self.includes_action_and_meta_data) ^ hash(self.number_of_documents) ^ \
301+
hash(self.compressed_size_in_bytes) ^ hash(self.uncompressed_size_in_bytes) ^ hash(self.target_index) ^ \
302+
hash(self.target_data_stream) ^ hash(self.target_type) ^ hash(frozenset(self.meta_data.items()))
301303

302304
def __eq__(self, othr):
303305
return (isinstance(othr, type(self)) and
304-
(self.source_format, self.document_file, self.document_archive, self.base_url, self.includes_action_and_meta_data,
305-
self.number_of_documents, self.compressed_size_in_bytes, self.uncompressed_size_in_bytes,
306-
self.target_type, self.target_data_stream, self.target_type, self.meta_data) ==
307-
(othr.source_format, othr.document_file, othr.document_archive, othr.base_url, othr.includes_action_and_meta_data,
308-
othr.number_of_documents, othr.compressed_size_in_bytes, othr.uncompressed_size_in_bytes,
309-
othr.target_type, othr.target_data_stream, othr.target_type, othr.meta_data))
306+
(self.source_format, self.document_file, self.document_archive, self.base_url, self.source_url,
307+
self.includes_action_and_meta_data, self.number_of_documents, self.compressed_size_in_bytes,
308+
self.uncompressed_size_in_bytes, self.target_index, self.target_data_stream, self.target_type, self.meta_data) ==
309+
(othr.source_format, othr.document_file, othr.document_archive, othr.base_url, othr.source_url,
310+
othr.includes_action_and_meta_data, othr.number_of_documents, othr.compressed_size_in_bytes,
311+
othr.uncompressed_size_in_bytes, othr.target_index, othr.target_data_stream, othr.target_type, othr.meta_data))
310312

311313

312314
class DocumentCorpus:

0 commit comments

Comments
 (0)