From 99606173dfa1ee4854b641263cb10a362950b890 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Tue, 16 Jan 2024 11:38:56 +0100 Subject: [PATCH 01/14] TL/UCP: Bruck algorithm initial --- src/components/tl/ucp/Makefile.am | 1 + src/components/tl/ucp/allgather/allgather.c | 8 +- src/components/tl/ucp/allgather/allgather.h | 10 + .../tl/ucp/allgather/allgather_bruck.c | 197 ++++++++++++++++++ src/components/tl/ucp/tl_ucp_coll.c | 3 + test/gtest/coll/test_allgather.cc | 2 +- 6 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 src/components/tl/ucp/allgather/allgather_bruck.c diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index bf8e40aa6c..620155d017 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -14,6 +14,7 @@ allgather = \ allgather/allgather.c \ allgather/allgather_ring.c \ allgather/allgather_neighbor.c \ + allgather/allgather_bruck.c \ allgather/allgather_knomial.c allgatherv = \ diff --git a/src/components/tl/ucp/allgather/allgather.c b/src/components/tl/ucp/allgather/allgather.c index 926b732e55..96309daa86 100644 --- a/src/components/tl/ucp/allgather/allgather.c +++ b/src/components/tl/ucp/allgather/allgather.c @@ -23,11 +23,16 @@ ucc_base_coll_alg_info_t {.id = UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR, .name = "neighbor", .desc = "O(N) Neighbor Exchange N/2 steps"}, + [UCC_TL_UCP_ALLGATHER_ALG_BRUCK] = + {.id = UCC_TL_UCP_ALLGATHER_ALG_BRUCK, + .name = "bruck", + .desc = "O(log(N)) Variation of Bruck algorithm"}, [UCC_TL_UCP_ALLGATHER_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; ucc_status_t ucc_tl_ucp_allgather_init(ucc_tl_ucp_task_t *task) { + printf ("HELLO\n"); return ucc_tl_ucp_allgather_ring_init_common(task); } @@ -36,7 +41,7 @@ char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team) int max_size = ALLGATHER_MAX_PATTERN_SIZE; int algo_num = UCC_TL_TEAM_SIZE(team) % 2 ? 
UCC_TL_UCP_ALLGATHER_ALG_RING - : UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR; + : UCC_TL_UCP_ALLGATHER_ALG_BRUCK; char *str = ucc_malloc(max_size * sizeof(char)); ucc_sbgp_t *sbgp; @@ -46,6 +51,7 @@ char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team) algo_num = UCC_TL_UCP_ALLGATHER_ALG_RING; } } + fprintf(stderr, "Algo num: %d\n", algo_num); ucc_snprintf_safe(str, max_size, UCC_TL_UCP_ALLGATHER_DEFAULT_ALG_SELECT_STR, algo_num); return str; diff --git a/src/components/tl/ucp/allgather/allgather.h b/src/components/tl/ucp/allgather/allgather.h index b68ab00e95..0e816e8838 100644 --- a/src/components/tl/ucp/allgather/allgather.h +++ b/src/components/tl/ucp/allgather/allgather.h @@ -12,6 +12,7 @@ enum { UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL, UCC_TL_UCP_ALLGATHER_ALG_RING, UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR, + UCC_TL_UCP_ALLGATHER_ALG_BRUCK, UCC_TL_UCP_ALLGATHER_ALG_LAST }; @@ -56,6 +57,15 @@ void ucc_tl_ucp_allgather_neighbor_progress(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_allgather_neighbor_start(ucc_coll_task_t *task); +/* Bruck */ +ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h); + +void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *task); + +ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *task); + /* Uses allgather_kn_radix from config */ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c new file mode 100644 index 0000000000..052f12802c --- /dev/null +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -0,0 +1,197 @@ +/** + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. 
+ */ +#include "config.h" +#include "tl_ucp.h" +#include "allgather.h" +#include "core/ucc_progress_queue.h" +#include "tl_ucp_sendrecv.h" +#include "utils/ucc_math.h" +#include "utils/ucc_coll_utils.h" +#include "components/mc/ucc_mc.h" +#include + +ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h) +{ + ucc_status_t status = UCC_OK; + ucc_tl_ucp_task_t *task; + ucc_tl_ucp_team_t *ucp_team; + + task = ucc_tl_ucp_init_task(coll_args, team); + ucp_team = TASK_TEAM(task); + + if (!ucc_coll_args_is_predefined_dt(&TASK_ARGS(task), UCC_RANK_INVALID)) { + tl_error(UCC_TASK_LIB(task), "user defined datatype is not supported"); + status = UCC_ERR_NOT_SUPPORTED; + goto out; + } + printf("ucc_tl_ucp_allgather_bruck_init\n"); + if (UCC_TL_TEAM_SIZE(ucp_team) % 2) { + tl_debug(UCC_TASK_LIB(task), + "odd team size is not supported, switching to ring"); + status = ucc_tl_ucp_allgather_ring_init_common(task); + } else { + task->super.post = ucc_tl_ucp_allgather_bruck_start; + task->super.progress = ucc_tl_ucp_allgather_bruck_progress; + } + +out: + if (status != UCC_OK) { + ucc_tl_ucp_put_task(task); + return status; + } + + *task_h = &task->super; + return status; +} + +/* Inspired by implementation: https://github.com/open-mpi/ompi/blob/main/ompi/mca/coll/base/coll_base_allgather.c */ +void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) +{ + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); + void *rbuf = TASK_ARGS(task).dst.info.buffer; + ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; + ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; + size_t count = TASK_ARGS(task).dst.info.count; + size_t data_size = (count / tsize) * ucc_dt_size(dt); + ucc_rank_t recvfrom, sendto; + ucc_status_t status; + size_t 
blockcount, distance; + void *tmprecv, *tmpsend; + + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + return; + } + + /* On each step doubles distance */ + distance = 1 << task->tagged.send_posted; + printf("bruck\n"); + tmpsend = rbuf; + while (distance < (tsize)) { + + recvfrom = (trank + distance) % tsize; + sendto = (trank + tsize - distance) % tsize; + + tmprecv = PTR_OFFSET(tmpsend, distance * data_size); + + if (distance <= tsize >> 1) { + blockcount = distance; + } else { + /* send-recv all reminder*/ + blockcount = tsize - distance; + } + + /* Sendreceive */ + UCPCHECK_GOTO(ucc_tl_ucp_send_nb(tmpsend, blockcount * data_size, rmem, + sendto, team, task), + task, out); + UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(tmprecv, blockcount * data_size, rmem, + recvfrom, team, task), + task, out); + + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + return; + } + } + + /* post processing step */ + if (trank != 0) { + ucc_mc_buffer_header_t *scratch_header; + size_t scratch_size = (tsize - trank) * data_size; + /* allocate scratch buffer */ + status = + ucc_mc_alloc(&scratch_header, scratch_size, UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), "failed to allocate scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + + status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, + UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy data to scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + + status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, scratch_size), + trank * data_size, UCC_MEMORY_TYPE_HOST, + UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to move data inside rbuff buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + + status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size), + scratch_header->addr, scratch_size, + 
UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy data from scratch to rbuff buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + + /* deallocate scratch buffer */ + status = ucc_mc_free(scratch_header); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to free scratch buffer memory"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + } + + ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task)); + task->super.status = UCC_OK; + +out: + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_bruck_done", 0); +} + +ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task) +{ + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + size_t count = TASK_ARGS(task).dst.info.count; + void *sbuf = TASK_ARGS(task).src.info.buffer; + void *rbuf = TASK_ARGS(task).dst.info.buffer; + ucc_memory_type_t smem = TASK_ARGS(task).src.info.mem_type; + ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; + ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); + size_t data_size = (count / tsize) * ucc_dt_size(dt); + ucc_status_t status; + + UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_bruck_start", 0); + ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); + + /* initial step: copy data on non root ranks to the beginning of buffer */ + if (!UCC_IS_INPLACE(TASK_ARGS(task))) { + status = ucc_mc_memcpy(rbuf, PTR_OFFSET(sbuf, data_size * trank), + data_size, rmem, smem); + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } else if (trank != 0) { + status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank), + data_size, rmem, rmem); + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + } + + return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, 
&task->super); +} diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 23c254b00e..3b4859b48f 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -262,6 +262,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, case UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR: *init = ucc_tl_ucp_allgather_neighbor_init; break; + case UCC_TL_UCP_ALLGATHER_ALG_BRUCK: + *init = ucc_tl_ucp_allgather_bruck_init; + break; default: status = UCC_ERR_INVALID_PARAM; break; diff --git a/test/gtest/coll/test_allgather.cc b/test/gtest/coll/test_allgather.cc index e1cacac5ac..2577bdf26c 100644 --- a/test/gtest/coll/test_allgather.cc +++ b/test/gtest/coll/test_allgather.cc @@ -296,7 +296,7 @@ INSTANTIATE_TEST_CASE_P( #endif ::testing::Values(1,3,8192), // count ::testing::Values(TEST_INPLACE, TEST_NO_INPLACE), - ::testing::Values("knomial", "ring", "neighbor")), + ::testing::Values("knomial", "ring", "neighbor", "bruck")), [](const testing::TestParamInfo& info) { std::string name; name += ucc_datatype_str(std::get<0>(info.param)); From 589258afca9dff6214fc6e3569d1441fcdffb2de Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 2 Feb 2024 15:22:21 +0100 Subject: [PATCH 02/14] TL/UCP: fixed memory copy and alloc --- src/components/tl/ucp/allgather/allgather.h | 2 + .../tl/ucp/allgather/allgather_bruck.c | 122 ++++++++++-------- src/components/tl/ucp/tl_ucp_coll.h | 4 + 3 files changed, 76 insertions(+), 52 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather.h b/src/components/tl/ucp/allgather/allgather.h index 0e816e8838..ac3592df86 100644 --- a/src/components/tl/ucp/allgather/allgather.h +++ b/src/components/tl/ucp/allgather/allgather.h @@ -66,6 +66,8 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *task); +ucc_status_t ucc_tl_ucp_allgather_bruck_finalize(ucc_coll_task_t *coll_task); + /* Uses 
allgather_kn_radix from config */ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 052f12802c..0e1d17f353 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -17,27 +17,36 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, ucc_coll_task_t **task_h) { - ucc_status_t status = UCC_OK; - ucc_tl_ucp_task_t *task; - ucc_tl_ucp_team_t *ucp_team; - - task = ucc_tl_ucp_init_task(coll_args, team); - ucp_team = TASK_TEAM(task); + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_tl_ucp_task_t *task = ucc_tl_ucp_init_task(coll_args, team); + ucc_status_t status = UCC_OK; + ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(tl_team); + ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; + size_t count = TASK_ARGS(task).dst.info.count; + size_t data_size = (count / tsize) * ucc_dt_size(dt); + size_t scratch_size = (tsize - trank) * data_size; if (!ucc_coll_args_is_predefined_dt(&TASK_ARGS(task), UCC_RANK_INVALID)) { tl_error(UCC_TASK_LIB(task), "user defined datatype is not supported"); status = UCC_ERR_NOT_SUPPORTED; goto out; } - printf("ucc_tl_ucp_allgather_bruck_init\n"); - if (UCC_TL_TEAM_SIZE(ucp_team) % 2) { - tl_debug(UCC_TASK_LIB(task), - "odd team size is not supported, switching to ring"); - status = ucc_tl_ucp_allgather_ring_init_common(task); - } else { - task->super.post = ucc_tl_ucp_allgather_bruck_start; - task->super.progress = ucc_tl_ucp_allgather_bruck_progress; + tl_trace(UCC_TASK_LIB(task), "ucc_tl_ucp_allgather_bruck_init"); + + task->super.post = ucc_tl_ucp_allgather_bruck_start; + task->super.progress = ucc_tl_ucp_allgather_bruck_progress; + task->super.finalize = ucc_tl_ucp_allgather_bruck_finalize; + 
+ /* allocate scratch buffer */ + status = ucc_mc_alloc(&task->allgather_bruck.scratch_header, scratch_size, + UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), "failed to allocate scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + goto out; } + task->allgather_bruck.scratch_size = scratch_size; out: if (status != UCC_OK) { @@ -49,22 +58,48 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, return status; } +ucc_status_t ucc_tl_ucp_allgather_bruck_finalize(ucc_coll_task_t *coll_task) +{ + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_status_t status, global_status; + + tl_trace(UCC_TASK_LIB(task), "ucc_tl_ucp_allgather_bruck_finalize"); + + /* deallocate scratch buffer */ + global_status = ucc_mc_free(task->allgather_bruck.scratch_header); + if (ucc_unlikely(global_status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), "failed to free scratch buffer memory"); + } + task->allgather_bruck.scratch_size = 0; + + status = ucc_tl_ucp_coll_finalize(&task->super); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to finalize allgather bruck collective"); + global_status = status; + } + return global_status; +} + /* Inspired by implementation: https://github.com/open-mpi/ompi/blob/main/ompi/mca/coll/base/coll_base_allgather.c */ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) { - ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t trank = UCC_TL_TEAM_RANK(team); - ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); - void *rbuf = TASK_ARGS(task).dst.info.buffer; - ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; - ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; - size_t count = TASK_ARGS(task).dst.info.count; - size_t data_size = (count / tsize) * ucc_dt_size(dt); - ucc_rank_t recvfrom, sendto; - ucc_status_t status; - size_t 
blockcount, distance; - void *tmprecv, *tmpsend; + ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); + ucc_tl_ucp_team_t *team = TASK_TEAM(task); + ucc_rank_t trank = UCC_TL_TEAM_RANK(team); + ucc_rank_t tsize = UCC_TL_TEAM_SIZE(team); + void *rbuf = TASK_ARGS(task).dst.info.buffer; + ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; + ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; + size_t count = TASK_ARGS(task).dst.info.count; + ucc_mc_buffer_header_t *scratch_header = + task->allgather_bruck.scratch_header; + size_t scratch_size = task->allgather_bruck.scratch_size; + size_t data_size = (count / tsize) * ucc_dt_size(dt); + ucc_rank_t recvfrom, sendto; + ucc_status_t status; + size_t blockcount, distance; + void *tmprecv, *tmpsend; if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { return; @@ -72,9 +107,8 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) /* On each step doubles distance */ distance = 1 << task->tagged.send_posted; - printf("bruck\n"); - tmpsend = rbuf; - while (distance < (tsize)) { + tmpsend = rbuf; + while (distance < tsize) { recvfrom = (trank + distance) % tsize; sendto = (trank + tsize - distance) % tsize; @@ -99,21 +133,12 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { return; } + + distance = 1 << task->tagged.send_posted; } /* post processing step */ if (trank != 0) { - ucc_mc_buffer_header_t *scratch_header; - size_t scratch_size = (tsize - trank) * data_size; - /* allocate scratch buffer */ - status = - ucc_mc_alloc(&scratch_header, scratch_size, UCC_MEMORY_TYPE_HOST); - if (ucc_unlikely(status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), "failed to allocate scratch buffer"); - ucc_tl_ucp_coll_finalize(&task->super); - return; - } - status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); if (ucc_unlikely(status != UCC_OK)) { @@ -142,18 +167,10 @@ void 
ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) ucc_tl_ucp_coll_finalize(&task->super); return; } - - /* deallocate scratch buffer */ - status = ucc_mc_free(scratch_header); - if (ucc_unlikely(status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), - "failed to free scratch buffer memory"); - ucc_tl_ucp_coll_finalize(&task->super); - return; - } } ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task)); + task->super.status = UCC_OK; out: @@ -180,12 +197,13 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task) /* initial step: copy data on non root ranks to the beginning of buffer */ if (!UCC_IS_INPLACE(TASK_ARGS(task))) { - status = ucc_mc_memcpy(rbuf, PTR_OFFSET(sbuf, data_size * trank), - data_size, rmem, smem); + // not inplace: copy chunk from source buff to beginning of receive + status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem); if (ucc_unlikely(UCC_OK != status)) { return status; } } else if (trank != 0) { + // inplace: copy chunk to the begin status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, rmem); if (ucc_unlikely(UCC_OK != status)) { diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index 6ab2c661dd..fc64ed2d0e 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -181,6 +181,10 @@ typedef struct ucc_tl_ucp_task { ucc_rank_t tsize, int step); } allgather_ring; + struct { + ucc_mc_buffer_header_t *scratch_header; + size_t scratch_size; + } allgather_bruck; struct { ucc_rank_t dist; uint32_t radix; From 815fd748620319a14f39bb6158f53dfadc8de45b Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 2 Feb 2024 18:46:15 +0100 Subject: [PATCH 03/14] TL/UCP: fixed memory copy in post stage --- src/components/tl/ucp/allgather/allgather_bruck.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 
0e1d17f353..d1f6b82ddb 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -71,7 +71,7 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_finalize(ucc_coll_task_t *coll_task) tl_error(UCC_TASK_LIB(task), "failed to free scratch buffer memory"); } task->allgather_bruck.scratch_size = 0; - + status = ucc_tl_ucp_coll_finalize(&task->super); if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), @@ -149,8 +149,8 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) } status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, scratch_size), - trank * data_size, UCC_MEMORY_TYPE_HOST, - UCC_MEMORY_TYPE_HOST); + count * ucc_dt_size(dt) - scratch_size, + UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to move data inside rbuff buffer"); From c902985335b1c4b360c859da60a84652a5662219 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 2 Feb 2024 18:53:51 +0100 Subject: [PATCH 04/14] TL/UCP: removed debug printfs --- src/components/tl/ucp/allgather/allgather.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather.c b/src/components/tl/ucp/allgather/allgather.c index 96309daa86..9de47f6a41 100644 --- a/src/components/tl/ucp/allgather/allgather.c +++ b/src/components/tl/ucp/allgather/allgather.c @@ -32,7 +32,6 @@ ucc_base_coll_alg_info_t ucc_status_t ucc_tl_ucp_allgather_init(ucc_tl_ucp_task_t *task) { - printf ("HELLO\n"); return ucc_tl_ucp_allgather_ring_init_common(task); } @@ -51,7 +50,6 @@ char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team) algo_num = UCC_TL_UCP_ALLGATHER_ALG_RING; } } - fprintf(stderr, "Algo num: %d\n", algo_num); ucc_snprintf_safe(str, max_size, UCC_TL_UCP_ALLGATHER_DEFAULT_ALG_SELECT_STR, algo_num); return str; From 7689bc795dff5889578f3a8c6a76b484696e6d84 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 2 Feb 2024 21:42:06 +0100 Subject: [PATCH 
05/14] TL/UCP: fixed post copy --- src/components/tl/ucp/allgather/allgather_bruck.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index d1f6b82ddb..2c0d214774 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -139,6 +139,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) /* post processing step */ if (trank != 0) { + // copy blocks [0 .. (size - rank - 1)] from rbuf to shift buffer status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); if (ucc_unlikely(status != UCC_OK)) { @@ -147,17 +148,17 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) ucc_tl_ucp_coll_finalize(&task->super); return; } - + // move blocks [(size - rank) .. size] from rbuf to beginning of rbuf status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, scratch_size), - count * ucc_dt_size(dt) - scratch_size, - UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); + trank * data_size, UCC_MEMORY_TYPE_HOST, + UCC_MEMORY_TYPE_HOST); if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to move data inside rbuff buffer"); ucc_tl_ucp_coll_finalize(&task->super); return; } - + // copy blocks from shift buffer starting at block [rank] in rbuf. 
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size), scratch_header->addr, scratch_size, UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); From 4bf1b98027127304bb080cc4adc201311e3d5b89 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Sat, 3 Feb 2024 15:48:51 +0100 Subject: [PATCH 06/14] TL/UCP: changed to memmove --- src/components/tl/ucp/allgather/allgather_bruck.c | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 2c0d214774..3d7dc14f52 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -149,15 +149,8 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) return; } // move blocks [(size - rank) .. size] from rbuf to beginning of rbuf - status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, scratch_size), - trank * data_size, UCC_MEMORY_TYPE_HOST, - UCC_MEMORY_TYPE_HOST); - if (ucc_unlikely(status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), - "failed to move data inside rbuff buffer"); - ucc_tl_ucp_coll_finalize(&task->super); - return; - } + // TODO: rewrite to cycle to get rid of overlap + memmove(rbuf, PTR_OFFSET(rbuf, scratch_size), trank * data_size); // copy blocks from shift buffer starting at block [rank] in rbuf. 
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size), scratch_header->addr, scratch_size, From dec1b7d2634590331c6011683000bd4fe2ab7460 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Sat, 3 Feb 2024 15:55:29 +0100 Subject: [PATCH 07/14] TL/UCP: allocate only for non root rank --- .../tl/ucp/allgather/allgather_bruck.c | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 3d7dc14f52..2f192ed222 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -38,16 +38,20 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, task->super.progress = ucc_tl_ucp_allgather_bruck_progress; task->super.finalize = ucc_tl_ucp_allgather_bruck_finalize; - /* allocate scratch buffer */ - status = ucc_mc_alloc(&task->allgather_bruck.scratch_header, scratch_size, - UCC_MEMORY_TYPE_HOST); - if (ucc_unlikely(status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), "failed to allocate scratch buffer"); - ucc_tl_ucp_coll_finalize(&task->super); - goto out; + if (trank != 0) { + /* allocate scratch buffer only on non root rank */ + status = ucc_mc_alloc(&task->allgather_bruck.scratch_header, + scratch_size, UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), "failed to allocate scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + goto out; + } + task->allgather_bruck.scratch_size = scratch_size; + } else { + task->allgather_bruck.scratch_header = NULL; + task->allgather_bruck.scratch_size = 0; } - task->allgather_bruck.scratch_size = scratch_size; - out: if (status != UCC_OK) { ucc_tl_ucp_put_task(task); @@ -61,16 +65,20 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, ucc_status_t ucc_tl_ucp_allgather_bruck_finalize(ucc_coll_task_t *coll_task) { ucc_tl_ucp_task_t *task = 
ucc_derived_of(coll_task, ucc_tl_ucp_task_t); - ucc_status_t status, global_status; + ucc_status_t global_status = UCC_OK; + ucc_status_t status; tl_trace(UCC_TASK_LIB(task), "ucc_tl_ucp_allgather_bruck_finalize"); - /* deallocate scratch buffer */ - global_status = ucc_mc_free(task->allgather_bruck.scratch_header); - if (ucc_unlikely(global_status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), "failed to free scratch buffer memory"); + if (task->allgather_bruck.scratch_header != NULL) { + /* deallocate scratch buffer */ + global_status = ucc_mc_free(task->allgather_bruck.scratch_header); + if (ucc_unlikely(global_status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to free scratch buffer memory"); + } + task->allgather_bruck.scratch_size = 0; } - task->allgather_bruck.scratch_size = 0; status = ucc_tl_ucp_coll_finalize(&task->super); if (ucc_unlikely(status != UCC_OK)) { From c1a9021af37bf2fe131055d30bc0cbb6bc01bba6 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Sat, 3 Feb 2024 16:18:23 +0100 Subject: [PATCH 08/14] TL/UCP: back to neighbor exchange --- src/components/tl/ucp/allgather/allgather.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/components/tl/ucp/allgather/allgather.c b/src/components/tl/ucp/allgather/allgather.c index 9de47f6a41..6900944e66 100644 --- a/src/components/tl/ucp/allgather/allgather.c +++ b/src/components/tl/ucp/allgather/allgather.c @@ -40,7 +40,7 @@ char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team) int max_size = ALLGATHER_MAX_PATTERN_SIZE; int algo_num = UCC_TL_TEAM_SIZE(team) % 2 ? 
UCC_TL_UCP_ALLGATHER_ALG_RING - : UCC_TL_UCP_ALLGATHER_ALG_BRUCK; + : UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR; char *str = ucc_malloc(max_size * sizeof(char)); ucc_sbgp_t *sbgp; From b1fa35f62733b595defc59ac5db98dc89c99ae4b Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Tue, 6 Feb 2024 15:24:59 +0100 Subject: [PATCH 09/14] TL/UCP: fixed memory type --- src/components/tl/ucp/allgather/allgather_bruck.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 2f192ed222..053e3f06f0 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -149,7 +149,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (trank != 0) { // copy blocks [0 .. (size - rank - 1)] from rbuf to shift buffer status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, - UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); + UCC_MEMORY_TYPE_HOST, rmem); if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy data to scratch buffer"); @@ -161,8 +161,8 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) memmove(rbuf, PTR_OFFSET(rbuf, scratch_size), trank * data_size); // copy blocks from shift buffer starting at block [rank] in rbuf. 
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size), - scratch_header->addr, scratch_size, - UCC_MEMORY_TYPE_HOST, UCC_MEMORY_TYPE_HOST); + scratch_header->addr, scratch_size, rmem, + UCC_MEMORY_TYPE_HOST); if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy data from scratch to rbuff buffer"); From 919d6fe7c88130b43db7e121c10330743d209fef Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Sun, 11 Feb 2024 17:41:04 +0100 Subject: [PATCH 10/14] TL/UCP: fixed bruck post step for device buffer --- .../tl/ucp/allgather/allgather_bruck.c | 81 ++++++++++++++----- 1 file changed, 59 insertions(+), 22 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 053e3f06f0..a8ef9017d7 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -22,6 +22,7 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, ucc_status_t status = UCC_OK; ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); ucc_rank_t tsize = UCC_TL_TEAM_SIZE(tl_team); + ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type; ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype; size_t count = TASK_ARGS(task).dst.info.count; size_t data_size = (count / tsize) * ucc_dt_size(dt); @@ -38,8 +39,11 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_init(ucc_base_coll_args_t *coll_args, task->super.progress = ucc_tl_ucp_allgather_bruck_progress; task->super.finalize = ucc_tl_ucp_allgather_bruck_finalize; + /* allocate scratch buffer only on non root rank */ if (trank != 0) { - /* allocate scratch buffer only on non root rank */ + if (UCC_MEMORY_TYPE_HOST != rmem) { + scratch_size = tsize * data_size; + } status = ucc_mc_alloc(&task->allgather_bruck.scratch_header, scratch_size, UCC_MEMORY_TYPE_HOST); if (ucc_unlikely(status != UCC_OK)) { @@ -147,27 +151,60 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) 
/* post processing step */ if (trank != 0) { - // copy blocks [0 .. (size - rank - 1)] from rbuf to shift buffer - status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, - UCC_MEMORY_TYPE_HOST, rmem); - if (ucc_unlikely(status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), - "failed to copy data to scratch buffer"); - ucc_tl_ucp_coll_finalize(&task->super); - return; - } - // move blocks [(size - rank) .. size] from rbuf to beginning of rbuf - // TODO: rewrite to cycle to get rid of overlap - memmove(rbuf, PTR_OFFSET(rbuf, scratch_size), trank * data_size); - // copy blocks from shift buffer starting at block [rank] in rbuf. - status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size), - scratch_header->addr, scratch_size, rmem, - UCC_MEMORY_TYPE_HOST); - if (ucc_unlikely(status != UCC_OK)) { - tl_error(UCC_TASK_LIB(task), - "failed to copy data from scratch to rbuff buffer"); - ucc_tl_ucp_coll_finalize(&task->super); - return; + if (UCC_MEMORY_TYPE_HOST == rmem) { + // copy blocks [0 .. (size - rank - 1)] from rbuf to shift buffer + status = ucc_mc_memcpy(scratch_header->addr, rbuf, scratch_size, + UCC_MEMORY_TYPE_HOST, rmem); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy data to scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + // move blocks [(size - rank) .. size] from rbuf to beginning of rbuf + // TODO: rewrite to cycle to get rid of overlap + memmove(rbuf, PTR_OFFSET(rbuf, scratch_size), trank * data_size); + // copy blocks from shift buffer starting at block [rank] in rbuf. 
+ status = ucc_mc_memcpy(PTR_OFFSET(rbuf, trank * data_size), + scratch_header->addr, scratch_size, rmem, + UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy data from scratch to rbuff buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + } else { + /* In case of non host memory we perform two copies to host buffer and then back to device, 3 memcopy in total */ + /* TODO: replace with generic kernel to do bruck post step in single launch on device */ + status = ucc_mc_memcpy( + PTR_OFFSET(scratch_header->addr, trank * data_size), rbuf, + (tsize - trank) * data_size, UCC_MEMORY_TYPE_HOST, rmem); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy first data part to scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + status = + ucc_mc_memcpy(scratch_header->addr, + PTR_OFFSET(rbuf, (tsize - trank) * data_size), + trank * data_size, UCC_MEMORY_TYPE_HOST, rmem); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy second data part to scratch buffer"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } + status = + ucc_mc_memcpy(rbuf, scratch_header->addr, tsize * data_size, + rmem, UCC_MEMORY_TYPE_HOST); + if (ucc_unlikely(status != UCC_OK)) { + tl_error(UCC_TASK_LIB(task), + "failed to copy from scratch buffer to dst"); + ucc_tl_ucp_coll_finalize(&task->super); + return; + } } } From 02d96cf47010f1761a2b391991bd5e6cfc502132 Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Thu, 22 Feb 2024 14:26:53 +0100 Subject: [PATCH 11/14] TL/UCP: fixed alignment --- src/components/tl/ucp/tl_ucp_coll.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h index fc64ed2d0e..0a8a340955 100644 --- a/src/components/tl/ucp/tl_ucp_coll.h +++ b/src/components/tl/ucp/tl_ucp_coll.h @@ -183,7 +183,7 @@ typedef struct ucc_tl_ucp_task { 
} allgather_ring; struct { ucc_mc_buffer_header_t *scratch_header; - size_t scratch_size; + size_t scratch_size; } allgather_bruck; struct { ucc_rank_t dist; From cf067d8854831f80630f33d374eacb7f65cad42f Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 23 Feb 2024 15:30:10 +0100 Subject: [PATCH 12/14] TL/UCP: set error status for task --- src/components/tl/ucp/allgather/allgather_bruck.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index a8ef9017d7..74fd7239d6 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -159,6 +159,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) tl_error(UCC_TASK_LIB(task), "failed to copy data to scratch buffer"); ucc_tl_ucp_coll_finalize(&task->super); + task->super.status = status; return; } // move blocks [(size - rank) .. size] from rbuf to beginning of rbuf @@ -172,6 +173,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) tl_error(UCC_TASK_LIB(task), "failed to copy data from scratch to rbuff buffer"); ucc_tl_ucp_coll_finalize(&task->super); + task->super.status = status; return; } } else { @@ -184,6 +186,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) tl_error(UCC_TASK_LIB(task), "failed to copy first data part to scratch buffer"); ucc_tl_ucp_coll_finalize(&task->super); + task->super.status = status; return; } status = @@ -194,6 +197,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) tl_error(UCC_TASK_LIB(task), "failed to copy second data part to scratch buffer"); ucc_tl_ucp_coll_finalize(&task->super); + task->super.status = status; return; } status = @@ -203,6 +207,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) tl_error(UCC_TASK_LIB(task), "failed to copy from scratch buffer to dst"); 
ucc_tl_ucp_coll_finalize(&task->super); + task->super.status = status; return; } } From baedc0094a8e7a71af96e04a1b4bb6f49431ae7e Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 23 Feb 2024 16:23:51 +0100 Subject: [PATCH 13/14] TL/UCP: removed coll_finalize from progress --- src/components/tl/ucp/allgather/allgather_bruck.c | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index 74fd7239d6..b24c217efc 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -130,7 +130,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (distance <= tsize >> 1) { blockcount = distance; } else { - /* send-recv all reminder*/ + /* send-recv all remainder */ blockcount = tsize - distance; } @@ -158,7 +158,6 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy data to scratch buffer"); - ucc_tl_ucp_coll_finalize(&task->super); task->super.status = status; return; } @@ -172,7 +171,6 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy data from scratch to rbuff buffer"); - ucc_tl_ucp_coll_finalize(&task->super); task->super.status = status; return; } @@ -185,7 +183,6 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy first data part to scratch buffer"); - ucc_tl_ucp_coll_finalize(&task->super); task->super.status = status; return; } @@ -196,7 +193,6 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy second data part to scratch buffer"); - 
ucc_tl_ucp_coll_finalize(&task->super); task->super.status = status; return; } @@ -206,7 +202,6 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) if (ucc_unlikely(status != UCC_OK)) { tl_error(UCC_TASK_LIB(task), "failed to copy from scratch buffer to dst"); - ucc_tl_ucp_coll_finalize(&task->super); task->super.status = status; return; } From 49717f189786b430e3faebc65f1ce2e6b81f848f Mon Sep 17 00:00:00 2001 From: Ilya Kryukov Date: Fri, 23 Feb 2024 16:45:35 +0100 Subject: [PATCH 14/14] TL/UCP: wait only rec not both (send + recv) --- src/components/tl/ucp/allgather/allgather_bruck.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/components/tl/ucp/allgather/allgather_bruck.c b/src/components/tl/ucp/allgather/allgather_bruck.c index b24c217efc..7b831eb2ad 100644 --- a/src/components/tl/ucp/allgather/allgather_bruck.c +++ b/src/components/tl/ucp/allgather/allgather_bruck.c @@ -118,7 +118,7 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) } /* On each step doubles distance */ - distance = 1 << task->tagged.send_posted; + distance = 1 << task->tagged.recv_posted; tmpsend = rbuf; while (distance < tsize) { @@ -142,11 +142,15 @@ void ucc_tl_ucp_allgather_bruck_progress(ucc_coll_task_t *coll_task) recvfrom, team, task), task, out); - if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + if (UCC_INPROGRESS == ucc_tl_ucp_test_recv(task)) { return; } - distance = 1 << task->tagged.send_posted; + distance = 1 << task->tagged.recv_posted; + } + + if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) { + return; } /* post processing step */