Skip to content

Commit

Permalink
prov/efa: Minimize calls to efa_rdm_ep_get_peer in the CQ read path
Browse files Browse the repository at this point in the history
This change is useful for two reasons
(1) The efa_rdm_ep_get_peer function accesses the AV and it's easier to
lock access to the AV when the call happens in fewer places
(2) With an implicit AV, the fi_addr will always be -1 for peers not
explicitly inserted to the AV. So efa_rdm_ep_get_peer will need to do
a reverse hashmap lookup with a significant performance cost for such
peers. So minimizing the number of calls to efa_rdm_ep_get_peer is more
performant.

Signed-off-by: Sai Sunku <sunkusa@amazon.com>
  • Loading branch information
sunkuamzn committed Mar 5, 2025
1 parent 401a77c commit 12fb13e
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 95 deletions.
28 changes: 15 additions & 13 deletions prov/efa/src/rdm/efa_rdm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ static void efa_rdm_cq_handle_recv_completion(struct efa_ibv_cq *ibv_cq, struct
return;
}

efa_rdm_pke_proc_received(pkt_entry);
efa_rdm_pke_proc_received(pkt_entry, peer);
}


Expand All @@ -371,6 +371,7 @@ static void efa_rdm_cq_handle_recv_completion(struct efa_ibv_cq *ibv_cq, struct
* provider error code.
*
* @param[in] ibv_cq_ex IBV CQ
* @param[in] peer efa_rdm_peer struct of sender
* @return EFA-specific error code
* @sa #EFA_PROV_ERRNOS
*
Expand All @@ -382,16 +383,10 @@ static void efa_rdm_cq_handle_recv_completion(struct efa_ibv_cq *ibv_cq, struct
* RDMA Core error codes (#EFA_IO_COMP_STATUSES) for the sake of more accurate
* error reporting
*/
static int efa_rdm_cq_get_prov_errno(struct ibv_cq_ex *ibv_cq_ex) {
static int efa_rdm_cq_get_prov_errno(struct ibv_cq_ex *ibv_cq_ex, struct efa_rdm_peer *peer) {
uint32_t vendor_err = ibv_wc_read_vendor_err(ibv_cq_ex);
struct efa_rdm_pke *pkt_entry = (void *) (uintptr_t) ibv_cq_ex->wr_id;
struct efa_rdm_peer *peer;
struct efa_rdm_ep *ep;

if (OFI_LIKELY(pkt_entry && pkt_entry->addr)) {
ep = pkt_entry->ep;
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
} else {
if (OFI_UNLIKELY(!peer)) {
return vendor_err;
}

Expand Down Expand Up @@ -438,6 +433,7 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
struct efa_cq *efa_cq;
struct efa_domain *efa_domain;
struct efa_qp *qp;
struct efa_rdm_peer *peer = NULL;
struct dlist_entry rx_progressed_ep_list, *tmp;

efa_cq = container_of(ibv_cq, struct efa_cq, ibv_cq);
Expand All @@ -462,12 +458,14 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
#endif
opcode = ibv_wc_read_opcode(ibv_cq->ibv_cq_ex);
if (ibv_cq->ibv_cq_ex->status) {
prov_errno = efa_rdm_cq_get_prov_errno(ibv_cq->ibv_cq_ex);
if (pkt_entry)
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
prov_errno = efa_rdm_cq_get_prov_errno(ibv_cq->ibv_cq_ex, peer);
switch (opcode) {
case IBV_WC_SEND: /* fall through */
case IBV_WC_RDMA_WRITE: /* fall through */
case IBV_WC_RDMA_READ:
efa_rdm_pke_handle_tx_error(pkt_entry, prov_errno);
efa_rdm_pke_handle_tx_error(pkt_entry, prov_errno, peer);
break;
case IBV_WC_RECV: /* fall through */
case IBV_WC_RECV_RDMA_WITH_IMM:
Expand All @@ -490,17 +488,21 @@ void efa_rdm_cq_poll_ibv_cq(ssize_t cqe_to_process, struct efa_ibv_cq *ibv_cq)
#if ENABLE_DEBUG
ep->send_comps++;
#endif
efa_rdm_pke_handle_send_completion(pkt_entry);
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
efa_rdm_pke_handle_send_completion(pkt_entry, peer);
break;
case IBV_WC_RECV:
/* efa_rdm_cq_handle_recv_completion does additional work to determine the source
* address and the peer struct. So do not try to identify the peer here. */
efa_rdm_cq_handle_recv_completion(ibv_cq, pkt_entry, ep);
#if ENABLE_DEBUG
ep->recv_comps++;
#endif
break;
case IBV_WC_RDMA_READ:
case IBV_WC_RDMA_WRITE:
efa_rdm_pke_handle_rma_completion(pkt_entry);
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
efa_rdm_pke_handle_rma_completion(pkt_entry, peer);
break;
case IBV_WC_RECV_RDMA_WITH_IMM:
efa_rdm_cq_proc_ibv_recv_rdma_with_imm_completion(
Expand Down
8 changes: 4 additions & 4 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep,

void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry);

void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry);
void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer);

static inline size_t efa_rdm_ep_get_rx_pool_size(struct efa_rdm_ep *ep)
{
Expand Down Expand Up @@ -235,9 +235,9 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe

struct efa_rdm_peer;

void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep,
struct dlist_entry *list,
struct efa_rdm_pke *pkt_entry);
void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, struct dlist_entry *list,
struct efa_rdm_pke *pkt_entry,
struct efa_rdm_peer *peer);

ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,
struct dlist_entry *pkts);
Expand Down
11 changes: 5 additions & 6 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,11 @@ void efa_rdm_ep_record_tx_op_submitted(struct efa_rdm_ep *ep, struct efa_rdm_pke
* @param[in,out] ep endpoint
* @param[in] pkt_entry TX pkt_entry, which contains
* the info of the TX op
* @param[in] peer efa_rdm_peer struct for the receiver
*/
void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry)
void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer)
{
struct efa_rdm_ope *ope = NULL;
struct efa_rdm_peer *peer;

ope = pkt_entry->ope;
/*
Expand All @@ -351,7 +351,6 @@ void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke
* a new peer has the same GID+QPN was inserted to address, or because
* application removed the peer from address vector.
*/
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
if (peer)
dlist_remove(&pkt_entry->entry);

Expand Down Expand Up @@ -408,12 +407,13 @@ void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke
* @param[in] ep endpoint
* @param[in] list queued RNR packet list
* @param[in] pkt_entry packet entry that encounter RNR
* @param[in] peer efa_rdm_peer struct of the receiver
*/
void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep,
struct dlist_entry *list,
struct efa_rdm_pke *pkt_entry)
struct efa_rdm_pke *pkt_entry,
struct efa_rdm_peer *peer)
{
struct efa_rdm_peer *peer;
static const int random_min_timeout = 40;
static const int random_max_timeout = 120;

Expand All @@ -422,7 +422,6 @@ void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep,
#endif
dlist_insert_tail(&pkt_entry->entry, list);
ep->efa_rnr_queued_pkt_cnt += 1;
peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
assert(peer);
if (!(pkt_entry->flags & EFA_RDM_PKE_RNR_RETRANSMIT)) {
/* This is the first time this packet encountered RNR,
Expand Down
2 changes: 1 addition & 1 deletion prov/efa/src/rdm/efa_rdm_peer.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ void efa_rdm_peer_proc_pending_items_in_robuf(struct efa_rdm_peer *peer, struct
EFA_DBG(FI_LOG_EP_CTRL,
"Processing msg_id %d from robuf\n", msg_id);
/* efa_rdm_pke_proc_rtm_rta will write error cq entry if needed */
ret = efa_rdm_pke_proc_rtm_rta(pending_pkt);
ret = efa_rdm_pke_proc_rtm_rta(pending_pkt, peer);
*ofi_recvwin_get_next_msg((&peer->robuf)) = NULL;

exp_msg_id = ofi_recvwin_next_exp_id((&peer->robuf));
Expand Down
29 changes: 15 additions & 14 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,10 @@ void efa_rdm_pke_handle_data_copied(struct efa_rdm_pke *pkt_entry)
*
* @param[in] pkt_entry pkt entry
* @param[in] prov_errno provider specific error code
* @param[in] peer efa_rdm_peer struct for the receiver
*/
void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, struct efa_rdm_peer *peer)
{
struct efa_rdm_peer *peer;
struct efa_rdm_ope *txe;
struct efa_rdm_ope *rxe;
struct efa_rdm_ep *ep;
Expand All @@ -412,9 +412,8 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
efa_strerror(prov_errno), prov_errno);

ep = pkt_entry->ep;
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);

peer = efa_rdm_ep_get_peer(ep, pkt_entry->addr);
if (!peer) {
/*
* If peer is NULL, it means the peer has been removed from AV.
Expand Down Expand Up @@ -493,7 +492,7 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
* packets include all REQ, DATA) thus shoud be queued for RNR
* only if application wants EFA to manager resource.
*/
efa_rdm_ep_queue_rnr_pkt(ep, &txe->queued_pkts, pkt_entry);
efa_rdm_ep_queue_rnr_pkt(ep, &txe->queued_pkts, pkt_entry, peer);
if (!(txe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) {
txe->internal_flags |= EFA_RDM_OPE_QUEUED_RNR;
dlist_insert_tail(&txe->queued_entry,
Expand All @@ -514,7 +513,7 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
* is regardless value of ep->handle_resource_management, because
* resource management is only applied to send operation.
*/
efa_rdm_ep_queue_rnr_pkt(ep, &rxe->queued_pkts, pkt_entry);
efa_rdm_ep_queue_rnr_pkt(ep, &rxe->queued_pkts, pkt_entry, peer);
if (!(rxe->internal_flags & EFA_RDM_OPE_QUEUED_RNR)) {
rxe->internal_flags |= EFA_RDM_OPE_QUEUED_RNR;
dlist_insert_tail(&rxe->queued_entry,
Expand Down Expand Up @@ -544,8 +543,9 @@ void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno)
* then release the packet entry.
*
* @param[in,out] pkt_entry packet entry
* @param[in,out] peer efa_rdm_peer struct of the receiver
*/
void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer)
{
struct efa_rdm_ep *ep;

Expand All @@ -560,15 +560,15 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
if (pkt_entry->addr == FI_ADDR_NOTAVAIL &&
!(pkt_entry->flags & EFA_RDM_PKE_LOCAL_READ)) {
EFA_WARN(FI_LOG_CQ, "ignoring send completion of a packet to a removed peer.\n");
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
efa_rdm_pke_release_tx(pkt_entry);
return;
}

/* These pkts are eager pkts withour hdrs */
if (pkt_entry->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP) {
efa_rdm_pke_handle_eager_rtm_send_completion(pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
efa_rdm_pke_release_tx(pkt_entry);
return;
}
Expand All @@ -590,7 +590,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
efa_rdm_pke_handle_eor_send_completion(pkt_entry);
break;
case EFA_RDM_RMA_CONTEXT_PKT:
efa_rdm_pke_handle_rma_completion(pkt_entry);
efa_rdm_pke_handle_rma_completion(pkt_entry, peer);
return;
case EFA_RDM_ATOMRSP_PKT:
efa_rdm_pke_handle_atomrsp_send_completion(pkt_entry);
Expand Down Expand Up @@ -674,7 +674,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
return;
}

efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
efa_rdm_ep_record_tx_op_completed(ep, pkt_entry, peer);
efa_rdm_pke_release_tx(pkt_entry);
}

Expand Down Expand Up @@ -808,8 +808,9 @@ void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_im
*
* @param[in] ep endpoint
* @param[in] pkt_entry received packet entry
* @param[in] peer peer struct of the sender
*/
void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry)
void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer)
{
struct efa_rdm_ep *ep;
struct efa_rdm_base_hdr *base_hdr;
Expand Down Expand Up @@ -843,7 +844,7 @@ void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry)
efa_rdm_pke_handle_eor_recv(pkt_entry);
return;
case EFA_RDM_HANDSHAKE_PKT:
efa_rdm_pke_handle_handshake_recv(pkt_entry);
efa_rdm_pke_handle_handshake_recv(pkt_entry, peer);
return;
case EFA_RDM_CTS_PKT:
efa_rdm_pke_handle_cts_recv(pkt_entry);
Expand Down Expand Up @@ -880,7 +881,7 @@ void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry)
case EFA_RDM_DC_WRITE_RTA_PKT:
case EFA_RDM_FETCH_RTA_PKT:
case EFA_RDM_COMPARE_RTA_PKT:
efa_rdm_pke_handle_rtm_rta_recv(pkt_entry);
efa_rdm_pke_handle_rtm_rta_recv(pkt_entry, peer);
return;
case EFA_RDM_EAGER_RTW_PKT:
efa_rdm_pke_handle_eager_rtw_recv(pkt_entry);
Expand Down
6 changes: 3 additions & 3 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ fi_addr_t efa_rdm_pke_determine_addr(struct efa_rdm_pke *pkt_entry);

void efa_rdm_pke_handle_data_copied(struct efa_rdm_pke *pkt_entry);

void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno);
void efa_rdm_pke_handle_tx_error(struct efa_rdm_pke *pkt_entry, int prov_errno, struct efa_rdm_peer *peer);

void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry);
void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer);

