prov/cxi: Support shared receive queues
Restructure the code to allow posting to the owner provider's shared
receive queues.

Signed-off-by: Amir Shehata <shehataa@ornl.gov>
amirshehataornl committed Oct 28, 2024
1 parent f0a8a5f commit 615b325
Showing 9 changed files with 441 additions and 147 deletions.
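
As context for the diffs below, here is a minimal sketch of the owner/peer SRX handshake this change plugs into, assuming the upstream fi_peer.h definitions; handle_rx() is illustrative and not a function from this patch:

#include <rdma/fabric.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_peer.h>

static int handle_rx(struct fid_peer_srx *srx, fi_addr_t src, size_t len,
                     void *peer_state)
{
        struct fi_peer_rx_entry *entry;
        int ret;

        /* Ask the owner for a matching posted receive buffer. */
        ret = srx->owner_ops->get_msg(srx, src, len, &entry);
        if (ret == -FI_ENOENT) {
                /* No match: record peer state on the entry and queue it as
                 * unexpected. When the application posts a matching buffer,
                 * the owner calls peer_ops->start_msg(), which this commit
                 * maps to cxip_unexp_start().
                 */
                entry->peer_context = peer_state;
                return srx->owner_ops->queue_msg(entry);
        }
        if (ret)
                return ret;

        /* Matched immediately: the peer delivers the payload into the
         * entry's buffers and reports the completion through the owner.
         */
        return 0;
}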
95 changes: 94 additions & 1 deletion prov/cxi/include/cxip.h
@@ -861,6 +861,9 @@ struct cxip_domain {
ofi_spin_t lock;
ofi_atomic32_t ref;

struct fid_ep rx_ep;
struct fid_peer_srx *owner_srx;

uint32_t tclass;

struct cxip_eq *eq; //unused
@@ -1271,6 +1274,8 @@ struct cxip_req {
uint64_t trig_thresh;
struct cxip_cntr *trig_cntr;

struct fi_peer_rx_entry *rx_entry;

/* CQ event fields, set according to fi_cq.3
* - set by provider
* - returned to user in completion event
@@ -1444,6 +1449,8 @@ struct cxip_cntr {
struct cxip_ux_send {
struct dlist_entry rxc_entry;
struct cxip_req *req;
struct cxip_rxc *rxc;
struct fi_peer_rx_entry *rx_entry;
union c_event put_ev;
bool claimed; /* Reserved with FI_PEEK | FI_CLAIM */
};
@@ -2378,6 +2385,8 @@ struct cxip_ep_obj {
struct cxip_domain *domain;
struct cxip_av *av;

struct fid_peer_srx *owner_srx;

/* Domain has been configured with FI_AV_AUTH_KEY. */
bool av_auth_key;

@@ -3247,6 +3256,11 @@ double cxip_rep_sum(size_t count, double *values);
int cxip_check_auth_key_info(struct fi_info *info);
int cxip_gen_auth_key(struct fi_info *info, struct cxi_auth_key *key);

static inline struct fid_peer_srx *cxip_get_owner_srx(struct cxip_rxc *rxc)
{
return rxc->ep_obj->owner_srx;
}

#define CXIP_FC_SOFTWARE_INITIATED -1

/* cxip_fc_reason() - Returns the event reason for portal state
@@ -3291,6 +3305,13 @@ ssize_t cxip_rma_common(enum fi_op_type op, struct cxip_txc *txc,
struct cxip_cntr *trig_cntr,
struct cxip_cntr *comp_cntr);

static inline int cxip_no_discard(struct fi_peer_rx_entry *rx_entry)
{
return -FI_ENOSYS;
}

int cxip_unexp_start(struct fi_peer_rx_entry *entry);

/*
* Request variants:
* CXIP_RQ_AMO
@@ -3702,7 +3723,9 @@ int cxip_set_recv_match_id(struct cxip_rxc *rxc, fi_addr_t src_addr,
return FI_SUCCESS;
}

fi_addr_t cxip_recv_req_src_addr(struct cxip_req *req);
fi_addr_t cxip_recv_req_src_addr(struct cxip_rxc *rxc,
uint32_t init, uint16_t vni,
bool force);
int cxip_recv_req_alloc(struct cxip_rxc *rxc, void *buf, size_t len,
struct cxip_md *md, struct cxip_req **cxip_req,
int (*recv_cb)(struct cxip_req *req,
@@ -3754,4 +3777,74 @@ int cxip_domain_dwq_emit_amo(struct cxip_domain *dom, uint16_t vni,
struct c_dma_amo_cmd *amo, uint64_t flags,
bool fetching, bool flush);

static inline void cxip_set_env_rx_match_mode(void)
{
char *param_str = NULL;

fi_param_get_str(&cxip_prov, "rx_match_mode", &param_str);
/* Parameters to tailor hybrid hardware to software transitions
* that are initiated by software.
*/
fi_param_define(&cxip_prov, "hybrid_preemptive", FI_PARAM_BOOL,
"Enable/Disable low LE preemptive UX transitions.");
fi_param_get_bool(&cxip_prov, "hybrid_preemptive",
&cxip_env.hybrid_preemptive);
fi_param_define(&cxip_prov, "hybrid_recv_preemptive", FI_PARAM_BOOL,
"Enable/Disable low LE preemptive recv transitions.");
fi_param_get_bool(&cxip_prov, "hybrid_recv_preemptive",
&cxip_env.hybrid_recv_preemptive);
fi_param_define(&cxip_prov, "hybrid_unexpected_msg_preemptive",
FI_PARAM_BOOL,
"Enable preemptive transition to software endpoint when number of hardware unexpected messages exceeds RX attribute size");
fi_param_get_bool(&cxip_prov, "hybrid_unexpected_msg_preemptive",
&cxip_env.hybrid_unexpected_msg_preemptive);
fi_param_define(&cxip_prov, "hybrid_posted_recv_preemptive",
FI_PARAM_BOOL,
"Enable preemptive transition to software endpoint when number of posted receives exceeds RX attribute size");
fi_param_get_bool(&cxip_prov, "hybrid_posted_recv_preemptive",
&cxip_env.hybrid_posted_recv_preemptive);

if (param_str) {
if (!strcasecmp(param_str, "hardware")) {
cxip_env.rx_match_mode = CXIP_PTLTE_HARDWARE_MODE;
cxip_env.msg_offload = true;
} else if (!strcmp(param_str, "software")) {
cxip_env.rx_match_mode = CXIP_PTLTE_SOFTWARE_MODE;
cxip_env.msg_offload = false;
} else if (!strcmp(param_str, "hybrid")) {
cxip_env.rx_match_mode = CXIP_PTLTE_HYBRID_MODE;
cxip_env.msg_offload = true;
} else {
_CXIP_WARN(FI_LOG_FABRIC, "Unrecognized rx_match_mode: %s\n",
param_str);
cxip_env.rx_match_mode = CXIP_PTLTE_HARDWARE_MODE;
cxip_env.msg_offload = true;
}
}

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_preemptive) {
cxip_env.hybrid_preemptive = false;
_CXIP_WARN(FI_LOG_FABRIC, "Not in hybrid mode, ignoring preemptive\n");
}

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_recv_preemptive) {
_CXIP_WARN(FI_LOG_FABRIC, "Not in hybrid mode, ignore LE recv preemptive\n");
cxip_env.hybrid_recv_preemptive = 0;
}

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_posted_recv_preemptive) {
_CXIP_WARN(FI_LOG_FABRIC, "Not in hybrid mode, ignore hybrid_posted_recv_preemptive\n");
cxip_env.hybrid_posted_recv_preemptive = 0;
}

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_unexpected_msg_preemptive) {
_CXIP_WARN(FI_LOG_FABRIC, "Not in hybrid mode, ignore hybrid_unexpected_msg_preemptive\n");
cxip_env.hybrid_unexpected_msg_preemptive = 0;
}
}

#endif
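
The new cxip_set_env_rx_match_mode() helper centralizes parsing of the rx_match_mode parameter. As a usage sketch, the environment variable follows libfabric's FI_<PROV>_<param> naming convention, so the mode can be selected before the provider initializes (the version constant is illustrative):

#include <stdlib.h>
#include <rdma/fabric.h>

int open_info_with_sw_matching(struct fi_info *hints, struct fi_info **info)
{
        /* Must be set before fi_getinfo() first initializes the provider,
         * or the provider will already have latched its match mode.
         * Accepted values per the parameter definition above:
         * "hardware", "software", or "hybrid".
         */
        setenv("FI_CXI_RX_MATCH_MODE", "software", 1);

        return fi_getinfo(FI_VERSION(1, 21), NULL, NULL, 0, hints, info);
}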
17 changes: 17 additions & 0 deletions prov/cxi/src/cxip_av.c
@@ -229,13 +229,26 @@ struct cxip_addr *(*cxip_av_addr_in)(const void *addr) = insert_in;
void (*cxip_av_addr_out)(struct cxip_addr *addr_out,
struct cxip_addr *addr) = insert_out;

static fi_addr_t cxip_get_addr(struct fi_peer_rx_entry *entry)
{
uint32_t ux_init;
uint16_t vni;
struct cxip_ux_send *ux = entry->peer_context;

ux_init = ux->put_ev.tgt_long.initiator.initiator.process;
vni = ux->put_ev.tgt_long.vni;

return cxip_recv_req_src_addr(ux->rxc, ux_init, vni, true);
}

static int cxip_av_insert(struct fid_av *fid, const void *addr_in, size_t count,
fi_addr_t *fi_addr, uint64_t flags, void *context)
{
struct cxip_av *av = container_of(fid, struct cxip_av, av_fid.fid);
size_t i;
size_t success_cnt = 0;
int ret;
struct fid_peer_srx *owner_srx;

ret = cxip_av_insert_validate_args(fid, addr_in, count, fi_addr, flags,
context);
@@ -253,6 +266,10 @@ static int cxip_av_insert(struct fid_av *fid, const void *addr_in, size_t count,

cxip_av_unlock(av);

owner_srx = av->domain->owner_srx;
if (owner_srx)
owner_srx->owner_ops->foreach_unspec_addr(owner_srx, &cxip_get_addr);

return success_cnt;
}

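
cxip_get_addr() lets the owner re-resolve the source of a queued unexpected message once its address has been inserted into the AV. Conceptually, foreach_unspec_addr() walks the owner's unexpected queue and patches entries whose source is still FI_ADDR_UNSPEC; a simplified sketch follows (the unexp_list bookkeeping is hypothetical, real owners such as util_srx keep richer state):

#include <rdma/fabric.h>
#include <rdma/fi_peer.h>

struct unexp_list {
        struct fi_peer_rx_entry **entries;
        size_t count;
};

static void owner_fixup_unspec(struct unexp_list *ux,
                               fi_addr_t (*get_addr)(struct fi_peer_rx_entry *))
{
        size_t i;

        for (i = 0; i < ux->count; i++) {
                /* Re-run address resolution now that cxip_av_insert()
                 * has added new AV entries.
                 */
                if (ux->entries[i]->addr == FI_ADDR_UNSPEC)
                        ux->entries[i]->addr = get_addr(ux->entries[i]);
        }
}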
81 changes: 80 additions & 1 deletion prov/cxi/src/cxip_dom.c
@@ -1556,6 +1556,85 @@ static int cxip_query_atomic(struct fid_domain *domain,
return FI_SUCCESS;
}

struct fi_ops_srx_peer cxip_srx_peer_ops = {
.size = sizeof(struct fi_ops_srx_peer),
.start_msg = cxip_unexp_start,
.start_tag = cxip_unexp_start,
.discard_msg = cxip_no_discard,
.discard_tag = cxip_no_discard,
};

static int cxip_srx_close(struct fid *fid)
{
struct cxip_domain *dom;

dom = container_of(fid, struct cxip_domain, rx_ep.fid);

ofi_atomic_dec32(&dom->util_domain.ref);

return FI_SUCCESS;
}

static struct fi_ops cxip_srx_fi_ops = {
.size = sizeof(struct fi_ops),
.close = cxip_srx_close,
.bind = fi_no_bind,
.control = fi_no_control,
.ops_open = fi_no_ops_open,
};

static struct fi_ops_msg cxip_srx_msg_ops = {
.size = sizeof(struct fi_ops_msg),
.recv = fi_no_msg_recv,
.recvv = fi_no_msg_recvv,
.recvmsg = fi_no_msg_recvmsg,
.send = fi_no_msg_send,
.sendv = fi_no_msg_sendv,
.sendmsg = fi_no_msg_sendmsg,
.inject = fi_no_msg_inject,
.senddata = fi_no_msg_senddata,
.injectdata = fi_no_msg_injectdata,
};

static struct fi_ops_tagged cxip_srx_tagged_ops = {
.size = sizeof(struct fi_ops_tagged),
.recv = fi_no_tagged_recv,
.recvv = fi_no_tagged_recvv,
.recvmsg = fi_no_tagged_recvmsg,
.send = fi_no_tagged_send,
.sendv = fi_no_tagged_sendv,
.sendmsg = fi_no_tagged_sendmsg,
.inject = fi_no_tagged_inject,
.senddata = fi_no_tagged_senddata,
.injectdata = fi_no_tagged_injectdata,
};

static int cxip_srx_context(struct fid_domain *fid, struct fi_rx_attr *attr,
struct fid_ep **rx_ep, void *context)
{
struct cxip_domain *dom;

if (!context || !attr || !fid)
return -FI_EINVAL;

dom = container_of(fid, struct cxip_domain,
util_domain.domain_fid.fid);

if (attr->op_flags & FI_PEER) {
dom->owner_srx = ((struct fi_peer_srx_context *) context)->srx;
dom->owner_srx->peer_ops = &cxip_srx_peer_ops;
dom->rx_ep.msg = &cxip_srx_msg_ops;
dom->rx_ep.tagged = &cxip_srx_tagged_ops;
dom->rx_ep.fid.ops = &cxip_srx_fi_ops;
dom->rx_ep.fid.fclass = FI_CLASS_SRX_CTX;
*rx_ep = &dom->rx_ep;
ofi_atomic_inc32(&dom->util_domain.ref);
return FI_SUCCESS;
}

return -FI_ENOSYS;
}

static int cxip_query_collective(struct fid_domain *domain,
enum fi_collective_op coll,
struct fi_collective_attr *attr,
@@ -1695,7 +1774,7 @@ static struct fi_ops_domain cxip_dom_ops = {
.cntr_open = cxip_cntr_open,
.poll_open = fi_no_poll_open,
.stx_ctx = fi_no_stx_context,
.srx_ctx = fi_no_srx_context,
.srx_ctx = cxip_srx_context,
.query_atomic = cxip_query_atomic,
.query_collective = cxip_query_collective
};
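
For reference, a sketch of how an owner provider (or middleware layered over several peers) would hand its SRX to the CXI domain through the FI_PEER branch of cxip_srx_context() above; owner_srx is assumed to be a fully constructed struct fid_peer_srx with valid owner_ops:

#include <rdma/fi_domain.h>
#include <rdma/fi_peer.h>

int share_srx_with_cxi(struct fid_domain *cxi_domain,
                       struct fid_peer_srx *owner_srx,
                       struct fid_ep **peer_rx_ep)
{
        struct fi_peer_srx_context peer_ctx = {
                .size = sizeof(peer_ctx),
                .srx = owner_srx,
        };
        struct fi_rx_attr attr = {
                .op_flags = FI_PEER,    /* required, else -FI_ENOSYS */
        };

        /* On success the domain stores owner_srx and installs its
         * peer_ops (start_msg/start_tag -> cxip_unexp_start).
         */
        return fi_srx_context(cxi_domain, &attr, peer_rx_ep, &peer_ctx);
}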
4 changes: 4 additions & 0 deletions prov/cxi/src/cxip_ep.c
@@ -925,6 +925,10 @@ int cxip_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)

break;

case FI_CLASS_SRX_CTX:
ep->ep_obj->owner_srx = ep->ep_obj->domain->owner_srx;
break;

default:
return -FI_EINVAL;
}
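
Continuing the sketch above, the new FI_CLASS_SRX_CTX case is reached when the SRX endpoint returned by fi_srx_context() is bound to a CXI endpoint, at which point the endpoint adopts the domain's owner SRX:

#include <rdma/fi_endpoint.h>

int bind_owner_srx(struct fid_ep *ep, struct fid_ep *peer_rx_ep)
{
        /* Triggers the FI_CLASS_SRX_CTX branch in cxip_ep_bind(). */
        return fi_ep_bind(ep, &peer_rx_ep->fid, 0);
}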
69 changes: 1 addition & 68 deletions prov/cxi/src/cxip_info.c
@@ -836,27 +836,8 @@ static void cxip_env_init(void)

fi_param_define(&cxip_prov, "rx_match_mode", FI_PARAM_STRING,
"Sets RX message match mode (hardware | software | hybrid).");
fi_param_get_str(&cxip_prov, "rx_match_mode", &param_str);

if (param_str) {
if (!strcasecmp(param_str, "hardware")) {
cxip_env.rx_match_mode = CXIP_PTLTE_HARDWARE_MODE;
cxip_env.msg_offload = true;
} else if (!strcmp(param_str, "software")) {
cxip_env.rx_match_mode = CXIP_PTLTE_SOFTWARE_MODE;
cxip_env.msg_offload = false;
} else if (!strcmp(param_str, "hybrid")) {
cxip_env.rx_match_mode = CXIP_PTLTE_HYBRID_MODE;
cxip_env.msg_offload = true;
} else {
CXIP_WARN("Unrecognized rx_match_mode: %s\n",
param_str);
cxip_env.rx_match_mode = CXIP_PTLTE_HARDWARE_MODE;
cxip_env.msg_offload = true;
}

param_str = NULL;
}
cxip_set_env_rx_match_mode();

fi_param_define(&cxip_prov, "rdzv_threshold", FI_PARAM_SIZE_T,
"Message size threshold for rendezvous protocol.");
@@ -1044,54 +1025,6 @@ static void cxip_env_init(void)
fi_param_get_size_t(&cxip_prov, "req_buf_max_cached",
&cxip_env.req_buf_max_cached);

/* Parameters to tailor hybrid hardware to software transitions
* that are initiated by software.
*/
fi_param_define(&cxip_prov, "hybrid_preemptive", FI_PARAM_BOOL,
"Enable/Disable low LE preemptive UX transitions.");
fi_param_get_bool(&cxip_prov, "hybrid_preemptive",
&cxip_env.hybrid_preemptive);
if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_preemptive) {
cxip_env.hybrid_preemptive = false;
CXIP_WARN("Not in hybrid mode, ignoring preemptive\n");
}

fi_param_define(&cxip_prov, "hybrid_recv_preemptive", FI_PARAM_BOOL,
"Enable/Disable low LE preemptive recv transitions.");
fi_param_get_bool(&cxip_prov, "hybrid_recv_preemptive",
&cxip_env.hybrid_recv_preemptive);

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_recv_preemptive) {
CXIP_WARN("Not in hybrid mode, ignore LE recv preemptive\n");
cxip_env.hybrid_recv_preemptive = 0;
}

fi_param_define(&cxip_prov, "hybrid_posted_recv_preemptive",
FI_PARAM_BOOL,
"Enable preemptive transition to software endpoint when number of posted receives exceeds RX attribute size");
fi_param_get_bool(&cxip_prov, "hybrid_posted_recv_preemptive",
&cxip_env.hybrid_posted_recv_preemptive);

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_posted_recv_preemptive) {
CXIP_WARN("Not in hybrid mode, ignore hybrid_posted_recv_preemptive\n");
cxip_env.hybrid_posted_recv_preemptive = 0;
}

fi_param_define(&cxip_prov, "hybrid_unexpected_msg_preemptive",
FI_PARAM_BOOL,
"Enable preemptive transition to software endpoint when number of hardware unexpected messages exceeds RX attribute size");
fi_param_get_bool(&cxip_prov, "hybrid_unexpected_msg_preemptive",
&cxip_env.hybrid_unexpected_msg_preemptive);

if (cxip_env.rx_match_mode != CXIP_PTLTE_HYBRID_MODE &&
cxip_env.hybrid_unexpected_msg_preemptive) {
CXIP_WARN("Not in hybrid mode, ignore hybrid_unexpected_msg_preemptive\n");
cxip_env.hybrid_unexpected_msg_preemptive = 0;
}

if (cxip_software_pte_allowed()) {
min_free = CXIP_REQ_BUF_HEADER_MAX_SIZE +
cxip_env.rdzv_threshold + cxip_env.rdzv_get_min;
(Diffs for the remaining 4 changed files not shown.)