diff --git a/src/coll_score/ucc_coll_score.c b/src/coll_score/ucc_coll_score.c
index 7cc4f90af3..c99d33f9dc 100644
--- a/src/coll_score/ucc_coll_score.c
+++ b/src/coll_score/ucc_coll_score.c
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  *
  * See file LICENSE for terms.
  */
@@ -9,6 +9,16 @@
 #include "utils/ucc_log.h"
 #include "utils/ucc_coll_utils.h"
 
+char *ucc_score_to_str(ucc_score_t score, char *buf, size_t max) {
+    if (score == UCC_SCORE_MAX) {
+        ucc_strncpy_safe(buf, "inf", max);
+    } else {
+        ucc_snprintf_safe(buf, max, "%d", score);
+    }
+
+    return buf;
+}
+
 ucc_status_t ucc_coll_score_alloc(ucc_coll_score_t **score)
 {
     ucc_coll_score_t *s = ucc_malloc(sizeof(*s), "ucc_coll_score");
diff --git a/src/coll_score/ucc_coll_score.h b/src/coll_score/ucc_coll_score.h
index 16f0ba0b74..fa95e6a76a 100644
--- a/src/coll_score/ucc_coll_score.h
+++ b/src/coll_score/ucc_coll_score.h
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  *
  * See file LICENSE for terms.
  */
@@ -63,6 +63,8 @@ typedef struct ucc_coll_score {
 
 typedef struct ucc_score_map ucc_score_map_t;
 
+char *ucc_score_to_str(ucc_score_t score, char *buf, size_t max);
+
 /* Allocates empty score data structure */
 ucc_status_t ucc_coll_score_alloc(ucc_coll_score_t **score);
 
@@ -77,7 +79,7 @@ ucc_status_t ucc_coll_score_add_range(ucc_coll_score_t *score,
 
 /* Releases the score data structure and all the score ranges
    stored there */
-void ucc_coll_score_free(ucc_coll_score_t *score); 
+void ucc_coll_score_free(ucc_coll_score_t *score);
 
 /* Merges 2 scores score1 and score2 into the new score "rst"
    selecting larger score. Ie.: rst will contain a range from score1 if either
@@ -87,9 +89,9 @@ void ucc_coll_score_free(ucc_coll_score_t *score);
    This fn is used by CL to merge scores from multiple TLs and produce
    a score map. As a result the produced score map will select TL with
    higher score.*/
-ucc_status_t ucc_coll_score_merge(ucc_coll_score_t * score1,
-                                  ucc_coll_score_t * score2,
-                                  ucc_coll_score_t **rst, int free_inputs);
+ucc_status_t ucc_coll_score_merge(ucc_coll_score_t  *score1,
+                                  ucc_coll_score_t  *score2,
+                                  ucc_coll_score_t **rst, int free_inputs);
 
 /* Parses SCORE string
    (see ucc_base_iface.c for pattern description)
@@ -147,7 +149,7 @@ ucc_status_t ucc_coll_score_build_default(ucc_base_team_t *team,
 ucc_status_t ucc_coll_score_build_map(ucc_coll_score_t *score,
                                       ucc_score_map_t **map);
 
-void ucc_coll_score_free_map(ucc_score_map_t *map); 
+void ucc_coll_score_free_map(ucc_score_map_t *map);
 
 /* Initializes task based on args selection and score map.
    Checks fallbacks if necessary. */
diff --git a/src/coll_score/ucc_coll_score_map.c b/src/coll_score/ucc_coll_score_map.c
index 5b67260bd8..9267a77478 100644
--- a/src/coll_score/ucc_coll_score_map.c
+++ b/src/coll_score/ucc_coll_score_map.c
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  *
  * See file LICENSE for terms.
  */
@@ -160,11 +160,12 @@ ucc_status_t ucc_coll_init(ucc_score_map_t *map,
 
 void ucc_coll_score_map_print_info(const ucc_score_map_t *map)
 {
-    size_t           left;
-    ucc_msg_range_t *range;
-    int              i, j, all_empty;
-    char             range_str[128];
-    char             coll_str[1024];
+    size_t           left;
+    ucc_msg_range_t *range;
+    int              i, j, all_empty;
+    char             score_str[32];
+    char             range_str[128];
+    char             coll_str[1024];
 
     for (i = 0; i < UCC_COLL_TYPE_NUM; i++) {
         all_empty = 1;
@@ -191,10 +192,12 @@ void ucc_coll_score_map_print_info(const ucc_score_map_t *map)
                               super.list_elem) {
                 ucc_memunits_range_str(range->start, range->end,
                                        range_str, sizeof(range_str));
-                STR_APPEND(coll_str, left, 256, "{%s}:%s:%u ",
+                ucc_score_to_str(range->super.score, score_str,
+                                 sizeof(score_str));
+                STR_APPEND(coll_str, left, 256, "{%s}:%s:%s ",
                            range_str,
                            range->super.team->context->lib->log_component.name,
-                           range->super.score);
+                           score_str);
             }
             STR_APPEND(coll_str, left, 4, "\n");
         }
diff --git a/src/components/cl/hier/cl_hier.h b/src/components/cl/hier/cl_hier.h
index 8f538c1d7b..c2fcf5e245 100644
--- a/src/components/cl/hier/cl_hier.h
+++ b/src/components/cl/hier/cl_hier.h
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  * Copyright (c) Meta Platforms, Inc. and affiliates. 2022.
  *
  * See file LICENSE for terms.
@@ -109,8 +109,12 @@ typedef struct ucc_cl_hier_team {
 UCC_CLASS_DECLARE(ucc_cl_hier_team_t, ucc_base_context_t *,
                   const ucc_base_team_params_t *);
 
-#define UCC_CL_HIER_SUPPORTED_COLLS \
-    (UCC_COLL_TYPE_ALLTOALL | UCC_COLL_TYPE_ALLTOALLV)
+#define UCC_CL_HIER_SUPPORTED_COLLS \
+    (UCC_COLL_TYPE_ALLTOALL  | \
+     UCC_COLL_TYPE_ALLTOALLV | \
+     UCC_COLL_TYPE_ALLREDUCE | \
+     UCC_COLL_TYPE_BARRIER   | \
+     UCC_COLL_TYPE_BCAST)
 
 ucc_status_t ucc_cl_hier_coll_init(ucc_base_coll_args_t *coll_args,
                                    ucc_base_team_t *team,
diff --git a/src/components/cl/hier/cl_hier_team.c b/src/components/cl/hier/cl_hier_team.c
index 8457e3db83..32ef7e2f93 100644
--- a/src/components/cl/hier/cl_hier_team.c
+++ b/src/components/cl/hier/cl_hier_team.c
@@ -363,7 +363,7 @@ ucc_status_t ucc_cl_hier_team_get_scores(ucc_base_team_t *cl_team,
     team_info.init                = ucc_cl_hier_coll_init;
     team_info.num_mem_types       = 0;
     team_info.supported_mem_types = NULL; /* all memory types supported*/
-    team_info.supported_colls     = UCC_COLL_TYPE_ALL;
+    team_info.supported_colls     = UCC_CL_HIER_SUPPORTED_COLLS;
     team_info.size                = UCC_CL_TEAM_SIZE(team);
 
     status = ucc_coll_score_alloc(&score);
diff --git a/src/components/tl/mlx5/tl_mlx5_context.c b/src/components/tl/mlx5/tl_mlx5_context.c
index 1bf21d50c0..8e6e764a22 100644
--- a/src/components/tl/mlx5/tl_mlx5_context.c
+++ b/src/components/tl/mlx5/tl_mlx5_context.c
@@ -74,7 +74,7 @@ UCC_CLASS_CLEANUP_FUNC(ucc_tl_mlx5_context_t)
         tl_debug(self->super.super.lib, "failed to free ib ctx and pd");
     };
 
-    if (!self->sock) {
+    if (self->sock) {
         close(self->sock);
     }
 
diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am
index bf8e40aa6c..620155d017 100644
--- a/src/components/tl/ucp/Makefile.am
+++ b/src/components/tl/ucp/Makefile.am
@@ -14,6 +14,7 @@ allgather =                        \
 	allgather/allgather.c          \
 	allgather/allgather_ring.c     \
 	allgather/allgather_neighbor.c \
+	allgather/allgather_bruck.c    \
 	allgather/allgather_knomial.c
 
 allgatherv =                       \
diff --git a/src/components/tl/ucp/allgather/allgather.c b/src/components/tl/ucp/allgather/allgather.c
index 926b732e55..6900944e66 100644
--- a/src/components/tl/ucp/allgather/allgather.c
+++ b/src/components/tl/ucp/allgather/allgather.c
@@ -23,6 +23,10 @@ ucc_base_coll_alg_info_t
         {.id   = UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR,
          .name = "neighbor",
          .desc = "O(N) Neighbor Exchange N/2 steps"},
+    [UCC_TL_UCP_ALLGATHER_ALG_BRUCK] =
+        {.id   = UCC_TL_UCP_ALLGATHER_ALG_BRUCK,
+         .name = "bruck",
+         .desc = "O(log(N)) Variation of Bruck algorithm"},
     [UCC_TL_UCP_ALLGATHER_ALG_LAST] = {
         .id = 0, .name = NULL, .desc = NULL}};
 
diff --git a/src/components/tl/ucp/allgather/allgather.h b/src/components/tl/ucp/allgather/allgather.h
index b68ab00e95..ac3592df86 100644
--- a/src/components/tl/ucp/allgather/allgather.h
+++ b/src/components/tl/ucp/allgather/allgather.h
@@ -12,6 +12,7 @@ enum {
     UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL,
     UCC_TL_UCP_ALLGATHER_ALG_RING,
     UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR,
+    UCC_TL_UCP_ALLGATHER_ALG_BRUCK,
     UCC_TL_UCP_ALLGATHER_ALG_LAST
 };
 
@@ -56,6 +57,17 @@ void ucc_tl_ucp_allgather_neighbor_progress(ucc_coll_task_t *task);
 
 ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *task);
 
+/* Bruck */
+ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args,
+                                             ucc_base_team_t      *team,
+                                             ucc_coll_task_t     **task_h);
+
+void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *task);
+
+ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *task);
+
+ucc_status_t ucc_tl_ucp_allgather_bruck_finalize(ucc_coll_task_t *coll_task);
+
 /* Uses allgather_kn_radix from config */
 ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args,
                                                ucc_base_team_t *team,
diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c
new file mode 100644
index 0000000000..7b831eb2ad
--- /dev/null
+++ b/src/components/tl/ucp/allgather/allgather_bruck.c
@@ -0,0 +1,258 @@
+/**
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See file LICENSE for terms.
+ */ +#include "config.h" +#include "tl_ucp.h" +#include "allgather.h" +#include "core/ucc_progress_queue.h" +#include "tl_ucp_sendrecv.h" +#include "utils/ucc_math.h" +#include "utils/ucc_coll_utils.h" +#include "components/mc/ucc_mc.h" +#include + +ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h) +{ + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_tl_ucp_task_t *task = ucc_tl_ucp_init_task(coll_args, team); + ucc_status_t status = UCC_OK; + ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(tl_team); + ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; + ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; + size_t count = TASK_ARGS(task).dst.info.count; + size_t data_size = (count / tsize) * ucc_dt_size(dt); + size_t scratch_size = (tsize - trank) * data_size; + + if (!ucc_coll_args_is_predefined_dt(&TASK_ARGS(task), UCC_RANK_INVALID)) { + tl_error(UCC_TASK_LIB(task), "user defined datatype is not supported"); + status = UCC_ERR_NOT_SUPPORTED; + goto out; + } + tl_trace(UCC_TASK_LIB(task), "ucc_tl_ucp_allgather_bruck_init"); + + task->super.post = ucc_tl_ucp_allgather_bruck_start; + task->super.progress = ucc_tl_ucp_allgather_bruck_progress; + task->super.finalize = ucc_tl_ucp_allgather_bruck_finalize; + + /* allocate scratch buffer only on non root rank */ + if (trank != 0) { + if (UCC_MEMORY_TYPE_HOST != rmem) { + scratch_size = tsize * data_size; + } + status = ucc_mc_alloc(&task->allgather_bruck.scratch_header, + scratch_size, UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), "failed to allocate scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + goto out; + } + task->allgather_bruck.scratch_size = scratch_size; + } else { + task->allgather_bruck.scratch_header = NULL; + task->allgather_bruck.scratch_size = 0; + } +out: + if (status != UCC_OK) { + ucc_tl_ucp_put_task(task); + return status; + } + + *task_h = &task->super; + return status; +} + +ucc_status_t ucc_tl_ucp_allgather_bruck_finalize(ucc_coll_task_t *coll_task) +{ + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_status_t global_status = UCC_OK; + ucc_status_t status; + + tl_trace(UCC_TASK_LIB(task), "ucc_tl_ucp_allgather_bruck_finalize"); + + if (task->allgather_bruck.scratch_header != NULL) { + /* deallocate scratch buffer */ + global_status = ucc_mc_free(task->allgather_bruck.scratch_header); + if (ucc_unlikely(global_status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to free scratch buffer memory"); + } + task->allgather_bruck.scratch_size = 0; + } + + status = ucc_tl_ucp_coll_finalize(&task->super); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to finalize allgather bruck collective"); + global_status = status; + } + return global_status; +} + +/* Inspired by implementation: https://github.com/open-mpi/ompi/blob/main/ompi/mca/coll/base/coll_base_allgather.c */ +void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) +{ + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); + void *rbuf = TASK_ARGS(task).dst.info.buffer; + ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; + ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; + size_t count = 
TASK_ARGS(task).dst.info.count; + ucc_mc_buffer_header_t *scratch_header = + task->allgather_bruck.scratch_header; + size_t scratch_size = task->allgather_bruck.scratch_size; + size_t data_size = (count / tsize) * ucc_dt_size(dt); + ucc_rank_t recvfrom, sendto; + ucc_status_t status; + size_t blockcount, distance; + void *tmprecv, *tmpsend; + + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + return; + } + + /* On each step doubles distance */ + distance = 1 << task->tagged.recv_posted; + tmpsend = rbuf; + while (distance < tsize) { + + recvfrom = (trank + distance) % tsize; + sendto = (trank + tsize - distance) % tsize; + + tmprecv = PTR_OFFSET(tmpsend, distance * data_size); + + if (distance <= tsize >> 1) { + blockcount = distance; + } else { + /* send-recv all reminder */ + blockcount = tsize - distance; + } + + /* Sendreceive */ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(tmpsend, blockcount * data_size, rmem, + sendto, team, task), + task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(tmprecv, blockcount * data_size, rmem, + recvfrom, team, task), + task, out); + + if (UCC_INPROGRESS == ucc_tl_ucp_test_recv(task)) { + return; + } + + distance = 1 << task->tagged.recv_posted; + } + + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + return; + } + + /* post processing step */ + if (trank != 0) { + if (UCC_MEMORY_TYPE_HOST == rmem) { + // copy blocks [0 .. (size - rank - 1)] from rbuf to shift buffer + status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, + UCC_MEMORY_TYPE_HOST, rmem); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy data to scratch buffer"); + task->super.status = status; + return; + } + // move blocks [(size - rank) .. size] from rbuf to beginning of rbuf + // TODO: rewrite to cycle to get rid of overlap + memmove(rbuf, PTR_OFFSET(rbuf, scratch_size), trank * data_size); + // copy blocks from shift buffer starting at block [rank] in rbuf. 
+            status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size),
+                                   scratch_header->addr, scratch_size, rmem,
+                                   UCC_MEMORY_TYPE_HOST);
+            if (ucc_unlikely(status != UCC_OK)) {
+                tl_error(UCC_TASK_LIB(task),
+                         "failed to copy data from scratch buffer to rbuf");
+                task->super.status = status;
+                return;
+            }
+        } else {
+            /* For non-host memory we copy twice, to a host buffer and then back to the device: three memcopies in total */
+            /* TODO: replace with generic kernel to do bruck post step in a single launch on device */
+            status = ucc_mc_memcpy(
+                PTR_OFFSET(scratch_header->addr, trank * data_size), rbuf,
+                (tsize - trank) * data_size, UCC_MEMORY_TYPE_HOST, rmem);
+            if (ucc_unlikely(status != UCC_OK)) {
+                tl_error(UCC_TASK_LIB(task),
+                         "failed to copy first data part to scratch buffer");
+                task->super.status = status;
+                return;
+            }
+            status =
+                ucc_mc_memcpy(scratch_header->addr,
+                              PTR_OFFSET(rbuf, (tsize - trank) * data_size),
+                              trank * data_size, UCC_MEMORY_TYPE_HOST, rmem);
+            if (ucc_unlikely(status != UCC_OK)) {
+                tl_error(UCC_TASK_LIB(task),
+                         "failed to copy second data part to scratch buffer");
+                task->super.status = status;
+                return;
+            }
+            status =
+                ucc_mc_memcpy(rbuf, scratch_header->addr, tsize * data_size,
+                              rmem, UCC_MEMORY_TYPE_HOST);
+            if (ucc_unlikely(status != UCC_OK)) {
+                tl_error(UCC_TASK_LIB(task),
+                         "failed to copy from scratch buffer to dst");
+                task->super.status = status;
+                return;
+            }
+        }
+    }
+
+    ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task));
+
+    task->super.status = UCC_OK;
+
+out:
+    UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_bruck_done", 0);
+}
+
+ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task)
+{
+    ucc_tl_ucp_task_t *task      = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
+    ucc_tl_ucp_team_t *team      = TASK_TEAM(task);
+    size_t             count     = TASK_ARGS(task).dst.info.count;
+    void              *sbuf      = TASK_ARGS(task).src.info.buffer;
+    void              *rbuf      = TASK_ARGS(task).dst.info.buffer;
+    ucc_memory_type_t  smem      = TASK_ARGS(task).src.info.mem_type;
+    ucc_memory_type_t  rmem      = TASK_ARGS(task).dst.info.mem_type;
+    ucc_datatype_t     dt        = TASK_ARGS(task).dst.info.datatype;
+    ucc_rank_t         trank     = UCC_TL_TEAM_RANK(team);
+    ucc_rank_t         tsize     = UCC_TL_TEAM_SIZE(team);
+    size_t             data_size = (count / tsize) * ucc_dt_size(dt);
+    ucc_status_t       status;
+
+    UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_bruck_start", 0);
+    ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
+
+    /* initial step: copy data on non-root ranks to the beginning of buffer */
+    if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
+        // not inplace: copy chunk from the source buffer to the beginning of the receive buffer
+        status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
+        if (ucc_unlikely(UCC_OK != status)) {
+            return status;
+        }
+    } else if (trank != 0) {
+        // inplace: copy own chunk to the beginning of the receive buffer
+        status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
+                               data_size, rmem, rmem);
+        if (ucc_unlikely(UCC_OK != status)) {
+            return status;
+        }
+    }
+
+    return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
+}
diff --git a/src/components/tl/ucp/bcast/bcast_dbt.c b/src/components/tl/ucp/bcast/bcast_dbt.c
index 890e417dcd..9820f35cd2 100644
--- a/src/components/tl/ucp/bcast/bcast_dbt.c
+++ b/src/components/tl/ucp/bcast/bcast_dbt.c
@@ -112,6 +112,8 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task)
     task->bcast_dbt.state = SEND_T1;
 
 SEND_T1:
+/* test_recv is needed to progress ucp_worker */
+    ucc_tl_ucp_test_recv(task);
     if ((coll_root == rank) || (task->bcast_dbt.t1.recv > 0)) {
         for (i = 0; i < 2; i++) {
             if ((t1.children[i] != UCC_RANK_INVALID) &&
@@ -130,6 +132,8 @@ void ucc_tl_ucp_bcast_dbt_progress(ucc_coll_task_t *coll_task)
     task->bcast_dbt.state = SEND_T2;
 
 SEND_T2:
+/* test_recv is needed to progress ucp_worker */
+    ucc_tl_ucp_test_recv(task);
     if ((coll_root == rank) || (task->bcast_dbt.t2.recv > 0)) {
         for (i = 0; i < 2; i++) {
             if ((t2.children[i] != UCC_RANK_INVALID) &&
@@ -250,6 +254,8 @@ ucc_status_t ucc_tl_ucp_bcast_dbt_init(
     task->super.finalize   = ucc_tl_ucp_bcast_dbt_finalize;
     rank                   = task->subset.myrank;
     size                   = (ucc_rank_t)task->subset.map.ep_num;
+    task->n_polls          = ucc_max(1, task->n_polls);
+    tl_team                = TASK_TEAM(task);
 
     ucc_dbt_build_trees(rank, size, &task->bcast_dbt.t1,
                         &task->bcast_dbt.t2);
diff --git a/src/components/tl/ucp/reduce/reduce_dbt.c b/src/components/tl/ucp/reduce/reduce_dbt.c
index 08e8774974..08b8a7aed5 100644
--- a/src/components/tl/ucp/reduce/reduce_dbt.c
+++ b/src/components/tl/ucp/reduce/reduce_dbt.c
@@ -155,6 +155,8 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task)
     task->reduce_dbt.state = REDUCE;
 
 REDUCE:
+/* test_recv is needed to progress ucp_worker */
+    ucc_tl_ucp_test_recv(task);
     for (i = 0; i < 2; i++) {
         if (trees[i].recv == trees[i].n_children &&
             !task->reduce_dbt.reduction_comp[i]) {
@@ -216,6 +218,8 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task)
     }
 
 TEST_ROOT:
+/* test_recv is needed to progress ucp_worker */
+    ucc_tl_ucp_test_recv(task);
     if (UCC_INPROGRESS == ucc_tl_ucp_test_send(task) ||
         task->reduce_dbt.reduction_comp[0] != trees[0].recv ||
         task->reduce_dbt.reduction_comp[1] != trees[1].recv) {
diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c
index 23c254b00e..3b4859b48f 100644
--- a/src/components/tl/ucp/tl_ucp_coll.c
+++ b/src/components/tl/ucp/tl_ucp_coll.c
@@ -262,6 +262,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
         case UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR:
             *init = ucc_tl_ucp_allgather_neighbor_init;
             break;
+        case UCC_TL_UCP_ALLGATHER_ALG_BRUCK:
+            *init = ucc_tl_ucp_allgather_bruck_init;
+            break;
         default:
             status = UCC_ERR_INVALID_PARAM;
             break;
diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h
index f2a6235aa0..96b635faa9 100644
--- a/src/components/tl/ucp/tl_ucp_coll.h
+++ b/src/components/tl/ucp/tl_ucp_coll.h
@@ -181,6 +181,10 @@ typedef struct ucc_tl_ucp_task {
                                       ucc_rank_t tsize, int step);
         } allgather_ring;
+        struct {
+            ucc_mc_buffer_header_t *scratch_header;
+            size_t                  scratch_size;
+        } allgather_bruck;
         struct {
             ucc_rank_t dist;
             uint32_t   radix;
diff --git a/src/core/ucc_context.c b/src/core/ucc_context.c
index 7c5fd3c3ca..4d4f90f160 100644
--- a/src/core/ucc_context.c
+++ b/src/core/ucc_context.c
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  *
  * See file LICENSE for terms.
  */
@@ -45,6 +45,11 @@ static ucc_config_field_t ucc_context_config_table[] = {
      "is configured with OOB (global mode). 0 - disable, 1 - try, 2 - force.",
     ucc_offsetof(ucc_context_config_t, internal_oob), UCC_CONFIG_TYPE_UINT},
 
+    {"THROTTLE_PROGRESS", "1000",
+     "Throttle UCC progress to every <n>th invocation",
+     ucc_offsetof(ucc_context_config_t, throttle_progress),
+     UCC_CONFIG_TYPE_UINT},
+
     {NULL}};
 UCC_CONFIG_REGISTER_TABLE(ucc_context_config_table, "UCC context", NULL,
                           ucc_context_config_t, &ucc_config_global_list);
@@ -614,9 +619,10 @@ ucc_status_t ucc_context_create_proc_info(ucc_lib_h lib,
         status = UCC_ERR_NO_MEMORY;
         goto error;
     }
-    ctx->rank          = UCC_RANK_MAX;
-    ctx->lib           = lib;
-    ctx->ids.pool_size = config->team_ids_pool_size;
+    ctx->throttle_progress = config->throttle_progress;
+    ctx->rank              = UCC_RANK_MAX;
+    ctx->lib               = lib;
+    ctx->ids.pool_size     = config->team_ids_pool_size;
     ucc_list_head_init(&ctx->progress_list);
     ucc_copy_context_params(&ctx->params, params);
     ucc_copy_context_params(&b_params.params, params);
@@ -957,12 +963,25 @@ ucc_status_t ucc_context_progress_deregister(ucc_context_t *ctx,
 
 ucc_status_t ucc_context_progress(ucc_context_h context)
 {
+    static int call_num = 0;
     ucc_status_t                  status;
     ucc_context_progress_entry_t *entry;
-    /* progress registered progress fns */
-    ucc_list_for_each(entry, &context->progress_list, list_elem) {
-        entry->fn(entry->arg);
+    int                           is_empty;
+
+    is_empty = ucc_progress_queue_is_empty(context->pq);
+    if (ucc_likely(is_empty)) {
+        call_num--;
+        if (ucc_likely(call_num >= 0)) {
+            return UCC_OK;
+        }
+        /* progress registered progress fns */
+        ucc_list_for_each(entry, &context->progress_list, list_elem) {
+            entry->fn(entry->arg);
+        }
+        call_num = context->throttle_progress;
+        return UCC_OK;
     }
+
     /* the fn below returns int - number of completed tasks. TODO : do we
        need to handle it ? Maybe return to user as int as well? */
diff --git a/src/core/ucc_context.h b/src/core/ucc_context.h
index a95dd2b920..3944d5675b 100644
--- a/src/core/ucc_context.h
+++ b/src/core/ucc_context.h
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  *
  * See file LICENSE for terms.
  */
@@ -77,6 +77,7 @@ typedef struct ucc_context {
     ucc_context_topo_t *topo;
     uint64_t            cl_flags;
     ucc_tl_team_t      *service_team;
+    int32_t             throttle_progress;
 } ucc_context_t;
 
 typedef struct ucc_context_config {
@@ -90,6 +91,7 @@ typedef struct ucc_context_config {
     uint32_t estimated_num_ppn;
     uint32_t lock_free_progress_q;
     uint32_t internal_oob;
+    uint32_t throttle_progress;
 } ucc_context_config_t;
 
 /* Internal function for context creation that takes explicit
diff --git a/src/core/ucc_progress_queue.h b/src/core/ucc_progress_queue.h
index ba3d20b297..d4ede0c8c3 100644
--- a/src/core/ucc_progress_queue.h
+++ b/src/core/ucc_progress_queue.h
@@ -1,5 +1,6 @@
 /**
- * Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
  * See file LICENSE for terms.
  */
@@ -14,6 +15,7 @@ struct ucc_progress_queue {
     void (*enqueue)(ucc_progress_queue_t *pq, ucc_coll_task_t *task);
     void (*dequeue)(ucc_progress_queue_t *pq, ucc_coll_task_t **task);
     int  (*progress)(ucc_progress_queue_t *pq);
+    int  (*is_empty)(ucc_progress_queue_t *pq);
     void (*finalize)(ucc_progress_queue_t *pq);
 };
 
@@ -46,6 +48,11 @@ static inline int ucc_progress_queue(ucc_progress_queue_t *pq)
     return pq->progress(pq);
 }
 
+static inline int ucc_progress_queue_is_empty(ucc_progress_queue_t *pq)
+{
+    return pq->is_empty(pq);
+}
+
 void ucc_progress_queue_finalize(ucc_progress_queue_t *pq);
 
 #endif
diff --git a/src/core/ucc_progress_queue_mt.c b/src/core/ucc_progress_queue_mt.c
index 466628e27c..7da2171f03 100644
--- a/src/core/ucc_progress_queue_mt.c
+++ b/src/core/ucc_progress_queue_mt.c
@@ -1,5 +1,6 @@
 /**
- * Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
  * See file LICENSE for terms.
  */
@@ -25,7 +26,7 @@ typedef struct ucc_pq_mt_locked {
 } ucc_pq_mt_locked_t;
 
 static void ucc_pq_locked_mt_enqueue(ucc_progress_queue_t *pq,
-                                     ucc_coll_task_t *     task)
+                                     ucc_coll_task_t      *task)
 {
     ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);
 
@@ -42,7 +43,7 @@ static void ucc_pq_mt_enqueue(ucc_progress_queue_t *pq, ucc_coll_task_t *task)
 }
 
 static void ucc_pq_locked_mt_dequeue(ucc_progress_queue_t *pq,
-                                     ucc_coll_task_t **    popped_task)
+                                     ucc_coll_task_t     **popped_task)
 {
     ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);
     *popped_task = NULL;
@@ -56,7 +57,7 @@ static void ucc_pq_locked_mt_dequeue(ucc_progress_queue_t *pq,
 }
 
 static void ucc_pq_mt_dequeue(ucc_progress_queue_t *pq,
-                              ucc_coll_task_t **    popped_task)
+                              ucc_coll_task_t     **popped_task)
 {
     ucc_pq_mt_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_t);
     ucc_lf_queue_elem_t *elem = ucc_lf_queue_dequeue(&pq_mt->lf_queue, 1);
@@ -100,6 +101,20 @@ static int ucc_pq_mt_progress(ucc_progress_queue_t *pq)
     return n_progressed;
 }
 
+static int ucc_pq_locked_mt_is_empty(ucc_progress_queue_t *pq)
+{
+    ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);
+
+    /* this check does not need to be precise; it is only used for progress throttling */
+    return ucc_list_is_empty(&pq_mt->queue);
+}
+
+static int ucc_pq_mt_is_empty(ucc_progress_queue_t *pq) //NOLINT: pq is unused
+{
+    /* the lock-free progress queue never uses throttling */
+    return 0;
+}
+
 static void ucc_pq_locked_mt_finalize(ucc_progress_queue_t *pq)
 {
     ucc_pq_mt_locked_t *pq_mt = ucc_derived_of(pq, ucc_pq_mt_locked_t);
@@ -128,6 +143,7 @@ ucc_status_t ucc_pq_mt_init(ucc_progress_queue_t **pq,
         pq_mt->super.dequeue  = ucc_pq_mt_dequeue;
         pq_mt->super.progress = ucc_pq_mt_progress;
         pq_mt->super.finalize = ucc_pq_mt_finalize;
+        pq_mt->super.is_empty = ucc_pq_mt_is_empty;
         *pq = &pq_mt->super;
     } else {
         ucc_pq_mt_locked_t *pq_mt = ucc_malloc(sizeof(*pq_mt), "pq_mt");
@@ -141,6 +157,7 @@ ucc_status_t ucc_pq_mt_init(ucc_progress_queue_t **pq,
         pq_mt->super.dequeue  = ucc_pq_locked_mt_dequeue;
         pq_mt->super.progress = ucc_pq_mt_progress;
         pq_mt->super.finalize = ucc_pq_locked_mt_finalize;
+        pq_mt->super.is_empty = ucc_pq_locked_mt_is_empty;
         *pq = &pq_mt->super;
     }
     return UCC_OK;
diff --git a/src/core/ucc_progress_queue_st.c b/src/core/ucc_progress_queue_st.c
index 048d7313dd..e9842a70d4 100644
--- a/src/core/ucc_progress_queue_st.c
+++ b/src/core/ucc_progress_queue_st.c
@@ -1,5 +1,6 @@
 /**
- * Copyright (c) 2020, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
  * See file LICENSE for terms.
  */
@@ -67,6 +68,13 @@ static void ucc_pq_st_finalize(ucc_progress_queue_t *pq)
     ucc_free(pq_st);
 }
 
+static int ucc_pq_st_is_empty(ucc_progress_queue_t *pq)
+{
+    ucc_pq_st_t *pq_st = ucc_derived_of(pq, ucc_pq_st_t);
+
+    return ucc_list_is_empty(&pq_st->list);
+}
+
 ucc_status_t ucc_pq_st_init(ucc_progress_queue_t **pq)
 {
     ucc_pq_st_t *pq_st = ucc_malloc(sizeof(*pq_st), "pq_st");
@@ -79,6 +87,8 @@ ucc_status_t ucc_pq_st_init(ucc_progress_queue_t **pq)
     pq_st->super.dequeue  = NULL;
     pq_st->super.progress = ucc_pq_st_progress;
     pq_st->super.finalize = ucc_pq_st_finalize;
+    pq_st->super.is_empty = ucc_pq_st_is_empty;
+
     *pq = &pq_st->super;
     return UCC_OK;
 }
diff --git a/test/gtest/coll/test_allgather.cc b/test/gtest/coll/test_allgather.cc
index e1cacac5ac..2577bdf26c 100644
--- a/test/gtest/coll/test_allgather.cc
+++ b/test/gtest/coll/test_allgather.cc
@@ -296,7 +296,7 @@ INSTANTIATE_TEST_CASE_P(
 #endif
         ::testing::Values(1,3,8192), // count
         ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE),
-        ::testing::Values("knomial", "ring", "neighbor")),
+        ::testing::Values("knomial", "ring", "neighbor", "bruck")),
     [](const testing::TestParamInfo& info) {
         std::string name;
        name += ucc_datatype_str(std::get<0>(info.param));
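
Reviewer note: the post-processing in ucc_tl_ucp_allgather_bruck_progress() is easiest to follow on a concrete example. The standalone C sketch below is illustrative only (not part of the patch); p, r, blk, rbuf and scratch are invented names for the example. After the log-step exchange phase, rank r holds the blocks in the order [r, r+1, ..., p-1, 0, ..., r-1]; the same three copies as in the host-memory branch above rotate them into natural order.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Standalone illustration of the Bruck allgather post-rotation:
 * scratch <- front blocks, memmove tail blocks to the front,
 * copy scratch back starting at block [r]. */
int main(void)
{
    int    p = 5, r = 2;                    /* team size and rank (example) */
    size_t blk = sizeof(int);               /* one element per block */
    int    rbuf[] = {2, 3, 4, 0, 1};        /* state after the exchange phase */
    size_t scratch_size = (p - r) * blk;    /* same sizing as in the patch */
    void  *scratch = malloc(scratch_size);

    /* copy blocks at positions [0 .. p-r-1] from rbuf to scratch */
    memcpy(scratch, rbuf, scratch_size);
    /* move the trailing r blocks to the beginning of rbuf */
    memmove(rbuf, (char *)rbuf + scratch_size, r * blk);
    /* put the saved blocks back, starting at block [r] */
    memcpy((char *)rbuf + r * blk, scratch, scratch_size);

    for (int i = 0; i < p; i++) {
        printf("%d ", rbuf[i]);             /* prints: 0 1 2 3 4 */
    }
    printf("\n");
    free(scratch);
    return 0;
}

If UCC's usual algorithm-selection syntax applies to the new id registered above, something like UCC_TL_UCP_TUNE=allgather:@bruck should select this path at runtime; that is an assumption worth verifying, since the patch itself only registers the id/name and the gtest parameter.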
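Reviewer note: the THROTTLE_PROGRESS countdown in ucc_context_progress() amortizes the registered transport progress callbacks while the progress queue is empty. A minimal standalone simulation of that countdown pattern (illustrative only, not UCC code; throttle and fn_invocations are invented names):

#include <stdio.h>

static int call_num       = 0;
static int throttle       = 1000;  /* mirrors the THROTTLE_PROGRESS default */
static int fn_invocations = 0;

/* Mirrors the empty-queue branch above: skip the callbacks until the
 * counter runs out, then run them once and re-arm the counter. */
static void progress_when_empty(void)
{
    call_num--;
    if (call_num >= 0) {
        return;               /* throttled: skip the progress callbacks */
    }
    fn_invocations++;         /* stands in for the progress_list callbacks */
    call_num = throttle;
}

int main(void)
{
    for (int i = 0; i < 3001; i++) {
        progress_when_empty();
    }
    /* with throttle=1000, the callbacks run ~3 times in 3001 calls */
    printf("callbacks ran %d times\n", fn_invocations);
    return 0;
}

Assuming UCC's usual config-prefix convention for entries in ucc_context_config_table, the new knob would surface as the UCC_THROTTLE_PROGRESS environment variable; the patch itself only shows the table entry.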