Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/opx: Fix Coverity scan waiting while holding lock errors. #9959

Merged
merged 1 commit into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 82 additions & 100 deletions prov/opx/include/opx_shm.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (c) 2016-2018 Intel Corporation. All rights reserved.
* Copyright (c) 2021-2023 Cornelis Networks.
* Copyright (c) 2021-2024 Cornelis Networks.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -59,15 +59,17 @@
#include <limits.h>
#include <signal.h>

#ifdef OPX_DAOS_SUPPORT
#ifdef OPX_DAOS
#define OPX_SHM_MAX_CONN_NUM 0xffff
#else
/* FI_OPX_MAX_HFIS * 256 */
#define OPX_SHM_MAX_CONN_NUM 0x1000
#define OPX_SHM_MAX_CONN_NUM (0x1000)
#define OPX_SHM_MAX_CONN_MASK (OPX_SHM_MAX_CONN_NUM - 1)
#endif
static_assert((OPX_SHM_MAX_CONN_NUM & OPX_SHM_MAX_CONN_MASK) == 0,
"OPX_SHM_MAX_CONN_NUM must be a power of 2!");

#define OPX_SHM_SEGMENT_NAME_MAX_LENGTH (512)
#define OPX_SHM_TX_CONNECT_MAX_WAIT (5000) // 5 seconds
#define OPX_SHM_SEGMENT_NAME_PREFIX "/opx.shm."
#define OPX_SHM_FILE_NAME_PREFIX_FORMAT "%s-%02hhX.%d"

Expand All @@ -80,14 +82,14 @@ struct opx_shm_connection {
void *segment_ptr;
size_t segment_size;
bool inuse;
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
};

struct opx_shm_tx {
struct opx_shm_tx *next; // for signal handler
struct fi_provider *prov;
struct opx_shm_fifo_segment *fifo_segment[OPX_SHM_MAX_CONN_NUM];
struct opx_shm_connection connection[OPX_SHM_MAX_CONN_NUM];
struct fi_provider *prov;
struct opx_shm_tx *next; // for signal handler
uint32_t rank;
uint32_t rank_inst;
};
Expand All @@ -98,12 +100,12 @@ struct opx_shm_resynch {
};

struct opx_shm_rx {
struct opx_shm_rx *next; // for signal handler
struct fi_provider *prov;
struct opx_shm_fifo_segment *fifo_segment;
void *segment_ptr;
size_t segment_size;
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
struct fi_provider *prov;
struct opx_shm_rx *next; // for signal handler
struct opx_shm_resynch resynch_connection[OPX_SHM_MAX_CONN_NUM];
};

Expand All @@ -112,42 +114,42 @@ extern struct opx_shm_rx *shm_rx_head;

struct opx_shm_packet
{
ofi_atomic64_t sequence_;
uint32_t origin_rank;
uint32_t origin_rank_inst;
ofi_atomic64_t sequence_;
uint32_t origin_rank;
uint32_t origin_rank_inst;

// TODO: Figure out why using pad_next_cacheline causes a segfault due to alignment w/ movaps instruction
// but the other one below does not, even though in both cases the struct size is the
// same, and data starts at a 16-byte aligned offset into the struct.
// TODO: Figure out why using pad_next_cacheline causes a segfault due to alignment w/ movaps instruction
// but the other one below does not, even though in both cases the struct size is the
// same, and data starts at a 16-byte aligned offset into the struct.

// sizeof(opx_shm_packet) == 8320, data starts at offset 0x40 (64)
// uint8_t pad_next_cacheline[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t) - sizeof(uint32_t) - sizeof(uint32_t)];
// sizeof(opx_shm_packet) == 8320, data starts at offset 0x40 (64)
// uint8_t pad_next_cacheline[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t) - sizeof(uint32_t) - sizeof(uint32_t)];

// sizeof(opx_shm_packet) == 8320, data starts at offset 0x20 (32)
uint64_t pad;
// sizeof(opx_shm_packet) == 8320, data starts at offset 0x20 (32)
uint64_t pad;

uint8_t data[FI_OPX_SHM_PACKET_SIZE];
uint8_t data[FI_OPX_SHM_PACKET_SIZE];
}__attribute__((__aligned__(64)));

struct opx_shm_fifo {
ofi_atomic64_t enqueue_pos_;
uint8_t pad0_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
ofi_atomic64_t dequeue_pos_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_packet buffer_[FI_OPX_SHM_FIFO_SIZE];
ofi_atomic64_t enqueue_pos_;
uint8_t pad0_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
ofi_atomic64_t dequeue_pos_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_packet buffer_[FI_OPX_SHM_FIFO_SIZE];
} __attribute__((__aligned__(64)));

