Skip to content

Commit

Permalink
prov/cxi: synchronous fi_close on collective multicast
Browse files Browse the repository at this point in the history
The fi_close() operation manages its internal state and return FI_SUCCESS,
or a fatal error code on error.

Signed-off-by: Md Bulbul Sharif <md-bulbul.sharif@hpe.com>
  • Loading branch information
bulbul-hpe committed Nov 26, 2024
1 parent 2ad2e37 commit c208247
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion prov/cxi/src/cxip_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@

#define MAGIC 0x677d

/* Tracks the state of the fi_close() operation */
static int _coll_close_state;

/****************************************************************************
* Metrics for evaluating collectives
*/
Expand Down Expand Up @@ -2739,6 +2742,7 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
cxip_env.coll_fm_timeout_msec/1000,
(cxip_env.coll_fm_timeout_msec%1000)*1000000};

_coll_close_state = -FI_EAGAIN;
_tsset(&mc_obj->curlexpires, &expires);
_curl_delete_mc_obj(mc_obj);
} else
Expand All @@ -2752,11 +2756,26 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
static int _fi_close_mc(struct fid *fid)
{
struct cxip_coll_mc *mc_obj;
int ret = FI_SUCCESS;

TRACE_JOIN("%s: closing MC\n", __func__);
mc_obj = container_of(fid, struct cxip_coll_mc, mc_fid.fid);
_close_mc(mc_obj, true);
return FI_SUCCESS;

while ((ret = _coll_close_state) == -FI_EAGAIN) {
ret = cxip_curl_progress(NULL);
if (ret == -FI_EAGAIN) {
usleep(10);
continue;
}
if (ret < 0 && ret != -FI_ENODATA) {
TRACE_JOIN("%s: Curl progress failed, error=%d\n", __func__, ret);
break;
}
usleep(10);
}

return ret;
}

/* multicast object libfabric functions */
Expand Down Expand Up @@ -2992,6 +3011,9 @@ static int _initialize_mc(void *ptr)
TRACE_JOIN("%s: initialized mc[%d] to %p\n",
__func__, jstate->mynode_idx, *jstate->mc);

/* Initially set close state to success */
_coll_close_state = FI_SUCCESS;

return FI_SUCCESS;

fail:
Expand Down Expand Up @@ -3073,6 +3095,7 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj)
quit:
free(url);
if (ret < 0) {
_coll_close_state = ret;
TRACE_JOIN("CURL delete mcast %d failed\n",
mc_obj->mcast_addr);
free(curl_usrptr);
Expand Down Expand Up @@ -3100,6 +3123,7 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle)
switch (handle->status) {
case 200:
case 201:
_coll_close_state = FI_SUCCESS;
TRACE_JOIN("callback: %ld SUCCESS MCAST DELETED\n",
handle->status);
free(mc_obj);
Expand All @@ -3109,6 +3133,7 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle)
handle->status, errmsg);

if (_tsexp(&mc_obj->curlexpires)) {
_coll_close_state = FI_CXI_ERRNO_JOIN_CURL_TIMEOUT;
TRACE_JOIN("callback: FM expired\n");
free(mc_obj);
break;
Expand All @@ -3117,6 +3142,7 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle)
_curl_delete_mc_obj(mc_obj);
break;
default:
_coll_close_state = FI_CXI_ERRNO_JOIN_CURL_FAILED;
TRACE_JOIN("callback: %ld unknown status\n", handle->status);
free(mc_obj);
break;
Expand Down

0 comments on commit c208247

Please sign in to comment.