Skip to content

Commit

Permalink
prov/cxi: Add FI_WAIT_YIELD EQ support
Browse files Browse the repository at this point in the history
Add provider-specific support for EQs opened with the FI_WAIT_YIELD
wait object. FI_WAIT_FD is not supported at this time.

Signed-off-by: Steve Welch <welch@hpe.com>
  • Loading branch information
swelch authored and iziemba committed Feb 5, 2025
1 parent 5e53634 commit 9fbd8b1
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 19 deletions.
97 changes: 85 additions & 12 deletions prov/cxi/src/cxip_eq.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,59 @@ ssize_t cxip_eq_read(struct fid_eq *eq_fid, uint32_t *event,
return ret;
}

/*
 * cxip_eq_sread() - Blocking EQ read for EQs using the FI_WAIT_YIELD
 * wait object.
 *
 * Repeatedly calls cxip_eq_read(), yielding the CPU between attempts,
 * until the read returns something other than -FI_EAGAIN or the timeout
 * expires.
 *
 * Returns the cxip_eq_read() result once it is not -FI_EAGAIN,
 * -FI_EAGAIN if the timeout expires first, or -FI_ENOSYS when the EQ was
 * opened with FI_WAIT_NONE.
 */
static ssize_t cxip_eq_sread(struct fid_eq *eq_fid, uint32_t *event, void *buf,
			     size_t len, int timeout, uint64_t flags)
{
	struct cxip_eq *cxi_eq;
	uint64_t deadline;
	ssize_t rc;

	cxi_eq = container_of(eq_fid, struct cxip_eq, util_eq.eq_fid);

	/* A blocking read needs a wait object */
	if (cxi_eq->attr.wait_obj == FI_WAIT_NONE)
		return -FI_ENOSYS;

	assert(cxi_eq->attr.wait_obj == FI_WAIT_YIELD);

	deadline = ofi_timeout_time(timeout);
	for (;;) {
		/* Read initiates EQ progress if empty */
		rc = cxip_eq_read(eq_fid, event, buf, len, flags);
		if (rc != -FI_EAGAIN)
			return rc;

		/* On timeout return -FI_EAGAIN */
		if (ofi_adjust_timeout(deadline, &timeout))
			return -FI_EAGAIN;

		sched_yield();
	}
}

/*
 * cxip_eq_control() - EQ control handler.
 *
 * Only FI_GETWAITOBJ is handled: the wait object type recorded in the EQ
 * attributes is stored through arg, which is treated as a pointer to
 * enum fi_wait_obj. Every other command is logged and rejected with
 * -FI_ENOSYS.
 */
static int cxip_eq_control(struct fid *fid, int command, void *arg)
{
	struct cxip_eq *cxi_eq;
	enum fi_wait_obj *wait_obj_out;

	cxi_eq = container_of(fid, struct cxip_eq, util_eq.eq_fid);

	if (command == FI_GETWAITOBJ) {
		wait_obj_out = (enum fi_wait_obj *) arg;
		*wait_obj_out = cxi_eq->attr.wait_obj;
		return FI_SUCCESS;
	}

	CXIP_WARN("Unsupported EQ control command: %d\n", command);
	return -FI_ENOSYS;
}

/*
 * EQ operations table. Entries marked "customized" are implemented by the
 * CXI provider; the remaining entries use the common ofi_eq_* helpers.
 * .sread is initialized exactly once: a duplicate designated initializer
 * would be silently overridden by the later one.
 */
static struct fi_ops_eq cxi_eq_ops = {
	.size = sizeof(struct fi_ops_eq),
	.read = cxip_eq_read,		// customized
	.readerr = ofi_eq_readerr,
	.sread = cxip_eq_sread,		// customized
	.write = ofi_eq_write,
	.strerror = cxip_eq_strerror,	// customized
};
Expand All @@ -99,7 +147,7 @@ static struct fi_ops cxi_eq_fi_ops = {
.size = sizeof(struct fi_ops),
.close = cxip_eq_close, // customized
.bind = fi_no_bind,
.control = ofi_eq_control,
.control = cxip_eq_control, // customized
.ops_open = fi_no_ops_open,
};

