diff --git a/cpp/examples/basic_io.cpp b/cpp/examples/basic_io.cpp index 39bfc315cd..9fed0cee6a 100644 --- a/cpp/examples/basic_io.cpp +++ b/cpp/examples/basic_io.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,7 +124,7 @@ int main() check(a[i] == b[i]); } } - kvikio::defaults::thread_pool_nthreads_reset(16); + kvikio::defaults::set_thread_pool_nthreads(16); { std::cout << std::endl; Timer timer; diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 4334549d23..adb3a3b3f7 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -99,14 +99,14 @@ class defaults { [[nodiscard]] static CompatMode compat_mode(); /** - * @brief Reset the value of `kvikio::defaults::compat_mode()`. + * @brief Set the value of `kvikio::defaults::compat_mode()`. * * Changing the compatibility mode affects all the new FileHandles whose `compat_mode` argument is * not explicitly set, but it never affects existing FileHandles. * * @param compat_mode Compatibility mode. */ - static void compat_mode_reset(CompatMode compat_mode); + static void set_compat_mode(CompatMode compat_mode); /** * @brief Infer the `AUTO` compatibility mode from the system runtime. @@ -157,7 +157,7 @@ class defaults { * * Notice, it is not possible to change the default thread pool. KvikIO will * always use the same thread pool however it is possible to change number of - * threads in the pool (see `kvikio::default::thread_pool_nthreads_reset()`). + * threads in the pool (see `kvikio::defaults::set_thread_pool_nthreads()`). * * @return The default thread pool instance. */ @@ -166,7 +166,7 @@ class defaults { /** * @brief Get the number of threads in the default thread pool. 
* - * Set the default value using `kvikio::default::thread_pool_nthreads_reset()` or by + * Set the default value using `kvikio::defaults::set_thread_pool_nthreads()` or by * setting the `KVIKIO_NTHREADS` environment variable. If not set, the default value is 1. * * @return The number of threads. @@ -174,20 +174,19 @@ class defaults { [[nodiscard]] static unsigned int thread_pool_nthreads(); /** - * @brief Reset the number of threads in the default thread pool. Waits for all currently running + * @brief Set the number of threads in the default thread pool. Waits for all currently running * tasks to be completed, then destroys all threads in the pool and creates a new thread pool with * the new number of threads. Any tasks that were waiting in the queue before the pool was reset - * will then be executed by the new threads. If the pool was paused before resetting it, the new - * pool will be paused as well. + * will then be executed by the new threads. * * @param nthreads The number of threads to use. */ - static void thread_pool_nthreads_reset(unsigned int nthreads); + static void set_thread_pool_nthreads(unsigned int nthreads); /** * @brief Get the default task size used for parallel IO operations. * - * Set the default value using `kvikio::default::task_size_reset()` or by setting + * Set the default value using `kvikio::defaults::set_task_size()` or by setting * the `KVIKIO_TASK_SIZE` environment variable. If not set, the default value is 4 MiB. * * @return The default task size in bytes. @@ -195,11 +194,11 @@ class defaults { [[nodiscard]] static std::size_t task_size(); /** - * @brief Reset the default task size used for parallel IO operations. + * @brief Set the default task size used for parallel IO operations. * * @param nbytes The default task size in bytes. */ - static void task_size_reset(std::size_t nbytes); + static void set_task_size(std::size_t nbytes); /** * @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes). 
@@ -207,7 +206,7 @@ class defaults { * In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut * that circumvent the threadpool and use the POSIX backend directly. * - * Set the default value using `kvikio::default::gds_threshold_reset()` or by setting the + * Set the default value using `kvikio::defaults::set_gds_threshold()` or by setting the * `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB. * * @return The default GDS threshold size in bytes. @@ -215,15 +214,15 @@ class defaults { [[nodiscard]] static std::size_t gds_threshold(); /** - * @brief Reset the default GDS threshold, which is the minimum size to use GDS (in bytes). + * @brief Set the default GDS threshold, which is the minimum size to use GDS (in bytes). * @param nbytes The default GDS threshold size in bytes. */ - static void gds_threshold_reset(std::size_t nbytes); + static void set_gds_threshold(std::size_t nbytes); /** * @brief Get the size of the bounce buffer used to stage data in host memory. * - * Set the value using `kvikio::default::bounce_buffer_size_reset()` or by setting the + * Set the value using `kvikio::defaults::set_bounce_buffer_size()` or by setting the * `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the value is 16 MiB. * * @return The bounce buffer size in bytes. @@ -231,16 +230,16 @@ class defaults { [[nodiscard]] static std::size_t bounce_buffer_size(); /** - * @brief Reset the size of the bounce buffer used to stage data in host memory. + * @brief Set the size of the bounce buffer used to stage data in host memory. * * @param nbytes The bounce buffer size in bytes. */ - static void bounce_buffer_size_reset(std::size_t nbytes); + static void set_bounce_buffer_size(std::size_t nbytes); /** * @brief Get the maximum number of attempts per remote IO read. 
* - * Set the value using `kvikio::default::http_max_attempts_reset()` or by setting + * Set the value using `kvikio::defaults::set_http_max_attempts()` or by setting * the `KVIKIO_HTTP_MAX_ATTEMPTS` environment variable. If not set, the value is 3. * * @return The maximum number of remote IO reads to attempt before raising an @@ -249,16 +248,16 @@ class defaults { [[nodiscard]] static std::size_t http_max_attempts(); /** - * @brief Reset the maximum number of attempts per remote IO read. + * @brief Set the maximum number of attempts per remote IO read. * * @param attempts The maximum number of attempts to try before raising an error. */ - static void http_max_attempts_reset(std::size_t attempts); + static void set_http_max_attempts(std::size_t attempts); /** * @brief The list of HTTP status codes to retry. * - * Set the value using `kvikio::default::http_status_codes()` or by setting the + * Set the value using `kvikio::defaults::set_http_status_codes()` or by setting the * `KVIKIO_HTTP_STATUS_CODES` environment variable. If not set, the default value is * * - 429 @@ -272,11 +271,11 @@ class defaults { [[nodiscard]] static std::vector<int> const& http_status_codes(); /** - * @brief Reset the list of HTTP status codes to retry. + * @brief Set the list of HTTP status codes to retry. * * @param status_codes The HTTP status codes to retry. 
*/ - static void http_status_codes_reset(std::vector<int> status_codes); + static void set_http_status_codes(std::vector<int> status_codes); }; } // namespace kvikio diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index e0a908cf4d..bb0849d337 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -143,7 +143,7 @@ defaults* defaults::instance() } CompatMode defaults::compat_mode() { return instance()->_compat_mode; } -void defaults::compat_mode_reset(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; } +void defaults::set_compat_mode(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; } CompatMode defaults::infer_compat_mode_if_auto(CompatMode compat_mode) noexcept { @@ -169,7 +169,7 @@ BS_thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } unsigned int defaults::thread_pool_nthreads() { return thread_pool().get_thread_count(); } -void defaults::thread_pool_nthreads_reset(unsigned int nthreads) +void defaults::set_thread_pool_nthreads(unsigned int nthreads) { if (nthreads == 0) { throw std::invalid_argument("number of threads must be a positive integer greater than zero"); @@ -179,7 +179,7 @@ void defaults::thread_pool_nthreads_reset(unsigned int nthreads) std::size_t defaults::task_size() { return instance()->_task_size; } -void defaults::task_size_reset(std::size_t nbytes) +void defaults::set_task_size(std::size_t nbytes) { if (nbytes == 0) { throw std::invalid_argument("task size must be a positive integer greater than zero"); @@ -189,11 +189,11 @@ void defaults::task_size_reset(std::size_t nbytes) std::size_t defaults::gds_threshold() { return instance()->_gds_threshold; } -void defaults::gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } +void defaults::set_gds_threshold(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } std::size_t defaults::bounce_buffer_size() { return instance()->_bounce_buffer_size; } -void defaults::bounce_buffer_size_reset(std::size_t 
nbytes) +void defaults::set_bounce_buffer_size(std::size_t nbytes) { if (nbytes == 0) { throw std::invalid_argument( @@ -204,7 +204,7 @@ void defaults::bounce_buffer_size_reset(std::size_t nbytes) std::size_t defaults::http_max_attempts() { return instance()->_http_max_attempts; } -void defaults::http_max_attempts_reset(std::size_t attempts) +void defaults::set_http_max_attempts(std::size_t attempts) { if (attempts == 0) { throw std::invalid_argument("attempts must be a positive integer"); } instance()->_http_max_attempts = attempts; @@ -212,7 +212,7 @@ void defaults::http_max_attempts_reset(std::size_t attempts) std::vector<int> const& defaults::http_status_codes() { return instance()->_http_status_codes; } -void defaults::http_status_codes_reset(std::vector<int> status_codes) +void defaults::set_http_status_codes(std::vector<int> status_codes) { instance()->_http_status_codes = std::move(status_codes); } diff --git a/docs/source/api.rst b/docs/source/api.rst index fd34367a00..5cba4fd8d3 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -11,6 +11,24 @@ CuFile .. autoclass:: IOFuture :members: +CuFile driver +------------- +.. currentmodule:: kvikio.cufile_driver + +.. autoclass:: ConfigContextManager + +.. autofunction:: set + +.. autofunction:: get + +.. autofunction:: libcufile_version + +.. autofunction:: driver_open + +.. autofunction:: driver_close + +.. autofunction:: initialize + Zarr ---- .. currentmodule:: kvikio.zarr @@ -29,10 +47,43 @@ Defaults -------- .. currentmodule:: kvikio.defaults +.. autoclass:: ConfigContextManager + +.. autofunction:: set + +.. autofunction:: get + .. autofunction:: compat_mode -.. autofunction:: compat_mode_reset +.. autofunction:: num_threads -.. autofunction:: get_num_threads +.. autofunction:: task_size + +.. autofunction:: gds_threshold + +.. autofunction:: bounce_buffer_size + +.. autofunction:: http_status_codes + +.. autofunction:: http_max_attempts + +.. autofunction:: compat_mode_reset +.. 
autofunction:: set_compat_mode .. autofunction:: num_threads_reset +.. autofunction:: set_num_threads + +.. autofunction:: task_size_reset +.. autofunction:: set_task_size + +.. autofunction:: gds_threshold_reset +.. autofunction:: set_gds_threshold + +.. autofunction:: bounce_buffer_size_reset +.. autofunction:: set_bounce_buffer_size + +.. autofunction:: http_status_codes_reset +.. autofunction:: set_http_status_codes + +.. autofunction:: http_max_attempts_reset +.. autofunction:: set_http_max_attempts diff --git a/docs/source/runtime_settings.rst b/docs/source/runtime_settings.rst index 5847c1ffbe..b488c10146 100644 --- a/docs/source/runtime_settings.rst +++ b/docs/source/runtime_settings.rst @@ -17,37 +17,37 @@ Under ``AUTO``, KvikIO falls back to the compatibility mode: * when running in Windows Subsystem for Linux (WSL). * when ``/run/udev`` isn't readable, which typically happens when running inside a docker image not launched with ``--volume /run/udev:/run/udev:ro``. -This setting can also be programmatically controlled by :py:func:`kvikio.defaults.set_compat_mode` and :py:func:`kvikio.defaults.compat_mode_reset`. +This setting can also be programmatically accessed using :py:func:`kvikio.defaults.compat_mode` (getter) and :py:func:`kvikio.defaults.set` (setter). Thread Pool ``KVIKIO_NTHREADS`` ------------------------------- KvikIO can use multiple threads for IO automatically. Set the environment variable ``KVIKIO_NTHREADS`` to the number of threads in the thread pool. If not set, the default value is 1. -This setting can also be controlled by :py:func:`kvikio.defaults.get_num_threads`, :py:func:`kvikio.defaults.num_threads_reset`, and :py:func:`kvikio.defaults.set_num_threads`. +This setting can also be accessed using :py:func:`kvikio.defaults.num_threads` (getter) and :py:func:`kvikio.defaults.set` (setter). Task Size ``KVIKIO_TASK_SIZE`` ------------------------------ KvikIO splits parallel IO operations into multiple tasks. 
Set the environment variable ``KVIKIO_TASK_SIZE`` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB). -This setting can also be controlled by :py:func:`kvikio.defaults.task_size`, :py:func:`kvikio.defaults.task_size_reset`, and :py:func:`kvikio.defaults.set_task_size`. +This setting can also be accessed using :py:func:`kvikio.defaults.task_size` (getter) and :py:func:`kvikio.defaults.set` (setter). GDS Threshold ``KVIKIO_GDS_THRESHOLD`` -------------------------------------- In order to improve performance of small IO, ``.pread()`` and ``.pwrite()`` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable ``KVIKIO_GDS_THRESHOLD`` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB). -This setting can also be controlled by :py:func:`kvikio.defaults.gds_threshold`, :py:func:`kvikio.defaults.gds_threshold_reset`, and :py:func:`kvikio.defaults.set_gds_threshold`. +This setting can also be accessed using :py:func:`kvikio.defaults.gds_threshold` (getter) and :py:func:`kvikio.defaults.set` (setter). Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE`` ------------------------------------------------------- KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB). -This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`. +This setting can also be accessed using :py:func:`kvikio.defaults.bounce_buffer_size` (getter) and :py:func:`kvikio.defaults.set` (setter). 
-#### HTTP Retries ------------------ +HTTP Retries ``KVIKIO_HTTP_STATUS_CODES``, ``KVIKIO_HTTP_MAX_ATTEMPTS`` +------------------------------------------------------------------------ -The behavior when a remote IO read returns a error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables. +The behavior when a remote I/O read returns an error can be controlled through the ``KVIKIO_HTTP_STATUS_CODES`` and ``KVIKIO_HTTP_MAX_ATTEMPTS`` environment variables. -`KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry and can be controlled by :py:func:`kvikio.defaults.http_status_codes`, :py:func:`kvikio.defaults.http_status_codes_reset`, and :py:func:`kvikio.defaults.set_http_status_codes`. +KvikIO will retry a request if any of the HTTP status codes in ``KVIKIO_HTTP_STATUS_CODES`` is received. The default values are ``429, 500, 502, 503, 504``. This setting can also be accessed using :py:func:`kvikio.defaults.http_status_codes` (getter) and :py:func:`kvikio.defaults.set` (setter). -`KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception and can be controlled by :py:func:`kvikio.defaults.http_max_attempts`, :py:func:`kvikio.defaults.http_max_attempts_reset`, and :py:func:`kvikio.defaults.set_http_max_attempts`. +The maximum number of attempts to make before throwing an exception is controlled by ``KVIKIO_HTTP_MAX_ATTEMPTS``. The default value is 3. This setting can also be accessed using :py:func:`kvikio.defaults.http_max_attempts` (getter) and :py:func:`kvikio.defaults.set` (setter). 
diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index 0770cb557a..bbd8902d4b 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -14,80 +14,86 @@ cdef extern from "" namespace "kvikio" nogil: OFF = 0 ON = 1 AUTO = 2 + bool cpp_is_compat_mode_preferred \ + "kvikio::defaults::is_compat_mode_preferred"() except + CompatMode cpp_compat_mode "kvikio::defaults::compat_mode"() except + - void cpp_compat_mode_reset \ - "kvikio::defaults::compat_mode_reset"(CompatMode compat_mode) except + + void cpp_set_compat_mode \ + "kvikio::defaults::set_compat_mode"(CompatMode compat_mode) except + unsigned int cpp_thread_pool_nthreads \ "kvikio::defaults::thread_pool_nthreads"() except + - void cpp_thread_pool_nthreads_reset \ - "kvikio::defaults::thread_pool_nthreads_reset" (unsigned int nthreads) except + + void cpp_set_thread_pool_nthreads \ + "kvikio::defaults::set_thread_pool_nthreads" (unsigned int nthreads) except + size_t cpp_task_size "kvikio::defaults::task_size"() except + - void cpp_task_size_reset "kvikio::defaults::task_size_reset"(size_t nbytes) except + + void cpp_set_task_size "kvikio::defaults::set_task_size"(size_t nbytes) except + size_t cpp_gds_threshold "kvikio::defaults::gds_threshold"() except + - void cpp_gds_threshold_reset \ - "kvikio::defaults::gds_threshold_reset"(size_t nbytes) except + + void cpp_set_gds_threshold \ + "kvikio::defaults::set_gds_threshold"(size_t nbytes) except + size_t cpp_bounce_buffer_size "kvikio::defaults::bounce_buffer_size"() except + - void cpp_bounce_buffer_size_reset \ - "kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except + + void cpp_set_bounce_buffer_size \ + "kvikio::defaults::set_bounce_buffer_size"(size_t nbytes) except + size_t cpp_http_max_attempts "kvikio::defaults::http_max_attempts"() except + - void cpp_http_max_attempts_reset \ - "kvikio::defaults::http_max_attempts_reset"(size_t attempts) except + + void 
cpp_set_http_max_attempts \ + "kvikio::defaults::set_http_max_attempts"(size_t attempts) except + vector[int] cpp_http_status_codes "kvikio::defaults::http_status_codes"() except + - void cpp_http_status_codes_reset \ - "kvikio::defaults::http_status_codes_reset"(vector[int] status_codes) except + + void cpp_set_http_status_codes \ + "kvikio::defaults::set_http_status_codes"(vector[int] status_codes) except + + + +def is_compat_mode_preferred() -> bool: + return cpp_is_compat_mode_preferred() def compat_mode() -> CompatMode: return cpp_compat_mode() -def compat_mode_reset(compat_mode: CompatMode) -> None: - cpp_compat_mode_reset(compat_mode) +def set_compat_mode(compat_mode: CompatMode) -> None: + cpp_set_compat_mode(compat_mode) def thread_pool_nthreads() -> int: return cpp_thread_pool_nthreads() -def thread_pool_nthreads_reset(nthreads: int) -> None: - cpp_thread_pool_nthreads_reset(nthreads) +def set_thread_pool_nthreads(nthreads: int) -> None: + cpp_set_thread_pool_nthreads(nthreads) def task_size() -> int: return cpp_task_size() -def task_size_reset(nbytes: int) -> None: - cpp_task_size_reset(nbytes) +def set_task_size(nbytes: int) -> None: + cpp_set_task_size(nbytes) def gds_threshold() -> int: return cpp_gds_threshold() -def gds_threshold_reset(nbytes: int) -> None: - cpp_gds_threshold_reset(nbytes) +def set_gds_threshold(nbytes: int) -> None: + cpp_set_gds_threshold(nbytes) def bounce_buffer_size() -> int: return cpp_bounce_buffer_size() -def bounce_buffer_size_reset(nbytes: int) -> None: - cpp_bounce_buffer_size_reset(nbytes) +def set_bounce_buffer_size(nbytes: int) -> None: + cpp_set_bounce_buffer_size(nbytes) def http_max_attempts() -> int: return cpp_http_max_attempts() -def http_max_attempts_reset(attempts: int) -> None: - cpp_http_max_attempts_reset(attempts) +def set_http_max_attempts(attempts: int) -> None: + cpp_set_http_max_attempts(attempts) def http_status_codes() -> list[int]: return cpp_http_status_codes() -def 
http_status_codes_reset(status_codes: list[int]) -> None: - return cpp_http_status_codes_reset(status_codes) +def set_http_status_codes(status_codes: list[int]) -> None: + return cpp_set_http_status_codes(status_codes) diff --git a/python/kvikio/kvikio/benchmarks/http_io.py b/python/kvikio/kvikio/benchmarks/http_io.py index 68d4643004..af4e44b973 100644 --- a/python/kvikio/kvikio/benchmarks/http_io.py +++ b/python/kvikio/kvikio/benchmarks/http_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import argparse @@ -47,7 +47,7 @@ def main(args): cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool cupy.arange(10) # Make sure CUDA is initialized - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) print("Roundtrip benchmark") print("--------------------------------------") print(f"nelem | {args.nelem} ({format_bytes(args.nbytes)})") @@ -68,7 +68,8 @@ def main(args): res.append(elapsed) def pprint_api_res(name, samples): - samples = [args.nbytes / s for s in samples] # Convert to throughput + # Convert to throughput + samples = [args.nbytes / s for s in samples] mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0] ret = f"{api}-{name}".ljust(18) ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) diff --git a/python/kvikio/kvikio/benchmarks/s3_io.py b/python/kvikio/kvikio/benchmarks/s3_io.py index 5e1846a1e5..08bdfc93a0 100644 --- a/python/kvikio/kvikio/benchmarks/s3_io.py +++ b/python/kvikio/kvikio/benchmarks/s3_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import argparse @@ -134,7 +134,7 @@ def main(args): cupy.arange(10) # Make sure CUDA is initialized os.environ["KVIKIO_NTHREADS"] = str(args.nthreads) - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) print("Remote S3 benchmark") print("--------------------------------------") @@ -157,7 +157,8 @@ def main(args): res.append(elapsed) def pprint_api_res(name, samples): - samples = [args.nbytes / s for s in samples] # Convert to throughput + # Convert to throughput + samples = [args.nbytes / s for s in samples] mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0] ret = f"{api}-{name}".ljust(18) ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) diff --git a/python/kvikio/kvikio/benchmarks/single_node_io.py b/python/kvikio/kvikio/benchmarks/single_node_io.py index bca29ef90d..f5fc9057d1 100644 --- a/python/kvikio/kvikio/benchmarks/single_node_io.py +++ b/python/kvikio/kvikio/benchmarks/single_node_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import argparse @@ -259,7 +259,7 @@ def main(args): cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool cupy.arange(10) # Make sure CUDA is initialized - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) print("Roundtrip benchmark") print("----------------------------------") diff --git a/python/kvikio/kvikio/benchmarks/utils.py b/python/kvikio/kvikio/benchmarks/utils.py index fa25c361a4..02d97991de 100644 --- a/python/kvikio/kvikio/benchmarks/utils.py +++ b/python/kvikio/kvikio/benchmarks/utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
from __future__ import annotations @@ -28,7 +28,7 @@ def pprint_sys_info() -> None: """Pretty print system information""" version = kvikio.cufile_driver.libcufile_version() - props = kvikio.cufile_driver.DriverProperties() + props = kvikio.cufile_driver.properties try: import pynvml diff --git a/python/kvikio/kvikio/benchmarks/zarr_io.py b/python/kvikio/kvikio/benchmarks/zarr_io.py index fc226c2263..7882fcad8c 100644 --- a/python/kvikio/kvikio/benchmarks/zarr_io.py +++ b/python/kvikio/kvikio/benchmarks/zarr_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2023-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import argparse @@ -118,7 +118,7 @@ def main(args): cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool cupy.arange(10) # Make sure CUDA is initialized - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) drop_vm_cache_msg = str(args.drop_vm_cache) if not args.drop_vm_cache: drop_vm_cache_msg += " (use --drop-vm-cache for better accuracy!)" diff --git a/python/kvikio/kvikio/cufile_driver.py b/python/kvikio/kvikio/cufile_driver.py index fb32be347a..8c8804d885 100644 --- a/python/kvikio/kvikio/cufile_driver.py +++ b/python/kvikio/kvikio/cufile_driver.py @@ -1,14 +1,217 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import atexit -from typing import Tuple +from typing import Any, Tuple, overload +import kvikio.utils from kvikio._lib import cufile_driver # type: ignore -# TODO: Wrap nicely, maybe as a dataclass? -# -DriverProperties = cufile_driver.DriverProperties +properties = cufile_driver.DriverProperties() +"""cuFile driver configurations. Use kvikio.cufile_driver.get and + kvikio.cufile_driver.set to access the configurations. 
+""" + + +class ConfigContextManager: + """Context manager allowing the cuFile driver configurations to be set upon + entering a `with` block, and automatically reset upon leaving the block. + """ + + def __init__(self, config: dict[str, str]): + ( + self._property_getters, + self._property_setters, + self._readonly_property_getters, + ) = self._property_getter_and_setter() + self._old_properties = {} + + for key, value in config.items(): + self._old_properties[key] = self._get_property(key) + self._set_property(key, value) + + def __enter__(self): + return None + + def __exit__(self, type_unused, value, traceback_unused): + for key, value in self._old_properties.items(): + self._set_property(key, value) + + def _get_property(self, property: str) -> Any: + if property in self._property_getters: + func = self._property_getters[property] + elif property in self._readonly_property_getters: + func = self._readonly_property_getters[property] + else: + raise KeyError + + # getter signature: object.__get__(self, instance, owner=None) + return func(properties) + + def _set_property(self, property: str, value: Any): + if property in self._readonly_property_getters: + raise KeyError("This property is read-only.") + + func = self._property_setters[property] + + # setter signature: object.__set__(self, instance, value) + func(properties, value) + + @kvikio.utils.call_once + def _property_getter_and_setter( + self, + ) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]: + class_dict = vars(cufile_driver.DriverProperties) + + property_getter_names = [ + "poll_mode", + "poll_thresh_size", + "max_device_cache_size", + "max_pinned_memory_size", + ] + + property_getters = {} + property_setters = {} + + for name in property_getter_names: + property_getters[name] = class_dict[name].__get__ + property_setters[name] = class_dict[name].__set__ + + readonly_property_getter_names = [ + "is_gds_available", + "major_version", + "minor_version", + "allow_compat_mode", + 
"per_buffer_cache_size", + ] + readonly_property_getters = {} + for name in readonly_property_getter_names: + readonly_property_getters[name] = class_dict[name].__get__ + + return property_getters, property_setters, readonly_property_getters + + +@overload +def set(config: dict[str, Any], /) -> ConfigContextManager: + ... + + +@overload +def set(key: str, value: Any, /) -> ConfigContextManager: + ... + + +def set(*config) -> ConfigContextManager: + """Set cuFile driver configurations. + + Examples: + + - To set one or more properties + + .. code-block:: python + + # Set the property globally. + kvikio.cufile_driver.set({"prop1": value1, "prop2": value2}) + + # Set the property with a context manager. + # The property automatically reverts to its old value + # after leaving the `with` block. + with kvikio.cufile_driver.set({"prop1": value1, "prop2": value2}): + ... + + - To set a single property + + .. code-block:: python + + # Set the property globally. + kvikio.cufile_driver.set("prop", value) + + # Set the property with a context manager. + # The property automatically reverts to its old value + # after leaving the `with` block. + with kvikio.cufile_driver.set("prop", value): + ... + + Parameters + ---------- + config + The configurations. Can either be a single parameter (dict) consisting of one + or more properties, or two parameters key (string) and value (Any) + indicating a single property. + + Valid configuration names are: + + - Read-only properties: + + - ``"is_gds_available"`` + - ``"major_version"`` + - ``"minor_version"`` + - ``"allow_compat_mode"`` + - ``"per_buffer_cache_size"`` + + - Settable properties: + + - ``"poll_mode"`` + - ``"poll_thresh_size"`` + - ``"max_device_cache_size"`` + - ``"max_pinned_memory_size"`` + + Returns + ------- + ConfigContextManager + A context manager. If used in a `with` statement, the configuration will revert + to its old value upon leaving the block. 
+ """ + + err_msg = ( + "Valid arguments are kvikio.cufile_driver.set(config: dict) or " + "kvikio.cufile_driver.set(key: str, value: Any)" + ) + + if len(config) == 1: + if not isinstance(config[0], dict): + raise ValueError(err_msg) + return ConfigContextManager(config[0]) + elif len(config) == 2: + if not isinstance(config[0], str): + raise ValueError(err_msg) + return ConfigContextManager({config[0]: config[1]}) + else: + raise ValueError(err_msg) + + +def get(config_name: str) -> Any: + """Get cuFile driver configurations. + + Parameters + ---------- + config_name: str + The name of the configuration. + + Valid configuration names are: + + - Read-only properties: + + - ``"is_gds_available"`` + - ``"major_version"`` + - ``"minor_version"`` + - ``"allow_compat_mode"`` + - ``"per_buffer_cache_size"`` + + - Settable properties: + + - ``"poll_mode"`` + - ``"poll_thresh_size"`` + - ``"max_device_cache_size"`` + - ``"max_pinned_memory_size"`` + + Returns + ------- + Any + The value of the configuration. + """ + context_manager = ConfigContextManager({}) + return context_manager._get_property(config_name) def libcufile_version() -> Tuple[int, int]: diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index 4201cc29a3..0608a21a37 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -2,13 +2,188 @@ # See file LICENSE for terms. -import contextlib +from typing import Any, overload import kvikio._lib.defaults +from kvikio.utils import call_once, kvikio_deprecation_notice +class ConfigContextManager: + """Context manager allowing the KvikIO configurations to be set upon entering a + `with` block, and automatically reset upon leaving the block. 
+ """ + + def __init__(self, config: dict[str, str]): + ( + self._property_getters, + self._property_setters, + ) = self._property_getter_and_setter() + self._old_properties = {} + + for key, value in config.items(): + self._old_properties[key] = self._get_property(key) + self._set_property(key, value) + + def __enter__(self): + return None + + def __exit__(self, type_unused, value, traceback_unused): + for key, value in self._old_properties.items(): + self._set_property(key, value) + + def _get_property(self, property: str) -> Any: + if property == "num_threads": + property = "thread_pool_nthreads" + func = self._property_getters[property] + return func() + + def _set_property(self, property: str, value: Any): + if property == "num_threads": + property = "thread_pool_nthreads" + func = self._property_setters[property] + func(value) + + @call_once + def _property_getter_and_setter(self) -> tuple[dict[str, Any], dict[str, Any]]: + module_dict = vars(kvikio._lib.defaults) + + property_getter_names = [ + "compat_mode", + "thread_pool_nthreads", + "task_size", + "gds_threshold", + "bounce_buffer_size", + "http_max_attempts", + "http_status_codes", + ] + + property_getters = {} + property_setters = {} + + for name in property_getter_names: + property_getters[name] = module_dict[name] + property_setters[name] = module_dict["set_" + name] + return property_getters, property_setters + + +@overload +def set(config: dict[str, Any], /) -> ConfigContextManager: + ... + + +@overload +def set(key: str, value: Any, /) -> ConfigContextManager: + ... + + +def set(*config) -> ConfigContextManager: + """Set KvikIO configurations. + + Examples: + + - To set one or more properties + + .. code-block:: python + + # Set the property globally. + kvikio.defaults.set({"prop1": value1, "prop2": value2}) + + # Set the property with a context manager. + # The property automatically reverts to its old value + # after leaving the `with` block. 
+ with kvikio.defaults.set({"prop1": value1, "prop2": value2}): + ... + + - To set a single property + + .. code-block:: python + + # Set the property globally. + kvikio.defaults.set("prop", value) + + # Set the property with a context manager. + # The property automatically reverts to its old value + # after leaving the `with` block. + with kvikio.defaults.set("prop", value): + ... + + Parameters + ---------- + config + The configurations. Can either be a single parameter (dict) consisting of one + or more properties, or two parameters key (string) and value (Any) + indicating a single property. + + Valid configuration names are: + + - ``"compat_mode"`` + - ``"num_threads"`` + - ``"task_size"`` + - ``"gds_threshold"`` + - ``"bounce_buffer_size"`` + - ``"http_max_attempts"`` + - ``"http_status_codes"`` + + Returns + ------- + ConfigContextManager + A context manager. If used in a `with` statement, the configuration will revert + to its old value upon leaving the block. + """ + + err_msg = ( + "Valid arguments are kvikio.defaults.set(config: dict) or " + "kvikio.defaults.set(key: str, value: Any)" + ) + + if len(config) == 1: + if not isinstance(config[0], dict): + raise ValueError(err_msg) + return ConfigContextManager(config[0]) + elif len(config) == 2: + if not isinstance(config[0], str): + raise ValueError(err_msg) + return ConfigContextManager({config[0]: config[1]}) + else: + raise ValueError(err_msg) + + +def get(config_name: str) -> Any: + """Get KvikIO configurations. + + Parameters + ---------- + config_name: str + The name of the configuration. + + Valid configuration names are: + + - ``"compat_mode"`` + - ``"num_threads"`` + - ``"task_size"`` + - ``"gds_threshold"`` + - ``"bounce_buffer_size"`` + - ``"http_max_attempts"`` + - ``"http_status_codes"`` + + Returns + ------- + Any + The value of the configuration. 
+ """ + context_manager = ConfigContextManager({}) + return context_manager._get_property(config_name) + + +def is_compat_mode_preferred() -> bool: + return kvikio._lib.defaults.is_compat_mode_preferred() + + +@kvikio_deprecation_notice('Use kvikio.defaults.get("compat_mode") instead') def compat_mode() -> kvikio.CompatMode: - """Check if KvikIO is running in compatibility mode. + """Deprecated. Use :meth:`kvikio.defaults.get` instead. + + Check if KvikIO is running in compatibility mode. Notice, this is not the same as the compatibility mode in cuFile. That is, cuFile can run in compatibility mode while KvikIO is not. @@ -32,45 +207,13 @@ def compat_mode() -> kvikio.CompatMode: return kvikio._lib.defaults.compat_mode() -def compat_mode_reset(compatmode: kvikio.CompatMode) -> None: - """Reset the compatibility mode. +@kvikio_deprecation_notice('Use kvikio.defaults.get("num_threads") instead') +def num_threads() -> int: + """Deprecated. Use :meth:`kvikio.defaults.get` instead. - Use this function to enable/disable compatibility mode explicitly. + Get the number of threads of the thread pool. - Parameters - ---------- - compatmode : kvikio.CompatMode - Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable - compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try - OFF first, and upon failure, fall back to ON. - """ - kvikio._lib.defaults.compat_mode_reset(compatmode) - - -@contextlib.contextmanager -def set_compat_mode(compatmode: kvikio.CompatMode): - """Context for resetting the compatibility mode. - - Parameters - ---------- - compatmode : kvikio.CompatMode - Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable - compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try - OFF first, and upon failure, fall back to ON. 
- """ - num_threads_reset(get_num_threads()) # Sync all running threads - old_value = compat_mode() - try: - compat_mode_reset(compatmode) - yield - finally: - compat_mode_reset(old_value) - - -def get_num_threads() -> int: - """Get the number of threads of the thread pool. - - Set the default value using `num_threads_reset()` or by setting the + Set the default value using `set("num_threads", value)` or by setting the `KVIKIO_NTHREADS` environment variable. If not set, the default value is 1. Returns @@ -81,46 +224,13 @@ def get_num_threads() -> int: return kvikio._lib.defaults.thread_pool_nthreads() -def num_threads_reset(nthreads: int) -> None: - """Reset the number of threads in the default thread pool. - - Waits for all currently running tasks to be completed, then destroys all threads - in the pool and creates a new thread pool with the new number of threads. Any - tasks that were waiting in the queue before the pool was reset will then be - executed by the new threads. If the pool was paused before resetting it, the new - pool will be paused as well. - - Parameters - ---------- - nthreads : int - The number of threads to use. The default value can be specified by setting - the `KVIKIO_NTHREADS` environment variable. If not set, the default value - is 1. - """ - kvikio._lib.defaults.thread_pool_nthreads_reset(nthreads) - - -@contextlib.contextmanager -def set_num_threads(nthreads: int): - """Context for resetting the number of threads in the default thread pool. - - Parameters - ---------- - nthreads : int - The number of threads to use. - """ - old_value = get_num_threads() - try: - num_threads_reset(nthreads) - yield - finally: - num_threads_reset(old_value) - - +@kvikio_deprecation_notice('Use kvikio.defaults.get("task_size") instead') def task_size() -> int: - """Get the default task size used for parallel IO operations. + """Deprecated. Use :meth:`kvikio.defaults.get` instead. 
- Set the default value using `task_size_reset()` or by setting + Get the default task size used for parallel IO operations. + + Set the default value using `set("task_size", value)` or by setting the `KVIKIO_TASK_SIZE` environment variable. If not set, the default value is 4 MiB. @@ -132,42 +242,17 @@ def task_size() -> int: return kvikio._lib.defaults.task_size() -def task_size_reset(nbytes: int) -> None: - """Reset the default task size used for parallel IO operations. - - Parameters - ---------- - nbytes : int - The default task size in bytes. - """ - kvikio._lib.defaults.task_size_reset(nbytes) - - -@contextlib.contextmanager -def set_task_size(nbytes: int): - """Context for resetting the task size used for parallel IO operations. - - Parameters - ---------- - nbytes : int - The default task size in bytes. - """ - old_value = task_size() - try: - task_size_reset(nbytes) - yield - finally: - task_size_reset(old_value) - - +@kvikio_deprecation_notice('Use kvikio.defaults.get("gds_threshold") instead') def gds_threshold() -> int: - """Get the default GDS threshold, which is the minimum size to use GDS. + """Deprecated. Use :meth:`kvikio.defaults.get` instead. + + Get the default GDS threshold, which is the minimum size to use GDS. In order to improve performance of small IO, `.pread()` and `.pwrite()` implements a shortcut that circumvent the threadpool and use the POSIX backend directly. - Set the default value using `gds_threshold_reset()` or by setting the + Set the default value using `set("gds_threshold", value)` or by setting the `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB. @@ -179,168 +264,245 @@ def gds_threshold() -> int: return kvikio._lib.defaults.gds_threshold() -def gds_threshold_reset(nbytes: int) -> None: - """Reset the default GDS threshold, which is the minimum size to use GDS. 
+@kvikio_deprecation_notice('Use kvikio.defaults.get("bounce_buffer_size") instead') +def bounce_buffer_size() -> int: + """Deprecated. Use :meth:`kvikio.defaults.get` instead. - Parameters - ---------- + Get the size of the bounce buffer used to stage data in host memory. + + Set the value using `set("bounce_buffer_size", value)` or by setting the + `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the + value is 16 MiB. + + Returns + ------- nbytes : int - The default GDS threshold size in bytes. + The bounce buffer size in bytes. """ - kvikio._lib.defaults.gds_threshold_reset(nbytes) + return kvikio._lib.defaults.bounce_buffer_size() -@contextlib.contextmanager -def set_gds_threshold(nbytes: int): - """Context for resetting the default GDS threshold. +@kvikio_deprecation_notice('Use kvikio.defaults.get("http_max_attempts") instead') +def http_max_attempts() -> int: + """Deprecated. Use :meth:`kvikio.defaults.get` instead. - Parameters - ---------- - nbytes : int - The default GDS threshold size in bytes. + Get the maximum number of attempts per remote IO read. + + Reads are retried up until ``http_max_attempts`` when the response has certain + HTTP status codes. + + Set the value using `set("http_max_attempts", value)` or by setting the + ``KVIKIO_HTTP_MAX_ATTEMPTS`` environment variable. If not set, the + value is 3. + + Returns + ------- + max_attempts : int + The maximum number of remote IO reads to attempt before raising an + error. """ - old_value = gds_threshold() - try: - gds_threshold_reset(nbytes) - yield - finally: - gds_threshold_reset(old_value) + return kvikio._lib.defaults.http_max_attempts() -def bounce_buffer_size() -> int: - """Get the size of the bounce buffer used to stage data in host memory. +@kvikio_deprecation_notice('Use kvikio.defaults.get("http_status_codes") instead') +def http_status_codes() -> list[int]: + """Deprecated. Use :meth:`kvikio.defaults.get` instead. 
- Set the value using `bounce_buffer_size_reset()` or by setting the - `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the - value is 16 MiB. + Get the list of HTTP status codes to retry. + + Set the value using ``set("http_status_codes", value)`` or by setting the + ``KVIKIO_HTTP_STATUS_CODES`` environment variable. If not set, the + default value is + + - 429 + - 500 + - 502 + - 503 + - 504 Returns ------- - nbytes : int - The bounce buffer size in bytes. + status_codes : list[int] + The HTTP status codes to retry. """ - return kvikio._lib.defaults.bounce_buffer_size() + return kvikio._lib.defaults.http_status_codes() -def bounce_buffer_size_reset(nbytes: int) -> None: - """Reset the size of the bounce buffer used to stage data in host memory. +@kvikio_deprecation_notice('Use kvikio.defaults.set("compat_mode", value) instead') +def compat_mode_reset(compatmode: kvikio.CompatMode) -> None: + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Reset the compatibility mode. + + Use this function to enable/disable compatibility mode explicitly. Parameters ---------- - nbytes : int - The bounce buffer size in bytes. + compatmode : kvikio.CompatMode + Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable + compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try + OFF first, and upon failure, fall back to ON. """ - kvikio._lib.defaults.bounce_buffer_size_reset(nbytes) + set("compat_mode", compatmode) -@contextlib.contextmanager -def set_bounce_buffer_size(nbytes: int): - """Context for resetting the size of the bounce buffer. +@kvikio_deprecation_notice('Use kvikio.defaults.set("compat_mode", value) instead') +def set_compat_mode(compatmode: kvikio.CompatMode): + """Deprecated. Use :meth:`kvikio.defaults.set` instead. 
+ + Same with compat_mode_reset.""" + compat_mode_reset(compatmode) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("num_threads", value) instead') +def num_threads_reset(nthreads: int) -> None: + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Reset the number of threads in the default thread pool. + + Waits for all currently running tasks to be completed, then destroys all threads + in the pool and creates a new thread pool with the new number of threads. Any + tasks that were waiting in the queue before the pool was reset will then be + executed by the new threads. If the pool was paused before resetting it, the new + pool will be paused as well. Parameters ---------- - nbytes : int - The bounce buffer size in bytes. + nthreads : int + The number of threads to use. The default value can be specified by setting + the `KVIKIO_NTHREADS` environment variable. If not set, the default value + is 1. """ - old_value = bounce_buffer_size() - try: - bounce_buffer_size_reset(nbytes) - yield - finally: - bounce_buffer_size_reset(old_value) + set("num_threads", nthreads) -def http_max_attempts() -> int: - """Get the maximum number of attempts per remote IO read. +@kvikio_deprecation_notice('Use kvikio.defaults.set("num_threads", value) instead') +def set_num_threads(nthreads: int): + """Deprecated. Use :meth:`kvikio.defaults.set` instead. - Reads are retried up until ``http_max_attempts`` when the response has certain - HTTP status codes. + Same with num_threads_reset.""" + set("num_threads", nthreads) - Set the value using `http_max_attempts_reset()` or by setting the - ``KVIKIO_HTTP_MAX_ATTEMPTS`` environment variable. If not set, the - value is 3. - Returns - ------- - max_attempts : int - The maximum number of remote IO reads to attempt before raising an - error. +@kvikio_deprecation_notice('Use kvikio.defaults.set("task_size", value) instead') +def task_size_reset(nbytes: int) -> None: + """Deprecated. Use :meth:`kvikio.defaults.set` instead. 
+ + Reset the default task size used for parallel IO operations. + + Parameters + ---------- + nbytes : int + The default task size in bytes. """ - return kvikio._lib.defaults.http_max_attempts() + set("task_size", nbytes) -def http_max_attempts_reset(attempts: int) -> None: - """Reset the maximum number of attempts per remote IO read. +@kvikio_deprecation_notice('Use kvikio.defaults.set("task_size", value) instead') +def set_task_size(nbytes: int): + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Same with task_size_reset.""" + set("task_size", nbytes) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("gds_threshold", value) instead') +def gds_threshold_reset(nbytes: int) -> None: + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Reset the default GDS threshold, which is the minimum size to + use GDS. Parameters ---------- - attempts : int - The maximum number of attempts to try before raising an error. + nbytes : int + The default GDS threshold size in bytes. """ - kvikio._lib.defaults.http_max_attempts_reset(attempts) + set("gds_threshold", nbytes) -@contextlib.contextmanager -def set_http_max_attempts(attempts: int): - """Context for resetting the maximum number of HTTP attempts. +@kvikio_deprecation_notice('Use kvikio.defaults.set("gds_threshold", value) instead') +def set_gds_threshold(nbytes: int): + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Same with gds_threshold_reset.""" + set("gds_threshold", nbytes) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("bounce_buffer_size", value) instead' +) +def bounce_buffer_size_reset(nbytes: int) -> None: + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Reset the size of the bounce buffer used to stage data in host + memory. Parameters ---------- - attempts : int - The maximum number of attempts to try before raising an error. + nbytes : int + The bounce buffer size in bytes. 
""" - old_value = http_max_attempts() - try: - http_max_attempts_reset(attempts) - yield - finally: - http_max_attempts_reset(old_value) + set("bounce_buffer_size", nbytes) -def http_status_codes() -> list[int]: - """Get the list of HTTP status codes to retry. +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("bounce_buffer_size", value) instead' +) +def set_bounce_buffer_size(nbytes: int): + """Deprecated. Use :meth:`kvikio.defaults.set` instead. - Set the value using ``set_http_status_codes`` or by setting the - ``KVIKIO_HTTP_STATUS_CODES`` environment variable. If not set, the - default value is + Same with bounce_buffer_size_reset.""" + set("bounce_buffer_size", nbytes) - - 429 - - 500 - - 502 - - 503 - - 504 - Returns - ------- - status_codes : list[int] - The HTTP status codes to retry. +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_max_attempts", value) instead' +) +def http_max_attempts_reset(attempts: int) -> None: + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Reset the maximum number of attempts per remote IO read. + + Parameters + ---------- + attempts : int + The maximum number of attempts to try before raising an error. """ - return kvikio._lib.defaults.http_status_codes() + set("http_max_attempts", attempts) +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_max_attempts", value) instead' +) +def set_http_max_attempts(attempts: int): + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Same with http_max_attempts_reset.""" + set("http_max_attempts", attempts) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_status_codes", value) instead' +) def http_status_codes_reset(status_codes: list[int]) -> None: - """Reset the list of HTTP status codes to retry. + """Deprecated. Use :meth:`kvikio.defaults.set` instead. + + Reset the list of HTTP status codes to retry. Parameters ---------- status_codes : list[int] The HTTP status codes to retry. 
""" - kvikio._lib.defaults.http_status_codes_reset(status_codes) + set("http_status_codes", status_codes) -@contextlib.contextmanager +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_status_codes", value) instead' +) def set_http_status_codes(status_codes: list[int]): - """Context for resetting the HTTP status codes to retry. + """Deprecated. Use :meth:`kvikio.defaults.set` instead. - Parameters - ---------- - status_codes : list[int] - THe HTTP status codes to retry. - """ - old_value = http_status_codes() - try: - http_status_codes_reset(status_codes) - yield - finally: - http_status_codes_reset(old_value) + Same with http_status_codes_reset.""" + set("http_status_codes", status_codes) diff --git a/python/kvikio/kvikio/utils.py b/python/kvikio/kvikio/utils.py index fc88e321a5..575b197da2 100644 --- a/python/kvikio/kvikio/utils.py +++ b/python/kvikio/kvikio/utils.py @@ -6,12 +6,13 @@ import pathlib import threading import time +import warnings from http.server import ( BaseHTTPRequestHandler, SimpleHTTPRequestHandler, ThreadingHTTPServer, ) -from typing import Any +from typing import Any, Callable class LocalHttpServer: @@ -94,3 +95,70 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.process.kill() + + +def call_once(func: Callable) -> Callable: + """Decorate a function such that it is only called once + + Examples: + + .. code-block:: python + + @kvikio.utils.call_once + foo(args) + + Parameters + ---------- + func: Callable + The function to be decorated. + + Returns + ------- + Callable + A decorated function. + """ + once_flag = True + cached_result = None + + def wrapper(*args, **kwargs): + nonlocal once_flag + nonlocal cached_result + if once_flag: + once_flag = False + cached_result = func(*args, **kwargs) + return cached_result + + return wrapper + + +def kvikio_deprecation_notice(msg: str) -> Callable: + """Decorate a function to print the deprecation notice at runtime. + + Examples: + + .. 
code-block:: python + + @kvikio.utils.kvikio_deprecation_notice("Use bar(args) instead.") + foo(args) + + Parameters + ---------- + msg: str + The deprecation notice. + + Returns + ------- + Callable + A decorated function. + """ + + def decorator(func: Callable): + def wrapper(*args, **kwargs): + warnings.warn(msg, category=FutureWarning, stacklevel=2) + return func(*args, **kwargs) + + # Allow the docstring to be corrected generated for the decorated func in Sphinx + wrapper.__doc__ = func.__doc__ + return wrapper + + return decorator diff --git a/python/kvikio/tests/conftest.py b/python/kvikio/tests/conftest.py index c1cc77026e..120f862ae6 100644 --- a/python/kvikio/tests/conftest.py +++ b/python/kvikio/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import contextlib @@ -94,5 +94,5 @@ def xp(request): def gds_threshold(request): """Fixture to parametrize over GDS threshold values""" - with kvikio.defaults.set_gds_threshold(request.param): + with kvikio.defaults.set("gds_threshold", request.param): yield request.param diff --git a/python/kvikio/tests/test_basic_io.py b/python/kvikio/tests/test_basic_io.py index e1e9932e23..744941861a 100644 --- a/python/kvikio/tests/test_basic_io.py +++ b/python/kvikio/tests/test_basic_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import os @@ -27,18 +27,17 @@ def test_write(tmp_path, xp, gds_threshold, size, nthreads, tasksize): """Test basic read/write""" filename = tmp_path / "test-file" - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - a = xp.arange(size) - f = kvikio.CuFile(filename, "w") - assert not f.closed - assert check_bit_flags(f.open_flags(), os.O_WRONLY) - assert f.write(a) == a.nbytes - f.close() - assert f.closed + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + a = xp.arange(size) + f = kvikio.CuFile(filename, "w") + assert not f.closed + assert check_bit_flags(f.open_flags(), os.O_WRONLY) + assert f.write(a) == a.nbytes + f.close() + assert f.closed - b = numpy.fromfile(filename, dtype=a.dtype) - xp.testing.assert_array_equal(a, b) + b = numpy.fromfile(filename, dtype=a.dtype) + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10]) @@ -48,17 +47,16 @@ def test_read(tmp_path, xp, gds_threshold, size, nthreads, tasksize): """Test basic read/write""" filename = tmp_path / "test-file" - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - a = numpy.arange(size) - a.tofile(filename) - os.sync() + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + a = numpy.arange(size) + a.tofile(filename) + os.sync() - b = xp.empty_like(a) - f = kvikio.CuFile(filename, "r") - assert check_bit_flags(f.open_flags(), os.O_RDONLY) - assert f.read(b) == b.nbytes - xp.testing.assert_array_equal(a, b) + b = xp.empty_like(a) + f = kvikio.CuFile(filename, "r") + assert check_bit_flags(f.open_flags(), os.O_RDONLY) + assert f.read(b) == b.nbytes + xp.testing.assert_array_equal(a, b) def test_file_handle_context(tmp_path): @@ -100,17 +98,16 @@ def test_incorrect_open_mode_error(tmp_path, xp): @pytest.mark.skipif( - kvikio.defaults.compat_mode(), + kvikio.defaults.is_compat_mode_preferred(), 
reason="cannot test `set_compat_mode` when already running in compatibility mode", ) def test_set_compat_mode_between_io(tmp_path): """Test changing `compat_mode`""" - - with kvikio.defaults.set_compat_mode(False): + with kvikio.defaults.set("compat_mode", kvikio.CompatMode.OFF): f = kvikio.CuFile(tmp_path / "test-file", "w") assert not f.closed assert f.open_flags() & os.O_WRONLY != 0 - with kvikio.defaults.set_compat_mode(True): + with kvikio.defaults.set("compat_mode", kvikio.CompatMode.ON): a = cupy.arange(10) assert f.write(a) == a.nbytes @@ -155,17 +152,16 @@ def test_write_to_files_in_chunks(tmp_path, xp, gds_threshold): def test_read_write_slices(tmp_path, xp, gds_threshold, nthreads, tasksize, start, end): """Read and write different slices""" - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - filename = tmp_path / "test-file" - a = xp.arange(10 * 4096) # 10 page-sizes - b = a.copy() - a[start:end] = 42 - with kvikio.CuFile(filename, "w") as f: - assert f.write(a[start:end]) == a[start:end].nbytes - with kvikio.CuFile(filename, "r") as f: - assert f.read(b[start:end]) == b[start:end].nbytes - xp.testing.assert_array_equal(a, b) + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + filename = tmp_path / "test-file" + a = xp.arange(10 * 4096) # 10 page-sizes + b = a.copy() + a[start:end] = 42 + with kvikio.CuFile(filename, "w") as f: + assert f.write(a[start:end]) == a[start:end].nbytes + with kvikio.CuFile(filename, "r") as f: + assert f.read(b[start:end]) == b[start:end].nbytes + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10]) @@ -232,31 +228,30 @@ def test_multiple_gpus(tmp_path, xp, gds_threshold): """Test IO from two different GPUs""" filename = tmp_path / "test-file" - with kvikio.defaults.set_num_threads(10): - with kvikio.defaults.set_task_size(10): - # Allocate an array on each device + with 
kvikio.defaults.set({"num_threads": 10, "task_size": 10}): + # Allocate an array on each device + with cupy.cuda.Device(0): + a0 = xp.arange(200) + with cupy.cuda.Device(1): + a1 = xp.zeros(200, dtype=a0.dtype) + + # Test when the device match the allocation + with kvikio.CuFile(filename, "w") as f: with cupy.cuda.Device(0): - a0 = xp.arange(200) + assert f.write(a0) == a0.nbytes + with kvikio.CuFile(filename, "r") as f: with cupy.cuda.Device(1): - a1 = xp.zeros(200, dtype=a0.dtype) - - # Test when the device match the allocation - with kvikio.CuFile(filename, "w") as f: - with cupy.cuda.Device(0): - assert f.write(a0) == a0.nbytes - with kvikio.CuFile(filename, "r") as f: - with cupy.cuda.Device(1): - assert f.read(a1) == a1.nbytes - assert bytes(a0) == bytes(a1) - - # Test when the device doesn't match the allocation - with kvikio.CuFile(filename, "w") as f: - with cupy.cuda.Device(1): - assert f.write(a0) == a0.nbytes - with kvikio.CuFile(filename, "r") as f: - with cupy.cuda.Device(0): - assert f.read(a1) == a1.nbytes - assert bytes(a0) == bytes(a1) + assert f.read(a1) == a1.nbytes + assert bytes(a0) == bytes(a1) + + # Test when the device doesn't match the allocation + with kvikio.CuFile(filename, "w") as f: + with cupy.cuda.Device(1): + assert f.write(a0) == a0.nbytes + with kvikio.CuFile(filename, "r") as f: + with cupy.cuda.Device(0): + assert f.read(a1) == a1.nbytes + assert bytes(a0) == bytes(a1) @pytest.mark.parametrize("size", [1, 10, 100, 1000]) @@ -265,31 +260,35 @@ def test_multiple_gpus(tmp_path, xp, gds_threshold): def test_different_bounce_buffer_sizes(tmp_path, size, tasksize, buffer_size): """Test different bounce buffer sizes""" filename = tmp_path / "test-file" - with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(10): - with kvikio.defaults.set_task_size(tasksize): - with kvikio.defaults.set_bounce_buffer_size(buffer_size): - with kvikio.CuFile(filename, "w+") as f: - a = cupy.arange(size) - b = cupy.empty_like(a) - 
f.write(a) - assert f.read(b) == b.nbytes - cupy.testing.assert_array_equal(a, b) + with kvikio.defaults.set( + { + "compat_mode": kvikio.CompatMode.ON, + "num_threads": 10, + "bounce_buffer_size": buffer_size, + } + ): + with kvikio.CuFile(filename, "w+") as f: + a = cupy.arange(size) + b = cupy.empty_like(a) + f.write(a) + assert f.read(b) == b.nbytes + cupy.testing.assert_array_equal(a, b) def test_bounce_buffer_free(tmp_path): """Test freeing the bounce buffer allocations""" filename = tmp_path / "test-file" kvikio.buffer.bounce_buffer_free() - with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(1): + with kvikio.defaults.set({"compat_mode": kvikio.CompatMode.ON, "num_threads": 1}): with kvikio.CuFile(filename, "w") as f: - with kvikio.defaults.set_bounce_buffer_size(1024): + with kvikio.defaults.set({"bounce_buffer_size": 1024}): # Notice, since the bounce buffer size is only checked when the buffer # is used, we populate the bounce buffer in between we clear it. f.write(cupy.arange(10)) assert kvikio.buffer.bounce_buffer_free() == 1024 assert kvikio.buffer.bounce_buffer_free() == 0 f.write(cupy.arange(10)) - with kvikio.defaults.set_bounce_buffer_size(2048): + with kvikio.defaults.set({"bounce_buffer_size": 2048}): f.write(cupy.arange(10)) assert kvikio.buffer.bounce_buffer_free() == 2048 assert kvikio.buffer.bounce_buffer_free() == 0 diff --git a/python/kvikio/tests/test_cufile_driver.py b/python/kvikio/tests/test_cufile_driver.py index a1dc3a6454..7e31d24335 100644 --- a/python/kvikio/tests/test_cufile_driver.py +++ b/python/kvikio/tests/test_cufile_driver.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import pytest @@ -16,3 +16,43 @@ def test_version(): def test_open_and_close(): kvikio.cufile_driver.driver_open() kvikio.cufile_driver.driver_close() + + +def test_property_accessor(): + """Test the method `get` and `set`""" + + # Attempt to set a nonexistent property + with pytest.raises(KeyError): + kvikio.cufile_driver.set("nonexistent_property", 123) + + # Attempt to get a nonexistent property + with pytest.raises(KeyError): + kvikio.cufile_driver.get("nonexistent_property") + + # Attempt to set a read-only property + with pytest.raises(KeyError, match="read-only"): + kvikio.cufile_driver.set("major_version", 2077) + + # Nested context managers + poll_thresh_size_default = kvikio.cufile_driver.get("poll_thresh_size") + with kvikio.cufile_driver.set("poll_thresh_size", 1024): + assert kvikio.cufile_driver.get("poll_thresh_size") == 1024 + with kvikio.cufile_driver.set("poll_thresh_size", 2048): + assert kvikio.cufile_driver.get("poll_thresh_size") == 2048 + with kvikio.cufile_driver.set("poll_thresh_size", 4096): + assert kvikio.cufile_driver.get("poll_thresh_size") == 4096 + assert kvikio.cufile_driver.get("poll_thresh_size") == 2048 + assert kvikio.cufile_driver.get("poll_thresh_size") == 1024 + assert kvikio.cufile_driver.get("poll_thresh_size") == poll_thresh_size_default + + # Multiple context managers + poll_mode_default = kvikio.cufile_driver.get("poll_mode") + max_device_cache_size_default = kvikio.cufile_driver.get("max_device_cache_size") + with kvikio.cufile_driver.set({"poll_mode": True, "max_device_cache_size": 2048}): + assert kvikio.cufile_driver.get("poll_mode") and ( + kvikio.cufile_driver.get("max_device_cache_size") == 2048 + ) + assert (kvikio.cufile_driver.get("poll_mode") == poll_mode_default) and ( + kvikio.cufile_driver.get("max_device_cache_size") + == max_device_cache_size_default + ) diff --git a/python/kvikio/tests/test_defaults.py b/python/kvikio/tests/test_defaults.py index 5e2d6675f8..bc6cff6180 100644 --- 
a/python/kvikio/tests/test_defaults.py +++ b/python/kvikio/tests/test_defaults.py @@ -7,111 +7,158 @@ import kvikio.defaults +def test_property_accessor(): + """Test the method `get` and `set`""" + + # Attempt to set a nonexistent property + with pytest.raises(KeyError): + kvikio.defaults.set("nonexistent_property", 123) + + # Attempt to get a nonexistent property + with pytest.raises(KeyError): + kvikio.defaults.get("nonexistent_property") + + # Attempt to set a property whose name is mistakenly prefixed by "set_" + # (coinciding with the setter method). + with pytest.raises(KeyError): + kvikio.defaults.set("set_task_size", 123) + + # Nested context managers + task_size_default = kvikio.defaults.get("task_size") + with kvikio.defaults.set("task_size", 1024): + assert kvikio.defaults.get("task_size") == 1024 + with kvikio.defaults.set("task_size", 2048): + assert kvikio.defaults.get("task_size") == 2048 + with kvikio.defaults.set("task_size", 4096): + assert kvikio.defaults.get("task_size") == 4096 + assert kvikio.defaults.get("task_size") == 2048 + assert kvikio.defaults.get("task_size") == 1024 + assert kvikio.defaults.get("task_size") == task_size_default + + # Multiple context managers + task_size_default = kvikio.defaults.get("task_size") + num_threads_default = kvikio.defaults.get("num_threads") + bounce_buffer_size_default = kvikio.defaults.get("bounce_buffer_size") + with kvikio.defaults.set( + {"task_size": 1024, "num_threads": 16, "bounce_buffer_size": 1024} + ): + assert ( + (kvikio.defaults.get("task_size") == 1024) + and (kvikio.defaults.get("num_threads") == 16) + and (kvikio.defaults.get("bounce_buffer_size") == 1024) + ) + assert ( + (kvikio.defaults.get("task_size") == task_size_default) + and (kvikio.defaults.get("num_threads") == num_threads_default) + and (kvikio.defaults.get("bounce_buffer_size") == bounce_buffer_size_default) + ) + + @pytest.mark.skipif( - kvikio.defaults.compat_mode() == kvikio.CompatMode.ON, + 
kvikio.defaults.get("compat_mode") == kvikio.CompatMode.ON, reason="cannot test `compat_mode` when already running in compatibility mode", ) def test_compat_mode(): """Test changing `compat_mode`""" - before = kvikio.defaults.compat_mode() - with kvikio.defaults.set_compat_mode(kvikio.CompatMode.ON): - assert kvikio.defaults.compat_mode() == kvikio.CompatMode.ON - kvikio.defaults.compat_mode_reset(kvikio.CompatMode.OFF) - assert kvikio.defaults.compat_mode() == kvikio.CompatMode.OFF - kvikio.defaults.compat_mode_reset(kvikio.CompatMode.AUTO) - assert kvikio.defaults.compat_mode() == kvikio.CompatMode.AUTO - assert before == kvikio.defaults.compat_mode() + before = kvikio.defaults.get("compat_mode") + with kvikio.defaults.set("compat_mode", kvikio.CompatMode.ON): + assert kvikio.defaults.get("compat_mode") == kvikio.CompatMode.ON + kvikio.defaults.set("compat_mode", kvikio.CompatMode.OFF) + assert kvikio.defaults.get("compat_mode") == kvikio.CompatMode.OFF + kvikio.defaults.set("compat_mode", kvikio.CompatMode.AUTO) + assert kvikio.defaults.get("compat_mode") == kvikio.CompatMode.AUTO + assert before == kvikio.defaults.get("compat_mode") def test_num_threads(): """Test changing `num_threads`""" - before = kvikio.defaults.get_num_threads() - with kvikio.defaults.set_num_threads(3): - assert kvikio.defaults.get_num_threads() == 3 - kvikio.defaults.num_threads_reset(4) - assert kvikio.defaults.get_num_threads() == 4 - assert before == kvikio.defaults.get_num_threads() + before = kvikio.defaults.get("num_threads") + with kvikio.defaults.set("num_threads", 3): + assert kvikio.defaults.get("num_threads") == 3 + kvikio.defaults.set("num_threads", 4) + assert kvikio.defaults.get("num_threads") == 4 + assert before == kvikio.defaults.get("num_threads") with pytest.raises(ValueError, match="positive integer greater than zero"): - kvikio.defaults.num_threads_reset(0) + kvikio.defaults.set("num_threads", 0) with pytest.raises(OverflowError, match="negative value"): - 
kvikio.defaults.num_threads_reset(-1) + kvikio.defaults.set("num_threads", -1) def test_task_size(): """Test changing `task_size`""" - before = kvikio.defaults.task_size() - with kvikio.defaults.set_task_size(3): - assert kvikio.defaults.task_size() == 3 - kvikio.defaults.task_size_reset(4) - assert kvikio.defaults.task_size() == 4 - assert before == kvikio.defaults.task_size() + before = kvikio.defaults.get("task_size") + with kvikio.defaults.set("task_size", 3): + assert kvikio.defaults.get("task_size") == 3 + kvikio.defaults.set("task_size", 4) + assert kvikio.defaults.get("task_size") == 4 + assert before == kvikio.defaults.get("task_size") with pytest.raises(ValueError, match="positive integer greater than zero"): - kvikio.defaults.task_size_reset(0) + kvikio.defaults.set("task_size", 0) with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.task_size_reset(-1) + kvikio.defaults.set("task_size", -1) def test_gds_threshold(): """Test changing `gds_threshold`""" - before = kvikio.defaults.gds_threshold() - with kvikio.defaults.set_gds_threshold(3): - assert kvikio.defaults.gds_threshold() == 3 - kvikio.defaults.gds_threshold_reset(4) - assert kvikio.defaults.gds_threshold() == 4 - assert before == kvikio.defaults.gds_threshold() + before = kvikio.defaults.get("gds_threshold") + with kvikio.defaults.set("gds_threshold", 3): + assert kvikio.defaults.get("gds_threshold") == 3 + kvikio.defaults.set("gds_threshold", 4) + assert kvikio.defaults.get("gds_threshold") == 4 + assert before == kvikio.defaults.get("gds_threshold") with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.gds_threshold_reset(-1) + kvikio.defaults.set("gds_threshold", -1) def test_bounce_buffer_size(): """Test changing `bounce_buffer_size`""" - before = kvikio.defaults.bounce_buffer_size() - with kvikio.defaults.set_bounce_buffer_size(3): - assert kvikio.defaults.bounce_buffer_size() == 3 - kvikio.defaults.bounce_buffer_size_reset(4) - assert 
kvikio.defaults.bounce_buffer_size() == 4 - assert before == kvikio.defaults.bounce_buffer_size() + before = kvikio.defaults.get("bounce_buffer_size") + with kvikio.defaults.set("bounce_buffer_size", 3): + assert kvikio.defaults.get("bounce_buffer_size") == 3 + kvikio.defaults.set("bounce_buffer_size", 4) + assert kvikio.defaults.get("bounce_buffer_size") == 4 + assert before == kvikio.defaults.get("bounce_buffer_size") with pytest.raises(ValueError, match="positive integer greater than zero"): - kvikio.defaults.bounce_buffer_size_reset(0) + kvikio.defaults.set("bounce_buffer_size", 0) with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.bounce_buffer_size_reset(-1) + kvikio.defaults.set("bounce_buffer_size", -1) def test_http_max_attempts(): - before = kvikio.defaults.http_max_attempts() + before = kvikio.defaults.get("http_max_attempts") - with kvikio.defaults.set_http_max_attempts(5): - assert kvikio.defaults.http_max_attempts() == 5 - kvikio.defaults.http_max_attempts_reset(4) - assert kvikio.defaults.http_max_attempts() == 4 - assert kvikio.defaults.http_max_attempts() == before + with kvikio.defaults.set("http_max_attempts", 5): + assert kvikio.defaults.get("http_max_attempts") == 5 + kvikio.defaults.set("http_max_attempts", 4) + assert kvikio.defaults.get("http_max_attempts") == 4 + assert kvikio.defaults.get("http_max_attempts") == before with pytest.raises(ValueError, match="positive integer"): - kvikio.defaults.http_max_attempts_reset(0) + kvikio.defaults.set("http_max_attempts", 0) with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.http_max_attempts_reset(-1) + kvikio.defaults.set("http_max_attempts", -1) def test_http_status_codes(): - before = kvikio.defaults.http_status_codes() + before = kvikio.defaults.get("http_status_codes") - with kvikio.defaults.set_http_status_codes([500]): - assert kvikio.defaults.http_status_codes() == [500] - kvikio.defaults.http_status_codes_reset([429, 500]) - assert 
kvikio.defaults.http_status_codes() == [429, 500] - assert kvikio.defaults.http_status_codes() == before + with kvikio.defaults.set("http_status_codes", [500]): + assert kvikio.defaults.get("http_status_codes") == [500] + kvikio.defaults.set("http_status_codes", [429, 500]) + assert kvikio.defaults.get("http_status_codes") == [429, 500] + assert kvikio.defaults.get("http_status_codes") == before with pytest.raises(TypeError): - kvikio.defaults.http_status_codes_reset(0) + kvikio.defaults.set("http_status_codes", 0) with pytest.raises(TypeError): - kvikio.defaults.http_status_codes_reset(["a"]) + kvikio.defaults.set("http_status_codes", ["a"]) diff --git a/python/kvikio/tests/test_http_io.py b/python/kvikio/tests/test_http_io.py index e62dbb81af..e695d1f02f 100644 --- a/python/kvikio/tests/test_http_io.py +++ b/python/kvikio/tests/test_http_io.py @@ -99,14 +99,13 @@ def test_read(http_server, tmpdir, xp, size, nthreads, tasksize): a = xp.arange(size) a.tofile(tmpdir / "a") - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: - assert f.nbytes() == a.nbytes - assert f"{http_server}/a" in str(f) - b = xp.empty_like(a) - assert f.read(b) == a.nbytes - xp.testing.assert_array_equal(a, b) + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: + assert f.nbytes() == a.nbytes + assert f"{http_server}/a" in str(f) + b = xp.empty_like(a) + assert f.read(b) == a.nbytes + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize("nthreads", [1, 10]) @@ -114,7 +113,7 @@ def test_large_read(http_server, tmpdir, xp, nthreads): a = xp.arange(16_000_000) a.tofile(tmpdir / "a") - with kvikio.defaults.set_num_threads(nthreads): + with kvikio.defaults.set("num_threads", nthreads): with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: assert f.nbytes() == a.nbytes assert f"{http_server}/a" 
in str(f) @@ -187,7 +186,9 @@ def test_retry_http_503_fails(tmpdir, xp, capfd): a.tofile(tmpdir / "a") b = xp.empty_like(a) - with pytest.raises(RuntimeError) as m, kvikio.defaults.set_http_max_attempts(2): + with pytest.raises(RuntimeError) as m, kvikio.defaults.set( + "http_max_attempts", 2 + ): with kvikio.RemoteFile.open_http(f"{server.url}/a") as f: f.read(b) @@ -212,7 +213,7 @@ def test_no_retries_ok(tmpdir): ) as server: http_server = server.url b = np.empty_like(a) - with kvikio.defaults.set_http_max_attempts(1): + with kvikio.defaults.set("http_max_attempts", 1): with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: assert f.nbytes() == a.nbytes assert f"{http_server}/a" in str(f) @@ -227,9 +228,9 @@ def test_set_http_status_code(tmpdir): handler_options={"error_counter": ErrorCounter()}, ) as server: http_server = server.url - with kvikio.defaults.set_http_status_codes([429]): + with kvikio.defaults.set("http_status_codes", [429]): # this raises on the first 503 error, since it's not in the list. - assert kvikio.defaults.http_status_codes() == [429] + assert kvikio.defaults.get("http_status_codes") == [429] with pytest.raises(RuntimeError, match="503"): with kvikio.RemoteFile.open_http(f"{http_server}/a"): pass diff --git a/python/kvikio/tests/test_s3_io.py b/python/kvikio/tests/test_s3_io.py index 1f2bae95d0..45997b1e71 100644 --- a/python/kvikio/tests/test_s3_io.py +++ b/python/kvikio/tests/test_s3_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import multiprocessing as mp @@ -124,16 +124,20 @@ def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size): with s3_context( s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)} ) as server_address: - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - with kvikio.defaults.set_bounce_buffer_size(buffer_size): - with kvikio.RemoteFile.open_s3_url( - f"{server_address}/{bucket_name}/{object_name}" - ) as f: - assert f.nbytes() == a.nbytes - b = xp.empty_like(a) - assert f.read(buf=b) == a.nbytes - xp.testing.assert_array_equal(a, b) + with kvikio.defaults.set( + { + "num_threads": nthreads, + "task_size": tasksize, + "bounce_buffer_size": buffer_size, + } + ): + with kvikio.RemoteFile.open_s3_url( + f"{server_address}/{bucket_name}/{object_name}" + ) as f: + assert f.nbytes() == a.nbytes + b = xp.empty_like(a) + assert f.read(buf=b) == a.nbytes + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize(