Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP: wireup slow connect-to-iface lanes on demand #10499

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ static ucs_config_field_t ucp_context_config_table[] = {
ucs_offsetof(ucp_context_config_t, max_priority_eps),
UCS_CONFIG_TYPE_UINT},

{"ON_DEMAND_WIREUP", "y", /* TODO: disable by default */
"Enable new protocol selection logic",
ucs_offsetof(ucp_context_config_t, on_demand_wireup), UCS_CONFIG_TYPE_BOOL},

{NULL}
};

Expand Down
2 changes: 2 additions & 0 deletions src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ typedef struct ucp_context_config {
uint64_t extra_op_attr_flags;
/* Upper limit to the amount of prioritized endpoints */
unsigned max_priority_eps;
/** On demand lanes wireup */
int on_demand_wireup;
} ucp_context_config_t;


Expand Down
10 changes: 6 additions & 4 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ void ucp_ep_config_key_reset(ucp_ep_config_key_t *key)
key->lanes[i].path_index = 0;
key->lanes[i].lane_types = 0;
key->lanes[i].seg_size = 0;
key->lanes[i].addr_index = UINT_MAX;
}
key->am_lane = UCP_NULL_LANE;
key->wireup_msg_lane = UCP_NULL_LANE;
Expand Down Expand Up @@ -1270,7 +1271,7 @@ ucp_ep_purge_lanes(ucp_ep_h ep, uct_pending_purge_callback_t purge_cb,
uct_ep_h uct_ep;

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
if ((lane == ucp_ep_get_cm_lane(ep)) || (uct_ep == NULL)) {
continue;
}
Expand All @@ -1297,7 +1298,7 @@ static void ucp_ep_check_lanes(ucp_ep_h ep)
uct_ep_h uct_ep;

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
if ((uct_ep != NULL) && ucp_is_uct_ep_failed(uct_ep)) {
num_failed_tl_ep++;
}
Expand All @@ -1320,7 +1321,7 @@ ucp_ep_set_lanes_failed(ucp_ep_h ep, uct_ep_h *uct_eps, uct_ep_h failed_ep)
ucp_ep_update_flags(ep, UCP_EP_FLAG_FAILED, UCP_EP_FLAG_LOCAL_CONNECTED);

for (lane = 0; lane < ucp_ep_num_lanes(ep); ++lane) {
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
uct_eps[lane] = uct_ep;

/* Set UCT EP to failed UCT EP to make sure if UCP EP won't be destroyed
Expand Down Expand Up @@ -1947,7 +1948,8 @@ static int ucp_ep_config_lane_is_equal(const ucp_ep_config_key_t *key1,
(config_lane1->dst_md_index == config_lane2->dst_md_index) &&
(config_lane1->dst_sys_dev == config_lane2->dst_sys_dev) &&
(config_lane1->lane_types == config_lane2->lane_types) &&
(config_lane1->seg_size == config_lane2->seg_size);
(config_lane1->seg_size == config_lane2->seg_size) &&
(config_lane1->addr_index == config_lane2->addr_index);
}

int ucp_ep_config_is_equal(const ucp_ep_config_key_t *key1,
Expand Down
4 changes: 4 additions & 0 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ typedef struct ucp_ep_config_key_lane {
was selected for */
size_t seg_size; /* Maximal fragment size which can be
received by the peer */
unsigned addr_index; /* Address index in the remote address,
NOTE: can be removed if remote side
sends only requested address for on
demand wireup request */
} ucp_ep_config_key_lane_t;


Expand Down
44 changes: 39 additions & 5 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static UCS_F_ALWAYS_INLINE uct_ep_h ucp_ep_get_fast_lane(ucp_ep_h ep,
}

static UCS_F_ALWAYS_INLINE uct_ep_h
ucp_ep_get_lane(ucp_ep_h ep, ucp_lane_index_t lane_index)
ucp_ep_get_lane_raw(ucp_ep_h ep, ucp_lane_index_t lane_index)
{
ucs_assertv(lane_index < UCP_MAX_LANES, "lane=%d", lane_index);

Expand All @@ -41,6 +41,26 @@ ucp_ep_get_lane(ucp_ep_h ep, ucp_lane_index_t lane_index)
}
}

static UCS_F_ALWAYS_INLINE uct_ep_h
ucp_ep_get_lane(ucp_ep_h ep, ucp_lane_index_t lane_index)
{
uct_ep_h lane;

/* fast-path lane must be initialized */
if (ucs_likely(lane_index < UCP_MAX_FAST_PATH_LANES)) {
lane = ep->uct_eps[lane_index];
ucs_assert(lane != NULL);
return lane;
}

lane = ucp_ep_get_lane_raw(ep, lane_index);
if (ucs_likely(lane != NULL)) {
return lane;
}

return ucp_wireup_init_slow_lane(ep, lane_index - UCP_MAX_FAST_PATH_LANES);
}

static UCS_F_ALWAYS_INLINE void ucp_ep_set_lane(ucp_ep_h ep, size_t lane_index,
uct_ep_h uct_ep)
{
Expand Down Expand Up @@ -137,6 +157,18 @@ static inline ucp_lane_index_t ucp_ep_num_lanes(ucp_ep_h ep)
return ucp_ep_config(ep)->key.num_lanes;
}

static inline ucp_lane_index_t ucp_ep_num_valid_lanes(ucp_ep_h ep)
{
ucp_lane_index_t num_lanes;
ucp_lane_index_t i;

for (num_lanes = 0, i = 0; i < ucp_ep_num_lanes(ep); ++i) {
num_lanes += (ucp_ep_get_lane_raw(ep, i) != NULL);
}

return num_lanes;
}

