Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/cxi: synchronous fi_close on collective multicast #10582

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions prov/cxi/include/cxip.h
Original file line number Diff line number Diff line change
Expand Up @@ -2957,6 +2957,9 @@ struct cxip_coll_mc {
int next_red_id; // next available red_id
int max_red_id; // limit total concurrency
int seqno; // rolling seqno for packets
int close_state; // the state of the close operation
bool has_closed; // true after a mc close call
bool has_error; // true if any error
bool is_multicast; // true if multicast address
bool arm_disable; // arm-disable for testing
bool retry_disable; // retry-disable for testing
Expand Down
81 changes: 70 additions & 11 deletions prov/cxi/src/cxip_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -2704,13 +2704,17 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj);
static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle);

/* Close multicast collective object */
static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete, bool has_error)
{
int count;

if (!mc_obj)
return;
TRACE_JOIN("%s starting MC cleanup\n", __func__);

mc_obj->has_closed = true;
mc_obj->has_error = has_error;

/* clear the mcast_addr -> mc_obj reference*/
ofi_idm_clear(&mc_obj->ep_obj->coll.mcast_map, mc_obj->mcast_addr);
mc_obj->ep_obj->coll.is_hwroot = false;
Expand Down Expand Up @@ -2739,10 +2743,18 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
cxip_env.coll_fm_timeout_msec/1000,
(cxip_env.coll_fm_timeout_msec%1000)*1000000};

if (!mc_obj->has_error)
mc_obj->close_state = -FI_EAGAIN;

_tsset(&mc_obj->curlexpires, &expires);
_curl_delete_mc_obj(mc_obj);
} else
free(mc_obj);
} else {
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_SUCCESS;
}
}
}

/* The user can close an individual collective MC address. It must do so on
Expand All @@ -2752,11 +2764,37 @@ static void _close_mc(struct cxip_coll_mc *mc_obj, bool delete)
static int _fi_close_mc(struct fid *fid)
{
struct cxip_coll_mc *mc_obj;
int ret = FI_SUCCESS;

TRACE_JOIN("%s: closing MC\n", __func__);
mc_obj = container_of(fid, struct cxip_coll_mc, mc_fid.fid);
_close_mc(mc_obj, true);
return FI_SUCCESS;
if (!mc_obj) {
TRACE_JOIN("%s: MC object is null\n", __func__);
return ret;
} else if (mc_obj->has_closed) {
TRACE_JOIN("%s: close already called before\n", __func__);
return ret;
} else if (mc_obj->has_error) {
TRACE_JOIN("%s: encounted an error earlier\n", __func__);
return ret;
}

_close_mc(mc_obj, true, false);
while (mc_obj && (ret = mc_obj->close_state) == -FI_EAGAIN) {
ret = cxip_curl_progress(NULL);
if (ret == -FI_EAGAIN) {
usleep(10);
continue;
}
if (ret < 0 && ret != -FI_ENODATA) {
TRACE_JOIN("%s: Curl progress failed, error=%d\n", __func__, ret);
break;
}
usleep(10);
}
free(mc_obj);

return ret;
}

/* multicast object libfabric functions */
Expand Down Expand Up @@ -2986,6 +3024,11 @@ static int _initialize_mc(void *ptr)
_coll_metrics.ep_data.isroot =
mc_obj->hwroot_idx == mc_obj->mynode_idx;

/* Initially set close states to success */
mc_obj->close_state = FI_SUCCESS;
mc_obj->has_closed = false;
mc_obj->has_error = false;

/* Return information to the caller */
jstate->mc_obj = mc_obj;
*jstate->mc = &mc_obj->mc_fid;
Expand All @@ -2996,7 +3039,7 @@ static int _initialize_mc(void *ptr)

fail:
jstate->prov_errno = FI_CXI_ERRNO_JOIN_FAIL_PTE;
_close_mc(mc_obj, true);
_close_mc(mc_obj, true, true);
return ret;
}

Expand Down Expand Up @@ -3076,7 +3119,11 @@ static void _curl_delete_mc_obj(struct cxip_coll_mc *mc_obj)
TRACE_JOIN("CURL delete mcast %d failed\n",
mc_obj->mcast_addr);
free(curl_usrptr);
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = ret;
}
}
}

Expand All @@ -3102,23 +3149,35 @@ static void _cxip_delete_mcast_cb(struct cxip_curl_handle *handle)
case 201:
TRACE_JOIN("callback: %ld SUCCESS MCAST DELETED\n",
handle->status);
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_SUCCESS;
}
break;
case 409:
TRACE_JOIN("callback: delete mcast failed: %ld '%s'\n",
handle->status, errmsg);

if (_tsexp(&mc_obj->curlexpires)) {
TRACE_JOIN("callback: FM expired\n");
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_CXI_ERRNO_JOIN_CURL_TIMEOUT;
}
break;
}
/* try again */
_curl_delete_mc_obj(mc_obj);
break;
default:
TRACE_JOIN("callback: %ld unknown status\n", handle->status);
free(mc_obj);
if (mc_obj->has_error) {
free(mc_obj);
} else {
mc_obj->close_state = FI_CXI_ERRNO_JOIN_CURL_FAILED;
}
break;
}
/* free json memory */
Expand Down Expand Up @@ -4209,7 +4268,7 @@ void cxip_coll_close(struct cxip_ep_obj *ep_obj)
while (!dlist_empty(&ep_obj->coll.mc_list)) {
dlist_pop_front(&ep_obj->coll.mc_list,
struct cxip_coll_mc, mc_obj, entry);
_close_mc(mc_obj, false);
_close_mc(mc_obj, false, true);
}
}

Expand Down