Expand All @@ -111,12 +159,43 @@ static struct fi_eq_attr cxip_eq_def_attr = {
.wait_set = NULL
};

/*
 * cxip_eq_verify_attr() - Validate the wait object requested for an EQ.
 *
 * A NULL attr is accepted without further checks. FI_WAIT_UNSPEC is
 * rewritten in place to FI_WAIT_YIELD. FI_WAIT_NONE and FI_WAIT_YIELD are
 * accepted unchanged. Any other wait object is logged and rejected with
 * -FI_ENOSYS.
 *
 * Applications should set wait_obj == FI_WAIT_NONE for best performance.
 * If a wait_obj is required, only FI_WAIT_YIELD is supported. This is due
 * to collectives not currently exposing the next internal timeout
 * duration.
 */
static int cxip_eq_verify_attr(struct fi_eq_attr *attr)
{
	if (!attr)
		return FI_SUCCESS;

	/* Provider's choice: fall back to the only supported wait object */
	if (attr->wait_obj == FI_WAIT_UNSPEC) {
		attr->wait_obj = FI_WAIT_YIELD;
		return FI_SUCCESS;
	}

	if (attr->wait_obj == FI_WAIT_NONE ||
	    attr->wait_obj == FI_WAIT_YIELD)
		return FI_SUCCESS;

	CXIP_WARN("Unsupported EQ wait object: %d\n",
		  attr->wait_obj);
	return -FI_ENOSYS;
}

