Skip to content

Commit

Permalink
fabtests/efa: Add fabtests for efa-direct
Browse files Browse the repository at this point in the history
Run fabtests for efa-direct with message sizes up to the device max msg
size (8K). RMA tests run on efa-direct only where FI_RMA is supported.
RMA tests currently post a recv with the max transfer size. They
could run up to the max RDMA size once fabtests is fixed to post the
recv within the device max msg size.
Also skip the 0-byte size for RMA: fabtests uses inject for messages
smaller than inject_size, but efa-direct does not support RMA inject
until the firmware supports inline write.

Signed-off-by: Jessie Yang <jiaxiyan@amazon.com>
  • Loading branch information
jiaxiyan committed Feb 24, 2025
1 parent 5163e6a commit 8f18de1
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 71 deletions.
24 changes: 22 additions & 2 deletions fabtests/pytest/efa/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,31 @@ def zcpy_recv_message_size(request):
def zcpy_recv_max_msg_size(request):
return 8192

# TODO - add efa-direct tests
@pytest.fixture(scope="module", params=["efa"])
# Module-scoped fixture parametrized over two message-size strings; a test
# that requests it is run once per string and receives that string unchanged.
# The tests in this commit pass it as the message size when fabric is
# "efa-direct", in place of "all" / message_size / inject_message_size.
# NOTE(review): the strings look like "r:<start>,<step>,<end>" ranges capped at
# 8192 (the 8K device max msg size named in the commit message) -- confirm
# against the message-size parsing in the fabtests common helpers.
@pytest.fixture(scope="module", params=["r:0,4,32",
"r:0,1024,8192",])
def direct_message_size(request):
return request.param

# Module-scoped fixture parametrized over two message-size strings for RMA
# tests; returns the selected string unchanged.
# Both strings start at 1 rather than 0, unlike direct_message_size, so the
# 0-byte case is left out (see the TODO below).
# TODO: Include 0 byte test when we support 0 byte rma inject
@pytest.fixture(scope="module", params=["r:1,4,32",
"r:1,1024,8192",])
def direct_rma_size(request):
return request.param

# Module-scoped fixture naming the fabric a test runs against. It is
# parametrized over "efa" and "efa-direct", so every test that requests
# `fabric` is run once for each value.
@pytest.fixture(scope="module", params=["efa", "efa-direct"])
def fabric(request):
return request.param

# Function-scoped wrapper around the `fabric` fixture for tests that need RMA.
# Returns `fabric` unchanged, except that on "efa-direct" the requesting test
# is skipped when has_rdma() is falsy for any of 'read', 'write' or
# 'writedata'. The "efa" fabric is never skipped here.
# NOTE(review): has_rdma is not defined or imported in the visible hunk; it
# presumably reports device RDMA capability for the hosts in cmdline_args --
# confirm against efa.efa_common.
@pytest.fixture(scope="function")
def rma_fabric(cmdline_args, fabric):
if fabric == 'efa-direct' and (
not has_rdma(cmdline_args, 'read') or
not has_rdma(cmdline_args, 'write') or
not has_rdma(cmdline_args, 'writedata')
):
pytest.skip("FI_RMA is not supported. Skip rma tests on efa-direct.")
return fabric

@pytest.hookimpl(hookwrapper=True)
def pytest_collection_modifyitems(session, config, items):
# Called after collection has been performed, may filter or re-order the items in-place
Expand Down
5 changes: 3 additions & 2 deletions fabtests/pytest/efa/test_av.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest

# This test skips efa-direct because it requests FI_TAGGED
@pytest.mark.functional
def test_av_xfer(cmdline_args, fabric):
def test_av_xfer(cmdline_args):
from common import ClientServerTest
test = ClientServerTest(cmdline_args, "fi_av_xfer -e rdm", fabric=fabric)
test = ClientServerTest(cmdline_args, "fi_av_xfer -e rdm", fabric="efa")
test.run()
5 changes: 4 additions & 1 deletion fabtests/pytest/efa/test_cq.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from efa.efa_common import has_rdma

