Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/tcp: drop stale messages for new endpoint #10783

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/rdma/fabric.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ enum {
FI_TAG_BITS,
FI_TAG_MPI,
FI_TAG_CCL,
FI_TAG_RPC,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this change to a separate PR that also updates to the man page which describes what exactly this tag format implies. A separate PR is preferred, as this is an update to the API which needs broader review than just the tcp provider.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#10792 <- I moved the tag definition in this PR

FI_TAG_MAX_FORMAT = (1ULL << 16),
};

Expand Down
11 changes: 11 additions & 0 deletions man/fi_endpoint.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,17 @@ wire protocols. The following tag formats are defined:
Applications that use the CCL format pass in the payload identifier
directly as the tag and set ignore bits to 0.

*FI_TAG_RPC*

: The FI_TAG_RPC flag is used to indicate that tags are being utilized to match
RPC requests and replies. When specified via fi_getinfo, the caller ensures that
a reply buffer with the corresponding tag is registered when sending a request.

This mechanism enables libfabric to identify and discard stale replies, preventing
them from interfering with new communications. This is crucial to avoid blocking
a restarting endpoint, which may otherwise lack sufficient metadata to process
incoming messages with unmatched tags.

*FI_TAG_MAX_FORMAT*
: If the value of mem_tag_format is >= FI_TAG_MAX_FORMAT, the tag format
is treated as a set of bit fields. The behavior is functionally the same
Expand Down
2 changes: 2 additions & 0 deletions prov/tcp/src/xnet.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ struct xnet_ep {
void (*hdr_bswap)(struct xnet_ep *ep, struct xnet_base_hdr *hdr);

short pollflags;
bool tagged_rpc;

xnet_profile_t *profile;
};
Expand Down Expand Up @@ -428,6 +429,7 @@ static inline void xnet_signal_progress(struct xnet_progress *progress)
#define XNET_COPY_RECV BIT(9)
#define XNET_CLAIM_RECV BIT(10)
#define XNET_NEED_CTS BIT(11)
#define XNET_UNEXP_XFER BIT(12)
#define XNET_MULTI_RECV FI_MULTI_RECV /* BIT(16) */

struct xnet_mrecv {
Expand Down
2 changes: 1 addition & 1 deletion prov/tcp/src/xnet_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void xnet_report_success(struct xnet_xfer_entry *xfer_entry)
uint64_t flags, data, tag;
size_t len;

if (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_SAVED_XFER))
if (xfer_entry->ctrl_flags & (XNET_INTERNAL_XFER | XNET_SAVED_XFER | XNET_UNEXP_XFER))
return;

if (xfer_entry->cntr)
Expand Down
1 change: 1 addition & 0 deletions prov/tcp/src/xnet_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ xnet_ep_accept(struct fid_ep *ep_fid, const void *param, size_t paramlen)
(paramlen > XNET_MAX_CM_DATA_SIZE))
return -FI_EINVAL;

ep->tagged_rpc = conn->pep->info->ep_attr->mem_tag_format == FI_TAG_RPC;
ep->conn = NULL;

assert(ep->cm_msg);
Expand Down
12 changes: 10 additions & 2 deletions prov/tcp/src/xnet_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,16 @@ static int xnet_getinfo(uint32_t version, const char *node, const char *service,
uint64_t flags, const struct fi_info *hints,
struct fi_info **info)
{
return ofi_ip_getinfo(&xnet_util_prov, version, node, service, flags,
hints, info);
int ret;

ret = ofi_ip_getinfo(&xnet_util_prov, version, node, service, flags, hints, info);
if (ret)
return ret;

if (hints->ep_attr && hints->ep_attr->mem_tag_format && (*info)->ep_attr)
(*info)->ep_attr->mem_tag_format = hints->ep_attr->mem_tag_format;

return 0;
}

struct xnet_port_range xnet_ports = {
Expand Down
48 changes: 47 additions & 1 deletion prov/tcp/src/xnet_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,44 @@ static bool xnet_save_and_cont(struct xnet_ep *ep)
return (ep->saved_msg->cnt < xnet_max_saved);
}

static struct xnet_xfer_entry *
xnet_get_unexp_rx(struct xnet_ep *ep, uint64_t tag)
{
struct xnet_progress *progress;
struct xnet_xfer_entry *rx_entry;

progress = xnet_ep2_progress(ep);
assert(xnet_progress_locked(progress));
assert(ep->cur_rx.hdr_done == ep->cur_rx.hdr_len &&
!ep->cur_rx.claim_ctx);

FI_DBG(&xnet_prov, FI_LOG_EP_DATA, "Unexp msg tag 0x%zx src %zu\n",
tag, ep->peer->fi_addr);
rx_entry = xnet_alloc_xfer(xnet_srx2_progress(ep->srx));
if (!rx_entry)
return NULL;

rx_entry->saving_ep = NULL;
rx_entry->tag = tag;
rx_entry->ignore = 0;
rx_entry->ctrl_flags = XNET_UNEXP_XFER;

if (ep->cur_rx.data_left <= xnet_buf_size) {
rx_entry->user_buf = NULL;
rx_entry->iov[0].iov_base = &rx_entry->msg_data;
rx_entry->iov[0].iov_len = xnet_buf_size;
rx_entry->iov_cnt = 1;
} else if (xnet_alloc_xfer_buf(rx_entry, ep->cur_rx.data_left)) {
goto free_xfer;
}

return rx_entry;

free_xfer:
xnet_free_xfer(progress, rx_entry);
return NULL;
}

static struct xnet_xfer_entry *
xnet_get_save_rx(struct xnet_ep *ep, uint64_t tag)
{
Expand Down Expand Up @@ -822,6 +860,14 @@ static int xnet_handle_tag(struct xnet_ep *ep)
if (rx_entry)
return xnet_start_recv(ep, rx_entry);
}

if (ep->tagged_rpc) {
/* receive and discard this unexpected message for tagged rpc */
rx_entry = xnet_get_unexp_rx(ep, tag);
if (rx_entry)
return xnet_start_recv(ep, rx_entry);
}

if (dlist_empty(&ep->unexp_entry)) {
dlist_insert_tail(&ep->unexp_entry,
&xnet_ep2_progress(ep)->unexp_tag_list);
Expand Down Expand Up @@ -1102,7 +1148,7 @@ static void xnet_complete_rx(struct xnet_ep *ep, ssize_t ret)
goto cq_error;
}

if (!(rx_entry->ctrl_flags & XNET_SAVED_XFER)) {
if (!(rx_entry->ctrl_flags & XNET_SAVED_XFER) || (rx_entry->ctrl_flags & XNET_UNEXP_XFER)) {
xnet_report_success(rx_entry);
xnet_free_xfer(xnet_ep2_progress(ep), rx_entry);
} else {
Expand Down
2 changes: 2 additions & 0 deletions prov/tcp/src/xnet_rdm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,8 @@ static int xnet_init_rdm(struct xnet_rdm *rdm, struct fi_info *info)
msg_info->tx_attr->op_flags = info->tx_attr->op_flags;
msg_info->rx_attr->caps &= info->rx_attr->caps;
msg_info->rx_attr->op_flags = info->rx_attr->op_flags;
if (info->ep_attr)
msg_info->ep_attr->mem_tag_format = info->ep_attr->mem_tag_format;

ret = fi_srx_context(&rdm->util_ep.domain->domain_fid, info->rx_attr,
&srx, rdm);
Expand Down
Loading