int cxip_eq_open(struct fid_fabric *fabric, struct fi_eq_attr *attr,
struct fid_eq **eq, void *context)
{
struct cxip_eq *cxi_eq;
struct fi_eq_attr temp_attr;
int ret;

ret = cxip_eq_verify_attr(attr);
if (ret != FI_SUCCESS)
return ret;

cxi_eq = calloc(1, sizeof(*cxi_eq));
if (!cxi_eq)
return -FI_ENOMEM;
Expand All @@ -126,16 +205,10 @@ int cxip_eq_open(struct fid_fabric *fabric, struct fi_eq_attr *attr,
else
cxi_eq->attr = *attr;

if (cxi_eq->attr.wait_obj != FI_WAIT_NONE) {
CXIP_WARN("Unsupported EQ attribute wait obj %d\n",
cxi_eq->attr.wait_obj);
ret = -FI_ENOSYS;

goto err0;
}

ret = ofi_eq_init(fabric, &cxi_eq->attr, &cxi_eq->util_eq.eq_fid,
context);
/* CXI does not use common code internal wait object */
temp_attr = cxi_eq->attr;
temp_attr.wait_obj = FI_WAIT_NONE;
ret = ofi_eq_init(fabric, &temp_attr, &cxi_eq->util_eq.eq_fid, context);
if (ret != FI_SUCCESS)
goto err0;

Expand Down
51 changes: 51 additions & 0 deletions prov/cxi/test/cxip_test_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ void cxit_create_eq(void)
cr_assert_not_null(cxit_eq, "fi_eq_open returned NULL eq");
}

void cxit_create_eq_yield(void)
{
struct fi_eq_attr attr = {
.size = 32,
.flags = FI_WRITE,
.wait_obj = FI_WAIT_YIELD
};
int ret;

ret = fi_eq_open(cxit_fabric, &attr, &cxit_eq, NULL);
cr_assert(ret == FI_SUCCESS, "fi_eq_open FI_WAIT_YIELD failed %d", ret);
cr_assert_not_null(cxit_eq, "fi_eq_open returned NULL eq");
}

void cxit_destroy_eq(void)
{
int ret;
Expand Down Expand Up @@ -798,6 +812,43 @@ void cxit_setup_enabled_ep_fd(void)
cr_assert(addrlen == sizeof(cxit_ep_addr));
}

/*
 * Test fixture: bring up an enabled endpoint whose EQ is created with
 * cxit_create_eq_yield() (FI_WAIT_YIELD wait object).
 *
 * Configures tagged CQ formats, a table AV, manual data progress and
 * FI_THREAD_SAFE threading, creates and binds the EQ/CQs/counters/AV,
 * enables the endpoint and records its address in cxit_ep_addr.
 */
void cxit_setup_enabled_ep_eq_yield(void)
{
	int ret;
	size_t addrlen = sizeof(cxit_ep_addr);

	cxit_setup_getinfo();

	cxit_tx_cq_attr.format = FI_CQ_FORMAT_TAGGED;
	cxit_rx_cq_attr.format = FI_CQ_FORMAT_TAGGED;
	cxit_av_attr.type = FI_AV_TABLE;

	/*
	 * NOTE(review): data_progress was previously assigned twice in a
	 * row; the redundant store is removed. If the second assignment was
	 * meant to set control_progress, confirm and add it explicitly.
	 */
	cxit_fi_hints->domain_attr->data_progress = FI_PROGRESS_MANUAL;
	cxit_fi_hints->domain_attr->threading = FI_THREAD_SAFE;

	cxit_setup_ep();

	/* Set up RMA objects */
	cxit_create_ep();
	cxit_create_eq_yield();
	cxit_bind_eq();
	cxit_create_cqs();
	cxit_bind_cqs();
	cxit_create_cntrs();
	cxit_bind_cntrs();
	cxit_create_av();
	cxit_bind_av();

	ret = fi_enable(cxit_ep);
	cr_assert(ret == FI_SUCCESS, "ret is: %d\n", ret);

	/* Find assigned Endpoint address. Address is assigned during enable. */
	ret = fi_getname(&cxit_ep->fid, &cxit_ep_addr, &addrlen);
	cr_assert(ret == FI_SUCCESS, "ret is %d\n", ret);
	cr_assert(addrlen == sizeof(cxit_ep_addr));
}

void cxit_setup_rma_disable_fi_rma_event(void)
{
int ret;
Expand Down
1 change: 1 addition & 0 deletions prov/cxi/test/cxip_test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ void cxit_teardown_ep(void);
#define cxit_teardown_av cxit_teardown_ep
void cxit_setup_enabled_ep(void);
void cxit_setup_enabled_ep_fd(void);
void cxit_setup_enabled_ep_eq_yield(void);
void cxit_setup_rma(void);
void cxit_setup_rma_fd(void);
void cxit_setup_rma_hybrid_mr_desc(void);
Expand Down
112 changes: 105 additions & 7 deletions prov/cxi/test/eq.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ Test(eq, simple)
cxit_destroy_eq();
}

void eq_bad_wait_obj(enum fi_wait_obj wait_obj)

void setup_eq_wait_obj(enum fi_wait_obj wait_obj, bool pass)
{
struct fi_eq_attr attr = {
.size = 32,
Expand All @@ -46,17 +45,116 @@ void eq_bad_wait_obj(enum fi_wait_obj wait_obj)
int ret;

ret = fi_eq_open(cxit_fabric, &attr, &cxit_eq, NULL);
cr_assert(ret == -FI_ENOSYS, "fi_eq_open unexpected success");
cr_assert(cxit_eq == NULL, "cxit_eq not NULL on bad wait_obj");
if (pass) {
cr_assert(ret == FI_SUCCESS,
"fi_eq_open wait_obj %d, unexpected err %d",
wait_obj, ret);
cr_assert(cxit_eq != NULL, "cxit_eq NULL on good wait_obj");
fi_close(&cxit_eq->fid);
} else {
cr_assert(ret == -FI_ENOSYS,
"fi_eq_open wait_obj %d, unexpected success %d",
wait_obj, ret);
cr_assert(cxit_eq == NULL, "cxit_eq not NULL on bad wait_obj");
}
}

/* Opening an EQ with FI_WAIT_NONE is expected to succeed */
Test(eq, good_wait_obj_none)
{
setup_eq_wait_obj(FI_WAIT_NONE, true);
}

Test(eq, bad_wait_obj_unspec)
Test(eq, good_wait_obj_unspec)
{
eq_bad_wait_obj(FI_WAIT_UNSPEC);
setup_eq_wait_obj(FI_WAIT_UNSPEC, true);
}

/* Opening an EQ with FI_WAIT_YIELD is expected to succeed */
Test(eq, good_wait_obj_wait_yield)
{
setup_eq_wait_obj(FI_WAIT_YIELD, true);
}

/*
 * Opening an EQ with FI_WAIT_FD is expected to fail with -FI_ENOSYS.
 * The stale FI_WAIT_UNSPEC call to the renamed helper is dropped.
 */
Test(eq, bad_wait_obj_wait_fd)
{
	setup_eq_wait_obj(FI_WAIT_FD, false);
}

/* Opening an EQ with FI_WAIT_SET is expected to fail with -FI_ENOSYS */
Test(eq, bad_wait_obj_wait_set)
{
setup_eq_wait_obj(FI_WAIT_SET, false);
}

/*
 * eq_wait suite: uses cxit_setup_enabled_ep_eq_yield as its init hook, so
 * tests see an enabled endpoint whose EQ was opened with FI_WAIT_YIELD,
 * and cxit_teardown_enabled_ep as its fini hook.
 */
TestSuite(eq_wait, .init = cxit_setup_enabled_ep_eq_yield,
.fini = cxit_teardown_enabled_ep, .timeout = CXIT_DEFAULT_TIMEOUT);

/*
 * Verify that fi_eq_sread() with no event available returns -FI_EAGAIN
 * and does not return before the requested timeout (in ms, measured with
 * ofi_gettime_ms()) has elapsed.
 */
Test(eq_wait, timeout)
{
	struct fi_eq_err_entry eqe = {};
	uint64_t start_ms;
	uint64_t end_ms;
	uint32_t event;
	ssize_t ret;	/* fi_eq_sread() returns ssize_t */
	int timeout = 200;

	start_ms = ofi_gettime_ms();
	ret = fi_eq_sread(cxit_eq, &event, &eqe, sizeof(eqe), timeout, 0);
	end_ms = ofi_gettime_ms();

	cr_assert(ret == -FI_EAGAIN, "Unexpected return value %s",
		  fi_strerror((int) -ret));

	/* Elapsed time is uint64_t; cast so the format specifier matches */
	cr_assert(end_ms >= start_ms + timeout,
		  "Timeout too short %llu ms asked for %d ms",
		  (unsigned long long) (end_ms - start_ms), timeout);
}

/* Expected values passed to the eq_worker thread for checking the EQ entry */
struct eq_worker_data {
uint32_t event; /* event code the worker expects to read */
void *context; /* context the worker expects in the entry */
uint64_t data; /* data the worker expects in the entry */
};

/*
 * Thread body: block in fi_eq_sread() on cxit_eq (up to 2000 ms) and check
 * that the event, context and data read match the expected values in the
 * struct eq_worker_data passed via data.
 */
static void *eq_worker(void *data)
{
	struct eq_worker_data *args = data;
	struct fi_eq_err_entry eqe = {};
	uint32_t event;
	ssize_t ret;
	int timeout = 2000;

	ret = fi_eq_sread(cxit_eq, &event, &eqe, sizeof(eqe), timeout, 0);
	cr_assert(ret >= 0, "Unexpected EQ read failure %s",
		  fi_strerror((int) -ret));

	/* event is unsigned 32-bit; print it with an unsigned specifier */
	cr_assert(args->event == event, "Unexpected EQ event %u", event);
	cr_assert(args->context == eqe.context, "Unexpected EQ context %p",
		  eqe.context);

	/* Cast so the specifier matches regardless of the width of long */
	cr_assert(args->data == eqe.data, "Unexpected EQ data %llu",
		  (unsigned long long) eqe.data);

	pthread_exit(NULL);
}

Test(eq_wait, yield_write)
{
struct fi_eq_entry entry = {
.context = (void *) 0x1,
.data = 1ULL << 63
};
struct eq_worker_data parms = {
.event = FI_JOIN_COMPLETE,
.context = entry.context,
.data = entry.data
};
pthread_t eq_read_thread;
pthread_attr_t attr = {};
int ret;

pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

ret = pthread_create(&eq_read_thread, &attr, eq_worker, (void *)&parms);
cr_assert(ret == 0, "Unexpected pthread_create error %d", ret);

/* Make sure worker in fi_cq_sread() */
sleep(1);

ret = fi_eq_write(cxit_eq, FI_JOIN_COMPLETE, &entry, sizeof(entry), 0);
cr_assert(ret == sizeof(entry), "Bad return for eq_write %d", ret);

pthread_join(eq_read_thread, NULL);
}

0 comments on commit 9fbd8b1

Please sign in to comment.