static_assert((offsetof(struct opx_shm_fifo, enqueue_pos_) & 0x3fUL) == 0,
"struct opx_shm_fifo->enqueue_pos_ needs to be 64-byte aligned!");
"struct opx_shm_fifo->enqueue_pos_ needs to be 64-byte aligned!");
static_assert((offsetof(struct opx_shm_fifo, dequeue_pos_) & 0x3fUL) == 0,
"struct opx_shm_fifo->dequeue_pos_ needs to be 64-byte aligned!");
"struct opx_shm_fifo->dequeue_pos_ needs to be 64-byte aligned!");
static_assert(offsetof(struct opx_shm_fifo, buffer_) == (FI_OPX_CACHE_LINE_SIZE * 2),
"struct opx_shm_fifo->buffer_ should be 2 cachelines into struct");
"struct opx_shm_fifo->buffer_ should be 2 cachelines into struct");

struct opx_shm_fifo_segment {
ofi_atomic64_t initialized_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_fifo fifo;
ofi_atomic64_t initialized_;
uint8_t pad1_[FI_OPX_CACHE_LINE_SIZE - sizeof(ofi_atomic64_t)];
struct opx_shm_fifo fifo;
} __attribute__((__aligned__(64)));

static inline
Expand Down Expand Up @@ -232,11 +234,11 @@ ssize_t opx_shm_rx_init (struct opx_shm_rx *rx,
rx->segment_ptr = segment_ptr;
rx->segment_size = segment_size;

// TODO: MHEINZ we probably need a lock here.
rx->next = shm_rx_head; shm_rx_head = rx; // add to signal handler list.
rx->next = shm_rx_head;
shm_rx_head = rx; // add to signal handler list.

ofi_atomic_set64(&rx->fifo_segment->initialized_, 1);

close(segment_fd); /* safe to close now */

FI_LOG(prov, FI_LOG_INFO, FI_LOG_FABRIC,
Expand Down Expand Up @@ -282,8 +284,8 @@ ssize_t opx_shm_tx_init (struct opx_shm_tx *tx,
tx->rank = hfi_rank;
tx->rank_inst = hfi_rank_inst;

// TODO: MHEINZ we probably need a lock here.
tx->next = shm_tx_head; shm_tx_head = tx; // add to signal handler list.
tx->next = shm_tx_head;
shm_tx_head = tx; // add to signal handler list.

return FI_SUCCESS;
}
Expand All @@ -299,83 +301,57 @@ ssize_t opx_shm_tx_connect (struct opx_shm_tx *tx,
assert(segment_index < OPX_SHM_MAX_CONN_NUM);
int err = 0;

char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
memset(segment_key, 0, OPX_SHM_SEGMENT_NAME_MAX_LENGTH);
void *segment_ptr = tx->connection[segment_index].segment_ptr;
if (segment_ptr == NULL) {
char segment_key[OPX_SHM_SEGMENT_NAME_MAX_LENGTH];
snprintf(segment_key, OPX_SHM_SEGMENT_NAME_MAX_LENGTH,
OPX_SHM_SEGMENT_NAME_PREFIX "%s.%d",
unique_job_key, rx_id);

snprintf(segment_key, OPX_SHM_SEGMENT_NAME_MAX_LENGTH,
OPX_SHM_SEGMENT_NAME_PREFIX "%s.%d",
unique_job_key, rx_id);
int segment_fd = shm_open(segment_key, O_RDWR, 0600);
if (segment_fd == -1) {
FI_DBG(tx->prov, FI_LOG_FABRIC,
"Unable to create shm object '%s'; errno = '%s'\n",
segment_key, strerror(errno));
return -FI_EAGAIN;
}

if (segment_index >= OPX_SHM_MAX_CONN_NUM) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"Unable to create shm object '%s'; segment_index %u (rx %u) too large\n",
segment_key, segment_index, rx_id);
return -FI_E2BIG;
}
size_t segment_size = sizeof(struct opx_shm_fifo_segment) + 64;

int segment_fd;
unsigned loop = 0;
for (;;) {
segment_fd = shm_open(segment_key, O_RDWR, 0600);
if (segment_fd == -1) {
if (loop++ > OPX_SHM_TX_CONNECT_MAX_WAIT) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"Unable to create shm object '%s'; errno = '%s'\n",
segment_key, strerror(errno));
return -FI_EAGAIN;
}
usleep(1000);
} else {
break;
segment_ptr = mmap(NULL, segment_size, PROT_READ | PROT_WRITE,
MAP_SHARED, segment_fd, 0);
if (segment_ptr == MAP_FAILED) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"mmap failed: '%s'\n", strerror(errno));
err = errno;
goto error_return;
}
}