void efa_rdm_pke_handle_rx_error(struct efa_rdm_pke *pkt_entry, int prov_errno);

void efa_rdm_pke_handle_recv_completion(struct efa_rdm_pke *pkt_entry);

void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry);
void efa_rdm_pke_proc_received(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer);

void efa_rdm_pke_proc_received_no_hdr(struct efa_rdm_pke *pkt_entry, bool has_imm_data, uint32_t imm_data);

Expand Down
9 changes: 4 additions & 5 deletions prov/efa/src/rdm/efa_rdm_pke_nonreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,15 @@ ssize_t efa_rdm_pke_init_handshake(struct efa_rdm_pke *pkt_entry,
return 0;
}

void efa_rdm_pke_handle_handshake_recv(struct efa_rdm_pke *pkt_entry)
void efa_rdm_pke_handle_handshake_recv(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer)
{
struct efa_rdm_peer *peer;
struct efa_rdm_handshake_hdr *handshake_pkt;
uint64_t *host_id_ptr;

assert(pkt_entry->addr != FI_ADDR_NOTAVAIL);
EFA_DBG(FI_LOG_CQ,
"HANDSHAKE received from %" PRIu64 "\n", pkt_entry->addr);

peer = efa_rdm_ep_get_peer(pkt_entry->ep, pkt_entry->addr);
assert(peer);

handshake_pkt = (struct efa_rdm_handshake_hdr *)pkt_entry->wiredata;
Expand Down Expand Up @@ -570,8 +568,9 @@ void efa_rdm_pke_handle_rma_read_completion(struct efa_rdm_pke *context_pkt_entr
*
* @param ep[in,out] Endpoint
* @param context_pkt_entry[in,out] The "Packet" which serves as context
* @param peer[in] struct efa_rdm_peer of peer
*/
void efa_rdm_pke_handle_rma_completion(struct efa_rdm_pke *context_pkt_entry)
void efa_rdm_pke_handle_rma_completion(struct efa_rdm_pke *context_pkt_entry, struct efa_rdm_peer *peer)
{
struct efa_rdm_ope *txe = NULL;
struct efa_rdm_rma_context_pkt *rma_context_pkt;
Expand Down Expand Up @@ -601,7 +600,7 @@ void efa_rdm_pke_handle_rma_completion(struct efa_rdm_pke *context_pkt_entry)
assert(0 && "invalid EFA_RDM_RMA_CONTEXT_PKT rma_context_type\n");
}

efa_rdm_ep_record_tx_op_completed(context_pkt_entry->ep, context_pkt_entry);
efa_rdm_ep_record_tx_op_completed(context_pkt_entry->ep, context_pkt_entry, peer);
efa_rdm_pke_release_tx(context_pkt_entry);
}

Expand Down
4 changes: 2 additions & 2 deletions prov/efa/src/rdm/efa_rdm_pke_nonreq.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ struct efa_rdm_handshake_opt_user_recv_qp_hdr *efa_rdm_pke_get_handshake_opt_use
ssize_t efa_rdm_pke_init_handshake(struct efa_rdm_pke *pkt_entry,
fi_addr_t addr);

void efa_rdm_pke_handle_handshake_recv(struct efa_rdm_pke *pkt_entry);
void efa_rdm_pke_handle_handshake_recv(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer);

/* CTS packet related functions */
static inline
Expand Down Expand Up @@ -210,7 +210,7 @@ void efa_rdm_pke_init_read_context(struct efa_rdm_pke *pkt_entry,
int read_id,
size_t seg_size);

void efa_rdm_pke_handle_rma_completion(struct efa_rdm_pke *pkt_entry);
void efa_rdm_pke_handle_rma_completion(struct efa_rdm_pke *pkt_entry, struct efa_rdm_peer *peer);

/* EOR packet related functions */
static inline
Expand Down
Loading

0 comments on commit 12fb13e

Please sign in to comment.