static inline int ucp_ep_is_lane_p2p(ucp_ep_h ep, ucp_lane_index_t lane)
{
return !!(ucp_ep_config(ep)->p2p_lanes & UCS_BIT(lane));
Expand Down Expand Up @@ -213,10 +245,12 @@ static inline void ucp_ep_update_remote_id(ucp_ep_h ep,
ucs_ptr_map_key_t remote_id)
{
if (ep->flags & UCP_EP_FLAG_REMOTE_ID) {
ucs_assertv(remote_id == ep->ext->remote_ep_id,
"ep=%p flags=0x%" PRIx32 " rkey=0x%" PRIxPTR
" ep->remote_id=0x%" PRIxPTR, ep, ep->flags, remote_id,
ep->ext->remote_ep_id);
// FIXME: enable this assert after fixing the issue with
// temporary EP on addr reply
// ucs_assertv(remote_id == ep->ext->remote_ep_id,
// "ep=%p flags=0x%" PRIx32 " rkey=0x%" PRIxPTR
// " ep->remote_id=0x%" PRIxPTR, ep, ep->flags, remote_id,
// ep->ext->remote_ep_id);
}

ucs_assert(remote_id != UCS_PTR_MAP_KEY_INVALID);
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_proxy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ void ucp_proxy_ep_replace(ucp_proxy_ep_t *proxy_ep)

ucs_assert(proxy_ep->uct_ep != NULL);
for (lane = 0; lane < ucp_ep_num_lanes(ucp_ep); ++lane) {
if (ucp_ep_get_lane(ucp_ep, lane) == &proxy_ep->super) {
if (ucp_ep_get_lane_raw(ucp_ep, lane) == &proxy_ep->super) {
ucs_assert(proxy_ep->uct_ep != NULL); /* make sure there is only one match */
ucp_ep_set_lane(ucp_ep, lane, proxy_ep->uct_ep);
proxy_ep->uct_ep = NULL;
Expand Down
11 changes: 6 additions & 5 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ static int ucp_ep_flush_is_completed(ucp_request_t *req)
static void ucp_ep_flush_progress(ucp_request_t *req)
{
ucp_ep_h ep = req->send.ep;
unsigned num_lanes = ucp_ep_num_lanes(ep);
ucp_lane_map_t all_lanes = UCS_MASK(num_lanes);
unsigned num_lanes = ucp_ep_num_valid_lanes(ep);
ucp_lane_map_t all_lanes = UCS_MASK(num_lanes); // TODO: check lanes ordering
ucp_ep_flush_state_t *flush_state;
ucp_lane_index_t lane;
ucs_status_t status;
Expand Down Expand Up @@ -109,7 +109,7 @@ static void ucp_ep_flush_progress(ucp_request_t *req)

/* Search for next lane to start flush */
lane = ucs_ffs64(all_lanes & ~req->send.flush.started_lanes);
uct_ep = ucp_ep_get_lane(ep, lane);
uct_ep = ucp_ep_get_lane_raw(ep, lane);
if (uct_ep == NULL) {
ucp_ep_flush_request_update_uct_comp(req, -1, UCS_BIT(lane));
continue;
Expand Down Expand Up @@ -380,6 +380,7 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
ucp_request_callback_t flushed_cb,
const char *debug_name)
{
ucp_lane_index_t num_lanes = ucp_ep_num_valid_lanes(ep);
ucs_status_t status;
ucp_request_t *req;

Expand All @@ -405,12 +406,12 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
UCT_FLUSH_FLAG_LOCAL;
req->send.flush.sw_started = 0;
req->send.flush.sw_done = 0;
req->send.flush.num_lanes = ucp_ep_num_lanes(ep);
req->send.flush.num_lanes = num_lanes;
req->send.flush.started_lanes = 0;
req->send.lane = UCP_NULL_LANE;
req->send.uct.func = ucp_ep_flush_progress_pending;
req->send.state.uct_comp.func = ucp_ep_flush_completion;
req->send.state.uct_comp.count = ucp_ep_num_lanes(ep);
req->send.state.uct_comp.count = num_lanes;
req->send.state.uct_comp.status = UCS_OK;

ucp_request_set_super(req, worker_req);
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -2369,14 +2369,15 @@ ucp_wireup_construct_lanes(const ucp_wireup_select_params_t *select_params,
*/
for (lane = 0; lane < key->num_lanes; ++lane) {
ucs_assert(select_ctx->lane_descs[lane].lane_types != 0);
addr_indices[lane] = select_ctx->lane_descs[lane].addr_index;
key->lanes[lane].rsc_index = select_ctx->lane_descs[lane].rsc_index;
key->lanes[lane].dst_md_index = select_ctx->lane_descs[lane].dst_md_index;
key->lanes[lane].dst_sys_dev = select_ctx->lane_descs[lane].dst_sys_dev;
key->lanes[lane].lane_types = select_ctx->lane_descs[lane].lane_types;
key->lanes[lane].seg_size = select_ctx->lane_descs[lane].seg_size;
key->lanes[lane].path_index = ucp_wireup_default_path_index(
select_ctx->lane_descs[lane].path_index);
key->lanes[lane].addr_index = select_ctx->lane_descs[lane].addr_index;
addr_indices[lane] = key->lanes[lane].addr_index;

if (select_ctx->lane_descs[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_CM)) {
ucs_assert(key->cm_lane == UCP_NULL_LANE);
Expand Down
Loading
Loading