size_t segment_size = sizeof(struct opx_shm_fifo_segment) + 64;
close(segment_fd); /* safe to close now */

void *segment_ptr = mmap(NULL, segment_size, PROT_READ | PROT_WRITE,
MAP_SHARED, segment_fd, 0);
if (segment_ptr == MAP_FAILED) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"mmap failed: '%s'\n", strerror(errno));
err = errno;
goto error_return;
tx->connection[segment_index].segment_ptr = segment_ptr;
tx->connection[segment_index].segment_size = segment_size;
tx->connection[segment_index].inuse = false;
strcpy(tx->connection[segment_index].segment_key, segment_key);
}

close(segment_fd); /* safe to close now */

/*
* Wait for completion of the initialization of the SHM segment before using
* it.
*/
loop = 0;
struct opx_shm_fifo_segment *fifo_segment =
(struct opx_shm_fifo_segment *)(((uintptr_t)segment_ptr + 64) & (~0x03Full));
for (;;) {
uint64_t init =
atomic_load_explicit(&fifo_segment->initialized_.val, memory_order_acquire);

if (init == 0) {
if (loop++ > OPX_SHM_TX_CONNECT_MAX_WAIT) {
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"SHM object '%s' still initializing.\n",
segment_key);
return -FI_EAGAIN;
}
usleep(1000);
} else {
break;
}
uint64_t init = atomic_load_explicit(&fifo_segment->initialized_.val,
memory_order_acquire);
if (init == 0) {
FI_DBG(tx->prov, FI_LOG_FABRIC,
"SHM object '%s' still initializing.\n",
tx->connection[segment_index].segment_key);
return -FI_EAGAIN;
}

tx->connection[segment_index].segment_ptr = segment_ptr;
tx->connection[segment_index].segment_size = segment_size;
tx->connection[segment_index].inuse = false;
tx->fifo_segment[segment_index] = fifo_segment;
strcpy(tx->connection[segment_index].segment_key, segment_key);

FI_LOG(tx->prov, FI_LOG_INFO, FI_LOG_FABRIC,
"SHM connection to %u context passed. Segment (%s), %d, segment (%p) size %zu segment_index %u\n",
rx_id, segment_key, segment_fd, segment_ptr, segment_size, segment_index);
"SHM connection to %u context passed. Segment (%s), segment (%p) size %zu segment_index %u\n",
rx_id, tx->connection[segment_index].segment_key, segment_ptr,
tx->connection[segment_index].segment_size, segment_index);

return FI_SUCCESS;

Expand Down Expand Up @@ -436,19 +412,25 @@ static inline
void * opx_shm_tx_next (struct opx_shm_tx *tx, uint8_t peer_hfi_unit, uint8_t peer_rx_index,
uint64_t *pos, bool use_rank, unsigned rank, unsigned rank_inst, ssize_t *rc)
{
#ifdef OPX_DAOS
/* HFI Rank Support: Used HFI rank index instead of HFI index. */
unsigned segment_index = (!use_rank) ? OPX_SHM_SEGMENT_INDEX(peer_hfi_unit, peer_rx_index)
: opx_shm_daos_rank_index(rank, rank_inst);

#else
unsigned segment_index = OPX_SHM_SEGMENT_INDEX(peer_hfi_unit, peer_rx_index);
#endif
assert(segment_index < OPX_SHM_MAX_CONN_NUM);

#ifndef NDEBUG
if (segment_index >= OPX_SHM_MAX_CONN_NUM) {
*rc = -FI_EIO;
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"SHM %u context exceeds maximum contexts supported.\n", segment_index);
return NULL;
}
#endif

if (tx->fifo_segment[segment_index] == NULL) {
if (OFI_UNLIKELY(tx->fifo_segment[segment_index] == NULL)) {
*rc = -FI_EIO;
FI_LOG(tx->prov, FI_LOG_WARN, FI_LOG_FABRIC,
"SHM %u context FIFO not initialized.\n", segment_index);
Expand Down
Loading