From 3af837cfd1760a6da1564636979a570b51137792 Mon Sep 17 00:00:00 2001 From: Sergey Lebedev Date: Tue, 10 Oct 2023 13:13:39 +0000 Subject: [PATCH 1/2] CL/HIER: 2step reduce algorithm originally implemented by V.Petrov --- src/components/cl/hier/Makefile.am | 8 +- src/components/cl/hier/cl_hier.c | 5 + src/components/cl/hier/cl_hier.h | 1 + src/components/cl/hier/cl_hier_coll.c | 49 ++- src/components/cl/hier/cl_hier_coll.h | 5 +- src/components/cl/hier/reduce/reduce.c | 17 + src/components/cl/hier/reduce/reduce.h | 38 +++ src/components/cl/hier/reduce/reduce_2step.c | 315 +++++++++++++++++++ src/components/tl/ucp/Makefile.am | 2 +- 9 files changed, 419 insertions(+), 21 deletions(-) create mode 100644 src/components/cl/hier/reduce/reduce.c create mode 100644 src/components/cl/hier/reduce/reduce.h create mode 100644 src/components/cl/hier/reduce/reduce_2step.c diff --git a/src/components/cl/hier/Makefile.am b/src/components/cl/hier/Makefile.am index c99d72e96f..243f5811e8 100644 --- a/src/components/cl/hier/Makefile.am +++ b/src/components/cl/hier/Makefile.am @@ -25,6 +25,11 @@ bcast = \ bcast/bcast.c \ bcast/bcast_2step.c +reduce = \ + reduce/reduce.h \ + reduce/reduce.c \ + reduce/reduce_2step.c + sources = \ cl_hier.h \ cl_hier.c \ @@ -37,7 +42,8 @@ sources = \ $(alltoallv) \ $(alltoall) \ $(barrier) \ - $(bcast) + $(bcast) \ + $(reduce) module_LTLIBRARIES = libucc_cl_hier.la libucc_cl_hier_la_SOURCES = $(sources) diff --git a/src/components/cl/hier/cl_hier.c b/src/components/cl/hier/cl_hier.c index 87f2b2c370..edbb469d78 100644 --- a/src/components/cl/hier/cl_hier.c +++ b/src/components/cl/hier/cl_hier.c @@ -71,6 +71,11 @@ static ucc_config_field_t ucc_cl_hier_lib_config_table[] = { ucc_offsetof(ucc_cl_hier_lib_config_t, bcast_2step_pipeline), UCC_CONFIG_TYPE_PIPELINE_PARAMS}, + {"REDUCE_2STEP_PIPELINE", "n", + "Pipelining settings for RAB reduce algorithm", + ucc_offsetof(ucc_cl_hier_lib_config_t, reduce_2step_pipeline), + UCC_CONFIG_TYPE_PIPELINE_PARAMS}, + {NULL}}; static ucs_config_field_t ucc_cl_hier_context_config_table[] = { diff --git a/src/components/cl/hier/cl_hier.h b/src/components/cl/hier/cl_hier.h index c2fcf5e245..ef40f33118 100644 --- a/src/components/cl/hier/cl_hier.h +++ b/src/components/cl/hier/cl_hier.h @@ -53,6 +53,7 @@ typedef struct ucc_cl_hier_lib_config { ucc_pipeline_params_t allreduce_split_rail_pipeline; ucc_pipeline_params_t allreduce_rab_pipeline; ucc_pipeline_params_t bcast_2step_pipeline; + ucc_pipeline_params_t reduce_2step_pipeline; } ucc_cl_hier_lib_config_t; typedef struct ucc_cl_hier_context_config { diff --git a/src/components/cl/hier/cl_hier_coll.c b/src/components/cl/hier/cl_hier_coll.c index acdb243ddd..b7fd507843 100644 --- a/src/components/cl/hier/cl_hier_coll.c +++ b/src/components/cl/hier/cl_hier_coll.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. 
*/ @@ -13,7 +13,8 @@ const char * ucc_cl_hier_default_alg_select_str[UCC_CL_HIER_N_DEFAULT_ALG_SELECT_STR] = { UCC_CL_HIER_ALLREDUCE_DEFAULT_ALG_SELECT_STR, - UCC_CL_HIER_BCAST_DEFAULT_ALG_SELECT_STR}; + UCC_CL_HIER_BCAST_DEFAULT_ALG_SELECT_STR, + UCC_CL_HIER_REDUCE_DEFAULT_ALG_SELECT_STR}; ucc_status_t ucc_cl_hier_coll_init(ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, @@ -22,14 +23,16 @@ ucc_status_t ucc_cl_hier_coll_init(ucc_base_coll_args_t *coll_args, switch (coll_args->args.coll_type) { case UCC_COLL_TYPE_ALLREDUCE: return ucc_cl_hier_allreduce_rab_init(coll_args, team, task); - case UCC_COLL_TYPE_BARRIER: - return ucc_cl_hier_barrier_init(coll_args, team, task); case UCC_COLL_TYPE_ALLTOALL: return ucc_cl_hier_alltoall_init(coll_args, team, task); case UCC_COLL_TYPE_ALLTOALLV: return ucc_cl_hier_alltoallv_init(coll_args, team, task); + case UCC_COLL_TYPE_BARRIER: + return ucc_cl_hier_barrier_init(coll_args, team, task); case UCC_COLL_TYPE_BCAST: return ucc_cl_hier_bcast_2step_init(coll_args, team, task); + case UCC_COLL_TYPE_REDUCE: + return ucc_cl_hier_reduce_2step_init(coll_args, team, task); default: cl_error(team->context->lib, "coll_type %s is not supported", ucc_coll_type_str(coll_args->args.coll_type)); @@ -41,14 +44,16 @@ ucc_status_t ucc_cl_hier_coll_init(ucc_base_coll_args_t *coll_args, static inline int alg_id_from_str(ucc_coll_type_t coll_type, const char *str) { switch (coll_type) { + case UCC_COLL_TYPE_ALLREDUCE: + return ucc_cl_hier_allreduce_alg_from_str(str); case UCC_COLL_TYPE_ALLTOALLV: return ucc_cl_hier_alltoallv_alg_from_str(str); case UCC_COLL_TYPE_ALLTOALL: return ucc_cl_hier_alltoall_alg_from_str(str); - case UCC_COLL_TYPE_ALLREDUCE: - return ucc_cl_hier_allreduce_alg_from_str(str); case UCC_COLL_TYPE_BCAST: return ucc_cl_hier_bcast_alg_from_str(str); + case UCC_COLL_TYPE_REDUCE: + return ucc_cl_hier_reduce_alg_from_str(str); default: break; } @@ -66,6 +71,19 @@ ucc_status_t ucc_cl_hier_alg_id_to_init(int alg_id, const char *alg_id_str, } switch (coll_type) { + case UCC_COLL_TYPE_ALLREDUCE: + switch (alg_id) { + case UCC_CL_HIER_ALLREDUCE_ALG_RAB: + *init = ucc_cl_hier_allreduce_rab_init; + break; + case UCC_CL_HIER_ALLREDUCE_ALG_SPLIT_RAIL: + *init = ucc_cl_hier_allreduce_split_rail_init; + break; + default: + status = UCC_ERR_INVALID_PARAM; + break; + }; + break; case UCC_COLL_TYPE_ALLTOALLV: switch (alg_id) { case UCC_CL_HIER_ALLTOALLV_ALG_NODE_SPLIT: @@ -86,28 +104,25 @@ ucc_status_t ucc_cl_hier_alg_id_to_init(int alg_id, const char *alg_id_str, break; }; break; - case UCC_COLL_TYPE_ALLREDUCE: + case UCC_COLL_TYPE_BCAST: switch (alg_id) { - case UCC_CL_HIER_ALLREDUCE_ALG_RAB: - *init = ucc_cl_hier_allreduce_rab_init; - break; - case UCC_CL_HIER_ALLREDUCE_ALG_SPLIT_RAIL: - *init = ucc_cl_hier_allreduce_split_rail_init; + case UCC_CL_HIER_BCAST_ALG_2STEP: + *init = ucc_cl_hier_bcast_2step_init; break; default: status = UCC_ERR_INVALID_PARAM; break; }; break; - case UCC_COLL_TYPE_BCAST: - switch (alg_id) { - case UCC_CL_HIER_BCAST_ALG_2STEP: - *init = ucc_cl_hier_bcast_2step_init; + case UCC_COLL_TYPE_REDUCE: + switch(alg_id) { + case UCC_CL_HIER_REDUCE_ALG_2STEP: + *init = ucc_cl_hier_reduce_2step_init; break; default: status = UCC_ERR_INVALID_PARAM; break; - }; + } break; default: status = UCC_ERR_NOT_SUPPORTED; diff --git a/src/components/cl/hier/cl_hier_coll.h b/src/components/cl/hier/cl_hier_coll.h index 3258796675..5a1e294afe 100644 --- a/src/components/cl/hier/cl_hier_coll.h +++ b/src/components/cl/hier/cl_hier_coll.h @@ -1,5 +1,5 @@ /** - * 
Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -14,8 +14,9 @@ #include "alltoall/alltoall.h" #include "barrier/barrier.h" #include "bcast/bcast.h" +#include "reduce/reduce.h" -#define UCC_CL_HIER_N_DEFAULT_ALG_SELECT_STR 2 +#define UCC_CL_HIER_N_DEFAULT_ALG_SELECT_STR 3 extern const char *ucc_cl_hier_default_alg_select_str[UCC_CL_HIER_N_DEFAULT_ALG_SELECT_STR]; diff --git a/src/components/cl/hier/reduce/reduce.c b/src/components/cl/hier/reduce/reduce.c new file mode 100644 index 0000000000..6ba9e72892 --- /dev/null +++ b/src/components/cl/hier/reduce/reduce.c @@ -0,0 +1,17 @@ +/** + * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#include "reduce.h" +#include "../reduce/reduce.h" + +ucc_base_coll_alg_info_t + ucc_cl_hier_reduce_algs[UCC_CL_HIER_REDUCE_ALG_LAST + 1] = { + [UCC_CL_HIER_REDUCE_ALG_2STEP] = + {.id = UCC_CL_HIER_REDUCE_ALG_2STEP, + .name = "2step", + .desc = "intra-node and inter-node reduces executed in parallel"}, + [UCC_CL_HIER_REDUCE_ALG_LAST] = { + .id = 0, .name = NULL, .desc = NULL}}; diff --git a/src/components/cl/hier/reduce/reduce.h b/src/components/cl/hier/reduce/reduce.h new file mode 100644 index 0000000000..fdea260996 --- /dev/null +++ b/src/components/cl/hier/reduce/reduce.h @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#ifndef REDUCE_H_ +#define REDUCE_H_ +#include "../cl_hier.h" + +enum +{ + UCC_CL_HIER_REDUCE_ALG_2STEP, + UCC_CL_HIER_REDUCE_ALG_LAST, +}; + +extern ucc_base_coll_alg_info_t + ucc_cl_hier_reduce_algs[UCC_CL_HIER_REDUCE_ALG_LAST + 1]; + +#define UCC_CL_HIER_REDUCE_DEFAULT_ALG_SELECT_STR "reduce:0-4k:@2step" + +ucc_status_t ucc_cl_hier_reduce_2step_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task); + +static inline int ucc_cl_hier_reduce_alg_from_str(const char *str) +{ + int i; + + for (i = 0; i < UCC_CL_HIER_REDUCE_ALG_LAST; i++) { + if (0 == strcasecmp(str, ucc_cl_hier_reduce_algs[i].name)) { + break; + } + } + return i; +} + +#endif diff --git a/src/components/cl/hier/reduce/reduce_2step.c b/src/components/cl/hier/reduce/reduce_2step.c new file mode 100644 index 0000000000..3d737b3238 --- /dev/null +++ b/src/components/cl/hier/reduce/reduce_2step.c @@ -0,0 +1,315 @@ +/** + * Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. 
+ */ + +#include "reduce.h" +#include "core/ucc_team.h" +#include "../cl_hier_coll.h" + +#define MAX_AR_2STEP_TASKS 3 + +static ucc_status_t ucc_cl_hier_reduce_2step_start(ucc_coll_task_t *task) +{ + UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_reduce_2step_start", 0); + return ucc_schedule_start(task); +} + +static ucc_status_t ucc_cl_hier_reduce_2step_finalize(ucc_coll_task_t *task) +{ + ucc_cl_hier_schedule_t *schedule = ucc_derived_of(task, ucc_cl_hier_schedule_t); + ucc_status_t status; + + UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_reduce_2step_finalize", 0); + status = ucc_schedule_finalize(task); + if (schedule->scratch) { + ucc_mc_free(schedule->scratch); + } + ucc_cl_hier_put_schedule(&schedule->super.super); + return status; +} + +static inline ucc_rank_t +find_root_net_rank(ucc_host_id_t root_host_id, ucc_cl_hier_team_t *cl_team) +{ + ucc_rank_t net_rank = UCC_RANK_INVALID; + ucc_sbgp_t *sbgp = cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp; + ucc_team_t *core_team = cl_team->super.super.params.team; + ucc_rank_t i, rank; + + for (i = 0; i < sbgp->group_size; i++) { + rank = ucc_ep_map_eval(sbgp->map, i); + if (ucc_team_rank_host_id(rank, core_team) == root_host_id) { + net_rank = i; + break; + } + } + return net_rank; +} + +static inline ucc_rank_t +find_root_node_rank(ucc_rank_t root, ucc_cl_hier_team_t *cl_team) +{ + ucc_rank_t node_rank = UCC_RANK_INVALID; + ucc_sbgp_t *sbgp = cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp; + ucc_rank_t i; + + for (i = 0; i < sbgp->group_size; i++) { + if (ucc_ep_map_eval(sbgp->map, i) == root) { + node_rank = i; + break; + } + } + return node_rank; +} + +static ucc_status_t +ucc_cl_hier_reduce_2step_init_schedule(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_schedule_t **sched_p, int n_frags) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t); + ucc_team_t *core_team = team->params.team; + ucc_coll_task_t *tasks[2] = {NULL, NULL}; + ucc_rank_t root = coll_args->args.root; + ucc_rank_t rank = UCC_TL_TEAM_RANK(cl_team); + ucc_base_coll_args_t args = *coll_args; + size_t count = (rank == root) ? + args.args.dst.info.count : + args.args.src.info.count; + ucc_cl_hier_schedule_t *cl_schedule; + ucc_schedule_t *schedule; + ucc_status_t status; + int n_tasks, i, first_task, root_on_local_node; + + root_on_local_node = ucc_team_ranks_on_same_node(root, rank, core_team); + n_tasks = 0; + first_task = 0; + + if (root != rank) { + args.args.dst.info.count = args.args.src.info.count; + args.args.dst.info.mem_type = args.args.src.info.mem_type; + args.args.dst.info.datatype = args.args.src.info.datatype; + args.args.mask &= (~UCC_COLL_ARGS_FLAG_IN_PLACE); + } + + cl_schedule = ucc_cl_hier_get_schedule(cl_team); + if (ucc_unlikely(!cl_schedule)) { + return UCC_ERR_NO_MEMORY; + } + + schedule = &cl_schedule->super.super; + status = ucc_schedule_init(schedule, &args, team); + if (ucc_unlikely(UCC_OK != status)) { + goto out; + } + + args.max_frag_count = ucc_buffer_block_count(count, n_frags, 0); + if (n_frags > 1) { + args.mask |= UCC_BASE_CARGS_MAX_FRAG_COUNT; + } + + ucc_assert(SBGP_ENABLED(cl_team, NODE_LEADERS) || + SBGP_ENABLED(cl_team, NODE)); + if (SBGP_ENABLED(cl_team, NODE)) { + args.args.root = root_on_local_node + ? 
find_root_node_rank(root, cl_team) : 0; + + if ((root != rank) && SBGP_ENABLED(cl_team, NODE_LEADERS)) { + status = ucc_mc_alloc(&cl_schedule->scratch, args.max_frag_count, + args.args.src.info.mem_type); + if (ucc_unlikely(UCC_OK != status)) { + goto out; + } + args.args.dst.info.buffer = cl_schedule->scratch->addr; + if (root_on_local_node) { + first_task = 1; + args.args.src.info.buffer = cl_schedule->scratch->addr; + } + } + status = ucc_coll_init(SCORE_MAP(cl_team, NODE), &args, &tasks[n_tasks]); + if (ucc_unlikely(UCC_OK != status)) { + goto out; + } + n_tasks++; + } + + if (SBGP_ENABLED(cl_team, NODE_LEADERS)) { + if (n_tasks == 1) { + if (root != rank) { + args.args.src.info.buffer = root_on_local_node ? + coll_args->args.src.info.buffer : cl_schedule->scratch->addr; + } else { + args.args.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + args.args.flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + } + } + args.args.root = find_root_net_rank(ucc_team_rank_host_id(root, core_team), + cl_team); + status = ucc_coll_init(SCORE_MAP(cl_team, NODE_LEADERS), + &args, &tasks[n_tasks]); + if (ucc_unlikely(UCC_OK != status)) { + goto out; + } + n_tasks++; + } + + ucc_task_subscribe_dep(&schedule->super, tasks[first_task], + UCC_EVENT_SCHEDULE_STARTED); + ucc_schedule_add_task(schedule, tasks[first_task]); + + if (n_tasks > 1) { + ucc_task_subscribe_dep(tasks[first_task], + tasks[(first_task + 1) % 2], UCC_EVENT_COMPLETED); + ucc_schedule_add_task(schedule, tasks[(first_task + 1) % 2]); + } + + schedule->super.post = ucc_cl_hier_reduce_2step_start; + schedule->super.progress = NULL; + schedule->super.finalize = ucc_cl_hier_reduce_2step_finalize; + schedule->super.triggered_post = ucc_triggered_post; + *sched_p = schedule; + return UCC_OK; + +out: + for (i = 0; i < n_tasks; i++) { + tasks[i]->finalize(tasks[i]); + } + ucc_cl_hier_put_schedule(schedule); + return status; +} + +static ucc_status_t +ucc_cl_hier_reduce_2step_frag_init(ucc_base_coll_args_t *coll_args, + ucc_schedule_pipelined_t *sp, + ucc_base_team_t *team, + ucc_schedule_t **frag_p) +{ + int n_frags = sp->super.n_tasks; + ucc_status_t status; + + status = ucc_cl_hier_reduce_2step_init_schedule(coll_args, team, frag_p, + n_frags); + return status; +} + +static ucc_status_t +ucc_cl_hier_reduce_2step_frag_setup(ucc_schedule_pipelined_t *schedule_p, + ucc_schedule_t *frag, int frag_num) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(schedule_p->super.super.team, + ucc_cl_hier_team_t); + ucc_coll_args_t *args = &schedule_p->super.super.bargs.args; + size_t dt_size = ucc_dt_size(args->src.info.datatype); + int n_frags = schedule_p->super.n_tasks; + ucc_rank_t root = args->root; + ucc_rank_t rank = UCC_TL_TEAM_RANK(cl_team); + size_t count = (rank == root) ? args->dst.info.count : + args->src.info.count; + size_t frag_count, frag_offset; + ucc_coll_task_t *task; + int i; + ucc_cl_hier_schedule_t *cl_schedule; + void *scratch; + + cl_schedule = ucc_derived_of(frag, ucc_cl_hier_schedule_t); + scratch = cl_schedule->scratch ? 
cl_schedule->scratch->addr : NULL; + frag_count = ucc_buffer_block_count(count, n_frags, frag_num); + frag_offset = ucc_buffer_block_offset(count, n_frags, frag_num); + + for (i = 0; i < frag->n_tasks; i++) { + task = frag->tasks[i]; + task->bargs.args.src.info.count = frag_count; + task->bargs.args.dst.info.count = frag_count; + if (task->bargs.args.src.info.buffer != scratch) { + task->bargs.args.src.info.buffer = + PTR_OFFSET(args->src.info.buffer, frag_offset * dt_size); + } + if (task->bargs.args.dst.info.buffer != scratch) { + task->bargs.args.dst.info.buffer = + PTR_OFFSET(args->dst.info.buffer, frag_offset * dt_size); + } + } + return UCC_OK; +} + +static ucc_status_t +ucc_cl_hier_reduce_2step_pipelined_start(ucc_coll_task_t *task) +{ + ucc_schedule_pipelined_t *schedule = + ucc_derived_of(task, ucc_schedule_pipelined_t); + + cl_debug(task->team->context->lib, + "posting reduce_2step, count %zd, dt %s" + " pdepth %d, frags_total %d", + task->bargs.args.src.info.count, + ucc_datatype_str(task->bargs.args.src.info.datatype), + schedule->n_frags, schedule->super.n_tasks); + + return ucc_schedule_pipelined_post(task); +} + +static ucc_status_t +ucc_cl_hier_reduce_2step_pipelined_finalize(ucc_coll_task_t *task) +{ + ucc_cl_hier_schedule_t *schedule = ucc_derived_of(task, + ucc_cl_hier_schedule_t); + ucc_status_t status; + + status = ucc_schedule_pipelined_finalize(&schedule->super.super.super); + ucc_cl_hier_put_schedule(&schedule->super.super); + return status; +} + +UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_reduce_2step_init, + (coll_args, team, task), + ucc_base_coll_args_t *coll_args, ucc_base_team_t *team, + ucc_coll_task_t **task) +{ + ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t); + ucc_cl_hier_lib_config_t *cfg = &UCC_CL_HIER_TEAM_LIB(cl_team)->cfg; + ucc_cl_hier_schedule_t *schedule; + int n_frags, pipeline_depth; + ucc_status_t status; + + if (coll_args->args.op == UCC_OP_AVG) { + return UCC_ERR_NOT_SUPPORTED; + } + + ucc_pipeline_nfrags_pdepth(&cfg->reduce_2step_pipeline, + coll_args->args.src.info.count * + ucc_dt_size(coll_args->args.src.info.datatype), + &n_frags, &pipeline_depth); + + if (n_frags == 1) { + return ucc_cl_hier_reduce_2step_init_schedule( + coll_args, team, (ucc_schedule_t **)task, n_frags); + } + + schedule = ucc_cl_hier_get_schedule(cl_team); + if (ucc_unlikely(!schedule)) { + return UCC_ERR_NO_MEMORY; + } + + status = ucc_schedule_pipelined_init( + coll_args, team, ucc_cl_hier_reduce_2step_frag_init, + ucc_cl_hier_reduce_2step_frag_setup, pipeline_depth, n_frags, + cfg->reduce_2step_pipeline.order, &schedule->super); + + if (ucc_unlikely(status != UCC_OK)) { + cl_error(team->context->lib, + "failed to init pipelined 2step ar schedule"); + goto err_pipe_init; + } + + schedule->super.super.super.post = ucc_cl_hier_reduce_2step_pipelined_start; + schedule->super.super.super.finalize = ucc_cl_hier_reduce_2step_pipelined_finalize; + schedule->super.super.super.triggered_post = ucc_triggered_post; + *task = &schedule->super.super.super; + return UCC_OK; + +err_pipe_init: + ucc_cl_hier_put_schedule(&schedule->super.super); + return status; +} diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index 620155d017..6074ed65c8 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -1,5 +1,5 @@ # -# Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# if TL_UCP_ENABLED From 6fecb926acb61c2450c7e26ab65acd6b1bfc4ce8 Mon Sep 17 00:00:00 2001 From: Sergey Lebedev Date: Mon, 19 Feb 2024 17:36:50 +0000 Subject: [PATCH 2/2] TEST: add 2step reduce to gtest --- src/components/cl/hier/reduce/reduce_2step.c | 7 +- test/gtest/coll/test_reduce.cc | 89 +++++++++++--------- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/src/components/cl/hier/reduce/reduce_2step.c b/src/components/cl/hier/reduce/reduce_2step.c index 3d737b3238..bb10434058 100644 --- a/src/components/cl/hier/reduce/reduce_2step.c +++ b/src/components/cl/hier/reduce/reduce_2step.c @@ -117,7 +117,9 @@ ucc_cl_hier_reduce_2step_init_schedule(ucc_base_coll_args_t *coll_args, ? find_root_node_rank(root, cl_team) : 0; if ((root != rank) && SBGP_ENABLED(cl_team, NODE_LEADERS)) { - status = ucc_mc_alloc(&cl_schedule->scratch, args.max_frag_count, + status = ucc_mc_alloc(&cl_schedule->scratch, + args.max_frag_count * + ucc_dt_size(args.args.src.info.datatype), args.args.src.info.mem_type); if (ucc_unlikely(UCC_OK != status)) { goto out; @@ -273,7 +275,8 @@ UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_reduce_2step_init, int n_frags, pipeline_depth; ucc_status_t status; - if (coll_args->args.op == UCC_OP_AVG) { + if (UCC_IS_PERSISTENT(coll_args->args) || + (coll_args->args.op == UCC_OP_AVG)) { return UCC_ERR_NOT_SUPPORTED; } diff --git a/test/gtest/coll/test_reduce.cc b/test/gtest/coll/test_reduce.cc index 0f8bfc034f..2fb1cbc963 100644 --- a/test/gtest/coll/test_reduce.cc +++ b/test/gtest/coll/test_reduce.cc @@ -1,5 +1,5 @@ /** - * Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -283,55 +283,66 @@ template class test_reduce_avg_order : public test_reduce { template class test_reduce_dbt : public test_reduce { }; -#define TEST_DECLARE_WITH_ENV(_env, _n_procs) \ - { \ - UccJob job(_n_procs, UccJob::UCC_JOB_CTX_GLOBAL, _env); \ - UccTeam_h team = job.create_team(_n_procs); \ - int repeat = 3; \ - UccCollCtxVec ctxs; \ - std::vector mt = {UCC_MEMORY_TYPE_HOST}; \ - if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { \ - mt.push_back(UCC_MEMORY_TYPE_CUDA); \ - } \ - if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA_MANAGED)) { \ - mt.push_back(UCC_MEMORY_TYPE_CUDA_MANAGED); \ - } \ - for (auto count : {5, 256, 65536}) { \ - for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) { \ - for (auto m : mt) { \ - CHECK_TYPE_OP_SKIP(TypeParam::dt, TypeParam::redop, m); \ - SET_MEM_TYPE(m); \ - this->set_inplace(inplace); \ - this->data_init(_n_procs, TypeParam::dt, count, ctxs, true); \ - UccReq req(team, ctxs); \ - CHECK_REQ_NOT_SUPPORTED_SKIP(req, this->data_fini(ctxs)); \ - for (auto i = 0; i < repeat; i++) { \ - req.start(); \ - req.wait(); \ - EXPECT_EQ(true, this->data_validate(ctxs)); \ - this->reset(ctxs); \ - } \ - this->data_fini(ctxs); \ - } \ - } \ - } \ +template class test_reduce_2step : public test_reduce { +}; + +#define TEST_DECLARE_WITH_ENV(_env, _n_procs, _persistent) \ + { \ + UccJob job(_n_procs, UccJob::UCC_JOB_CTX_GLOBAL, _env); \ + UccTeam_h team = job.create_team(_n_procs); \ + int repeat = _persistent ? 
3 : 1; \ + UccCollCtxVec ctxs; \ + std::vector mt = {UCC_MEMORY_TYPE_HOST}; \ + if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA)) { \ + mt.push_back(UCC_MEMORY_TYPE_CUDA); \ + } \ + if (UCC_OK == ucc_mc_available(UCC_MEMORY_TYPE_CUDA_MANAGED)) { \ + mt.push_back(UCC_MEMORY_TYPE_CUDA_MANAGED); \ + } \ + for (auto count : {5, 256, 65536}) { \ + for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) { \ + for (auto m : mt) { \ + CHECK_TYPE_OP_SKIP(TypeParam::dt, TypeParam::redop, m); \ + SET_MEM_TYPE(m); \ + this->set_inplace(inplace); \ + this->data_init(_n_procs, TypeParam::dt, count, ctxs, \ + _persistent); \ + UccReq req(team, ctxs); \ + CHECK_REQ_NOT_SUPPORTED_SKIP(req, this->data_fini(ctxs)); \ + for (auto i = 0; i < repeat; i++) { \ + req.start(); \ + req.wait(); \ + EXPECT_EQ(true, this->data_validate(ctxs)); \ + this->reset(ctxs); \ + } \ + this->data_fini(ctxs); \ + } \ + } \ + } \ } TYPED_TEST_CASE(test_reduce_avg_order, CollReduceTypeOpsAvg); TYPED_TEST_CASE(test_reduce_dbt, CollReduceTypeOpsHost); +TYPED_TEST_CASE(test_reduce_2step, CollReduceTypeOpsHost); -ucc_job_env_t post_op_env = {{"UCC_TL_UCP_REDUCE_AVG_PRE_OP", "0"}}; -ucc_job_env_t reduce_dbt_env = {{"UCC_TL_UCP_TUNE", "reduce:@dbt:0-inf:inf"}, - {"UCC_CLS", "basic"}}; +ucc_job_env_t post_op_env = {{"UCC_TL_UCP_REDUCE_AVG_PRE_OP", "0"}}; +ucc_job_env_t reduce_dbt_env = {{"UCC_TL_UCP_TUNE", "reduce:@dbt:0-inf:inf"}, + {"UCC_CLS", "basic"}}; +ucc_job_env_t reduce_2step_env = {{"UCC_CL_HIER_TUNE", "reduce:@2step:0-inf:inf"}, + {"UCC_CLS", "all"}}; TYPED_TEST(test_reduce_avg_order, avg_post_op) { - TEST_DECLARE_WITH_ENV(post_op_env, 15); + TEST_DECLARE_WITH_ENV(post_op_env, 15, true); } TYPED_TEST(test_reduce_dbt, reduce_dbt_shift) { - TEST_DECLARE_WITH_ENV(reduce_dbt_env, 15); + TEST_DECLARE_WITH_ENV(reduce_dbt_env, 15, true); } TYPED_TEST(test_reduce_dbt, reduce_dbt_mirror) { - TEST_DECLARE_WITH_ENV(reduce_dbt_env, 16); + TEST_DECLARE_WITH_ENV(reduce_dbt_env, 16, true); +} + +TYPED_TEST(test_reduce_2step, 2step) { + TEST_DECLARE_WITH_ENV(reduce_2step_env, 16, false); }
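
Usage note (not part of the patches above): the 2step reduce algorithm is registered under the hier CL, so it can be selected with the same tuning knobs the gtest in PATCH 2/2 uses. The sketch below is illustrative only: UCC_CLS and UCC_CL_HIER_TUNE are taken verbatim from the test environment, while the UCC_CL_HIER_REDUCE_2STEP_PIPELINE spelling (derived from the REDUCE_2STEP_PIPELINE field added in PATCH 1/2) and the application name are assumptions.

    # Force the 2step reduce algorithm for all message sizes.
    # The default selection string only enables it for small messages,
    # per UCC_CL_HIER_REDUCE_DEFAULT_ALG_SELECT_STR "reduce:0-4k:@2step".
    UCC_CLS=all \
    UCC_CL_HIER_TUNE=reduce:@2step:0-inf:inf \
    ./app    # hypothetical UCC-enabled application

    # Pipelining of large reduces is controlled by the new option
    # (assumed env spelling: UCC_CL_HIER_REDUCE_2STEP_PIPELINE); its default
    # "n" disables pipelining, and its value format follows the same
    # UCC_CONFIG_TYPE_PIPELINE_PARAMS syntax as the existing
    # BCAST_2STEP_PIPELINE and ALLREDUCE_*_PIPELINE options.

Note that ucc_cl_hier_reduce_2step_init returns UCC_ERR_NOT_SUPPORTED for UCC_OP_AVG and, after PATCH 2/2, for persistent collectives, so in those cases UCC can fall back to another reduce algorithm.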