# this test must be run in serial mode because it will open the maximal number
# of cq that efa device can support
Expand All @@ -13,5 +14,7 @@ def test_cq(cmdline_args, fabric):
@pytest.mark.parametrize("operation_type", ["senddata", "writedata"])
def test_cq_data(cmdline_args, operation_type, fabric):
from common import ClientServerTest
test = ClientServerTest(cmdline_args, f"fi_cq_data -e rdm -o" + operation_type, fabric=fabric)
if fabric == "efa-direct" and operation_type == "writedata" and not has_rdma(cmdline_args, operation_type):
pytest.skip("FI_RMA is not supported. Skip writedata test on efa-direct.")
test = ClientServerTest(cmdline_args, f"fi_cq_data -e rdm -o " + operation_type, fabric=fabric)
test.run()
2 changes: 2 additions & 0 deletions fabtests/pytest/efa/test_efa_device_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def test_efa_device_selection(cmdline_args, fabric):
client_device_name = client_device_names[client_device_idx]

for suffix in ["rdm", "dgrm"]:
if fabric == "efa-direct" and suffix == "dgrm":
continue
server_tx_bytes_before_test = efa_retrieve_hw_counter_value(cmdline_args.server_id, "tx_bytes", server_device_name)
client_tx_bytes_before_test = efa_retrieve_hw_counter_value(cmdline_args.client_id, "tx_bytes", client_device_name)

Expand Down
5 changes: 3 additions & 2 deletions fabtests/pytest/efa/test_efa_protocol_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from efa.efa_common import has_gdrcopy, has_rdma


# This test skips efa-direct because it does not have the read protocol
# TODO Expand this test to run on all memory types (and rename)
@pytest.mark.serial
@pytest.mark.functional
@pytest.mark.cuda_memory
@pytest.mark.parametrize("fabtest_name,cntrl_env_var", [("fi_rdm_tagged_bw", "FI_EFA_INTER_MIN_READ_MESSAGE_SIZE"), ("fi_rma_bw", "FI_EFA_INTER_MIN_READ_WRITE_SIZE")])
def test_transfer_with_read_protocol_cuda(cmdline_args, fabtest_name, cntrl_env_var, fabric):
def test_transfer_with_read_protocol_cuda(cmdline_args, fabtest_name, cntrl_env_var):
"""
Verify that the read protocol is used for a 1024 byte message when the env variable
switches are set to force the read protocol at 1000 bytes.
Expand Down Expand Up @@ -51,7 +52,7 @@ def test_transfer_with_read_protocol_cuda(cmdline_args, fabtest_name, cntrl_env_
memory_type="cuda_to_cuda",
message_size=message_size,
warmup_iteration_type="0",
fabric=fabric)
fabric="efa")

server_read_wrs_after_test = efa_retrieve_hw_counter_value(cmdline_args.server_id, "rdma_read_wrs")
server_read_bytes_after_test = efa_retrieve_hw_counter_value(cmdline_args.server_id, "rdma_read_bytes")
Expand Down
3 changes: 1 addition & 2 deletions fabtests/pytest/efa/test_fork_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ def test_fork_support(cmdline_args, completion_semantic, environment_variable, f
cmdline_args_copy = copy.copy(cmdline_args)

cmdline_args_copy.append_environ("{}=1".format(environment_variable))
test = ClientServerTest(cmdline_args_copy, "fi_rdm_tagged_bw -K",
test = ClientServerTest(cmdline_args_copy, "fi_rdm_bw -K",
completion_semantic=completion_semantic,
datacheck_type="with_datacheck", fabric=fabric)
test.run()

5 changes: 3 additions & 2 deletions fabtests/pytest/efa/test_multi_ep.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

@pytest.mark.functional
@pytest.mark.parametrize("shared_cq", [True, False])
def test_multi_ep(cmdline_args, shared_cq, fabric):
def test_multi_ep(cmdline_args, shared_cq, rma_fabric):
# This test requests FI_RMA
from common import ClientServerTest
cmd = "fi_multi_ep -e rdm"
if shared_cq:
cmd += " -Q"
test = ClientServerTest(cmdline_args, cmd, fabric=fabric)
test = ClientServerTest(cmdline_args, cmd, message_size=256, fabric=rma_fabric)
test.run()
5 changes: 3 additions & 2 deletions fabtests/pytest/efa/test_multi_recv.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
[pytest.param("short", marks=pytest.mark.short),
pytest.param("standard", marks=pytest.mark.standard)])
@pytest.mark.parametrize("message_size", ["1024", "8192"])
def test_multi_recv(cmdline_args, iteration_type, message_size, fabric):
# efa-direct does not support multi-recv
def test_multi_recv(cmdline_args, iteration_type, message_size):
from common import ClientServerTest
test = ClientServerTest(cmdline_args,
"fi_multi_recv -e rdm",
iteration_type,
message_size=message_size,
fabric=fabric)
fabric="efa")
test.run()
78 changes: 44 additions & 34 deletions fabtests/pytest/efa/test_rdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,92 +11,101 @@ def test_rdm_efa(cmdline_args, completion_semantic, fabric):
test = ClientServerTest(cmdline_args, "fi_rdm", completion_semantic=completion_semantic, fabric=fabric)
test.run()

# This test skips efa-direct because it requests FI_ORDER_SAS
@pytest.mark.functional
def test_rdm_bw_functional_efa(cmdline_args, completion_semantic, fabric):
def test_rdm_bw_functional_efa(cmdline_args, completion_semantic):
from common import ClientServerTest
test = ClientServerTest(cmdline_args, "fi_flood -e rdm -v -T 1", completion_semantic=completion_semantic, fabric=fabric)
test = ClientServerTest(cmdline_args, "fi_flood -e rdm -v -T 1", completion_semantic=completion_semantic, fabric="efa")
test.run()

@pytest.mark.parametrize("iteration_type",
[pytest.param("short", marks=pytest.mark.short),
pytest.param("standard", marks=pytest.mark.standard)])
def test_rdm_pingpong(cmdline_args, iteration_type, completion_semantic, memory_type_bi_dir, completion_type, fabric):
def test_rdm_pingpong(cmdline_args, iteration_type, completion_semantic,
memory_type_bi_dir, completion_type, direct_message_size, fabric):
command = "fi_rdm_pingpong" + " " + perf_progress_model_cli
efa_run_client_server_test(cmdline_args, command, iteration_type,
completion_semantic, memory_type_bi_dir, "all",
completion_semantic, memory_type_bi_dir,
direct_message_size if fabric == "efa-direct" else "all",
completion_type=completion_type, fabric=fabric)

# This test skips efa-direct because efa-direct does not
# do memory registrations on behalf of the application
@pytest.mark.functional
@pytest.mark.serial
def test_mr_exhaustion_rdm_pingpong(cmdline_args, completion_semantic, fabric):
def test_mr_exhaustion_rdm_pingpong(cmdline_args, completion_semantic):
efa_run_client_server_test(cmdline_args, "fi_efa_exhaust_mr_reg_rdm_pingpong", "short",
completion_semantic, "host_to_host", "all", timeout=1000,
fabric=fabric)
fabric="efa")

@pytest.mark.functional
def test_rdm_pingpong_range(cmdline_args, completion_semantic, memory_type_bi_dir, message_size, fabric):
def test_rdm_pingpong_range(cmdline_args, completion_semantic, memory_type_bi_dir, message_size, direct_message_size, fabric):
efa_run_client_server_test(cmdline_args, "fi_rdm_pingpong", "short",
completion_semantic, memory_type_bi_dir, message_size, fabric=fabric)
completion_semantic, memory_type_bi_dir,
direct_message_size if fabric == "efa-direct" else message_size, fabric=fabric)

@pytest.mark.functional
def test_rdm_pingpong_no_inject_range(cmdline_args, completion_semantic, inject_message_size, fabric):
def test_rdm_pingpong_no_inject_range(cmdline_args, completion_semantic, inject_message_size, direct_message_size, fabric):
efa_run_client_server_test(cmdline_args, "fi_rdm_pingpong -j 0", "short",
completion_semantic, "host_to_host", inject_message_size, fabric=fabric)
completion_semantic, "host_to_host",
direct_message_size if fabric == "efa-direct" else inject_message_size, fabric=fabric)

# efa-direct does not support tagged
@pytest.mark.parametrize("iteration_type",
[pytest.param("short", marks=pytest.mark.short),
pytest.param("standard", marks=pytest.mark.standard)])
def test_rdm_tagged_pingpong(cmdline_args, iteration_type, completion_semantic, memory_type_bi_dir, completion_type, fabric):
def test_rdm_tagged_pingpong(cmdline_args, iteration_type, completion_semantic, memory_type_bi_dir, completion_type):
command = "fi_rdm_tagged_pingpong" + " " + perf_progress_model_cli
efa_run_client_server_test(cmdline_args, command, iteration_type,
completion_semantic, memory_type_bi_dir, "all", completion_type=completion_type,
fabric=fabric)
fabric="efa")

@pytest.mark.functional
def test_rdm_tagged_pingpong_range(cmdline_args, completion_semantic, memory_type_bi_dir, message_size, fabric):
def test_rdm_tagged_pingpong_range(cmdline_args, completion_semantic, memory_type_bi_dir, message_size):
efa_run_client_server_test(cmdline_args, "fi_rdm_tagged_pingpong", "short",
completion_semantic, memory_type_bi_dir, message_size,
fabric=fabric)
fabric="efa")

@pytest.mark.parametrize("iteration_type",
[pytest.param("short", marks=pytest.mark.short),
pytest.param("standard", marks=pytest.mark.standard)])
def test_rdm_tagged_bw(cmdline_args, iteration_type, completion_semantic, memory_type, completion_type, fabric):
def test_rdm_tagged_bw(cmdline_args, iteration_type, completion_semantic, memory_type, completion_type):
command = "fi_rdm_tagged_bw" + " " + perf_progress_model_cli
efa_run_client_server_test(cmdline_args, command, iteration_type,
completion_semantic, memory_type, "all", completion_type=completion_type,
fabric=fabric)
fabric="efa")

@pytest.mark.functional
def test_rdm_tagged_bw_range(cmdline_args, completion_semantic, memory_type, message_size, fabric):
def test_rdm_tagged_bw_range(cmdline_args, completion_semantic, memory_type, message_size):
efa_run_client_server_test(cmdline_args, "fi_rdm_tagged_bw", "short",
completion_semantic, memory_type, message_size, fabric=fabric)
completion_semantic, memory_type, message_size, fabric="efa")

@pytest.mark.functional
def test_rdm_tagged_bw_no_inject_range(cmdline_args, completion_semantic, inject_message_size, fabric):
def test_rdm_tagged_bw_no_inject_range(cmdline_args, completion_semantic, inject_message_size):
efa_run_client_server_test(cmdline_args, "fi_rdm_tagged_bw -j 0", "short",
completion_semantic, "host_to_host", inject_message_size, fabric=fabric)
completion_semantic, "host_to_host", inject_message_size, fabric="efa")

@pytest.mark.functional
@pytest.mark.parametrize("env_vars", [["FI_EFA_TX_SIZE=64"], ["FI_EFA_RX_SIZE=64"], ["FI_EFA_TX_SIZE=64", "FI_EFA_RX_SIZE=64"]])
def test_rdm_tagged_bw_small_tx_rx(cmdline_args, completion_semantic, memory_type, completion_type, env_vars, fabric):
def test_rdm_tagged_bw_small_tx_rx(cmdline_args, completion_semantic, memory_type, completion_type, env_vars):
cmdline_args_copy = copy.copy(cmdline_args)
for env_var in env_vars:
cmdline_args_copy.append_environ(env_var)
# Use a window size larger than tx/rx size
efa_run_client_server_test(cmdline_args_copy, "fi_rdm_tagged_bw -W 128", "short",
completion_semantic, memory_type, "all", completion_type=completion_type,
fabric=fabric)
fabric="efa")

@pytest.mark.functional
def test_rdm_tagged_bw_use_fi_more(cmdline_args, completion_semantic, memory_type, message_size, fabric):
def test_rdm_tagged_bw_use_fi_more(cmdline_args, completion_semantic, memory_type, message_size):
efa_run_client_server_test(cmdline_args, "fi_rdm_tagged_bw --use-fi-more",
"short", completion_semantic, memory_type, message_size, fabric=fabric)
"short", completion_semantic, memory_type, message_size, fabric="efa")

# efa-direct does not support atomic
@pytest.mark.parametrize("iteration_type",
[pytest.param("short", marks=pytest.mark.short),
pytest.param("standard", marks=pytest.mark.standard)])
def test_rdm_atomic(cmdline_args, iteration_type, completion_semantic, memory_type, fabric):
def test_rdm_atomic(cmdline_args, iteration_type, completion_semantic, memory_type):
from copy import copy

from common import ClientServerTest
Expand All @@ -110,7 +119,7 @@ def test_rdm_atomic(cmdline_args, iteration_type, completion_semantic, memory_ty
cmdline_args_copy = copy(cmdline_args)
command = "fi_rdm_atomic" + " " + perf_progress_model_cli
test = ClientServerTest(cmdline_args_copy, "fi_rdm_atomic", iteration_type, completion_semantic,
memory_type=memory_type, timeout=1800, fabric=fabric)
memory_type=memory_type, timeout=1800, fabric="efa")
test.run()

@pytest.mark.functional
Expand All @@ -123,38 +132,39 @@ def test_rdm_tagged_peek(cmdline_args):
test.run()

# This test is run in serial mode because it takes a lot of memory
# It is skipped for efa-direct because 1GB exceeds device max msg size
@pytest.mark.serial
@pytest.mark.functional
def test_rdm_pingpong_1G(cmdline_args, completion_semantic, fabric):
def test_rdm_pingpong_1G(cmdline_args, completion_semantic):
# Default window size is 64 resulting in 128GB being registered, which
# exceeds max number of registered host pages
efa_run_client_server_test(cmdline_args, "fi_rdm_pingpong -W 1", 2,
completion_semantic=completion_semantic, message_size=1073741824,
memory_type="host_to_host", warmup_iteration_type=0, fabric=fabric)
memory_type="host_to_host", warmup_iteration_type=0, fabric="efa")

@pytest.mark.functional
def test_rdm_pingpong_zcpy_recv(cmdline_args, memory_type_bi_dir, zcpy_recv_max_msg_size, zcpy_recv_message_size, fabric):
def test_rdm_pingpong_zcpy_recv(cmdline_args, memory_type_bi_dir, zcpy_recv_max_msg_size, zcpy_recv_message_size):
if cmdline_args.server_id == cmdline_args.client_id:
pytest.skip("no zero copy recv for intra-node communication")
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
efa_run_client_server_test(cmdline_args_copy, f"fi_rdm_pingpong --max-msg-size {zcpy_recv_max_msg_size}",
"short", "transmit_complete", memory_type_bi_dir, zcpy_recv_message_size, fabric=fabric)
"short", "transmit_complete", memory_type_bi_dir, zcpy_recv_message_size, fabric="efa")

@pytest.mark.functional
def test_rdm_bw_zcpy_recv(cmdline_args, memory_type, zcpy_recv_max_msg_size, zcpy_recv_message_size, fabric):
def test_rdm_bw_zcpy_recv(cmdline_args, memory_type, zcpy_recv_max_msg_size, zcpy_recv_message_size):
if cmdline_args.server_id == cmdline_args.client_id:
pytest.skip("no zero copy recv for intra-node communication")
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
efa_run_client_server_test(cmdline_args_copy, f"fi_rdm_bw --max-msg-size {zcpy_recv_max_msg_size}",
"short", "transmit_complete", memory_type, zcpy_recv_message_size, fabric=fabric)
"short", "transmit_complete", memory_type, zcpy_recv_message_size, fabric="efa")

@pytest.mark.functional
def test_rdm_bw_zcpy_recv_use_fi_more(cmdline_args, memory_type, zcpy_recv_max_msg_size, zcpy_recv_message_size, fabric):
def test_rdm_bw_zcpy_recv_use_fi_more(cmdline_args, memory_type, zcpy_recv_max_msg_size, zcpy_recv_message_size):
if cmdline_args.server_id == cmdline_args.client_id:
pytest.skip("no zero copy recv for intra-node communication")
cmdline_args_copy = copy.copy(cmdline_args)
cmdline_args_copy.append_environ("FI_EFA_ENABLE_SHM_TRANSFER=0")
efa_run_client_server_test(cmdline_args_copy, f"fi_rdm_bw --use-fi-more --max-msg-size {zcpy_recv_max_msg_size}",
"short", "transmit_complete", memory_type, zcpy_recv_message_size, fabric=fabric)
"short", "transmit_complete", memory_type, zcpy_recv_message_size, fabric="efa")
Loading

0 comments on commit 8f18de1

Please sign in to comment.