Skip to content

Commit

Permalink
Revert "initial commit"
Browse files Browse the repository at this point in the history
This reverts commit ae7a131ec448d5dac71cd93f6ef1325fdf08b09c
  • Loading branch information
Antoine Jego authored and Antoine Jego committed Feb 19, 2025
1 parent a24ae9c commit 401dc22
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 20 deletions.
4 changes: 1 addition & 3 deletions include/starpu_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ enum starpu_data_access_mode
TODO add extended description in documentation.
*/
_STARPU_MPI_REDUX = (1 << 7),
STARPU_MPI_REDUX = (_STARPU_MPI_REDUX|STARPU_RW|STARPU_COMMUTE),
/**< Inter-node reduction only.
STARPU_MPI_REDUX = (1 << 7), /**< Inter-node reduction only.
This is similar to ::STARPU_REDUX, except that
StarPU will allocate a per-node buffer only,
i.e. parallelism will be achieved between
Expand Down
11 changes: 5 additions & 6 deletions mpi/examples/mpi_redux/mpi_redux_autowrapup.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ static struct starpu_codelet work_cl =
.cpu_funcs = { cl_cpu_work },
.nbuffers = 2,
.modes = { STARPU_REDUX, STARPU_R },
.name = "task_work REDUX"
.name = "task_init"
};

static struct starpu_codelet mpi_work_cl =
{
.cpu_funcs = { cl_cpu_work },
.nbuffers = 2,
.modes = { STARPU_MPI_REDUX, STARPU_R },
.name = "task_work MPI_REDUX"
.modes = { STARPU_RW | STARPU_COMMUTE, STARPU_R },
.name = "task_init-mpi"
};

static void cl_cpu_task_init(void *handles[], void*arg)
Expand Down Expand Up @@ -219,9 +219,8 @@ int main(int argc, char *argv[])
tmp1 += 1.0 / (work_node + 1.0);
tmp2 += (nworkers - 1.0)*work_node*i;
}
FPRINTF(stderr, "computed result ---> %f expected %f with access mode %s\n", a,
1.0 + (comm_size - 1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1)*3.0 + tmp1) + tmp2,
i == 0 ? "STARPU_MPI_REDUX" : "STARPU_REDUX");
FPRINTF(stderr, "computed result ---> %f expected %f\n", a,
1.0 + (comm_size - 1.0)*(comm_size)/2.0 + work_coef*nworkers*((comm_size-1)*3.0 + tmp1) + tmp2);
}
starpu_data_unregister(a_h);
for (work_node=0; work_node < comm_size;work_node++)
Expand Down
16 changes: 8 additions & 8 deletions mpi/src/starpu_mpi_task_insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ int _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum s
if (data && data->mpi_data)
{
char *redux_map = starpu_mpi_data_get_redux_map(data);
if (redux_map != NULL && mode & STARPU_R && !(mode & STARPU_REDUX) && !(mode & _STARPU_MPI_REDUX))
if (redux_map != NULL && mode & STARPU_R && mode & ~ STARPU_REDUX && mode & ~ STARPU_MPI_REDUX)
{
_starpu_mpi_redux_wrapup_data(data);
}
}
if ((mode & STARPU_REDUX || mode & _STARPU_MPI_REDUX) && data)
if ((mode & STARPU_REDUX || mode & STARPU_MPI_REDUX) && data)
{
if (exchange_needed)
*exchange_needed = 1;
Expand All @@ -141,7 +141,7 @@ int _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum s
{
STARPU_ASSERT_MSG(starpu_mpi_data_get_rank(data) == STARPU_MPI_PER_NODE, "If task is replicated, it has to access only per-node data");
}
if (data && mode & STARPU_R && !(mode & _STARPU_MPI_REDUX))
if (data && mode & STARPU_R && !(mode & STARPU_MPI_REDUX))
{
int mpi_rank = starpu_mpi_data_get_rank(data);
starpu_mpi_tag_t data_tag = starpu_mpi_data_get_tag(data);
Expand Down Expand Up @@ -210,7 +210,7 @@ int _starpu_mpi_exchange_data_before_execution(starpu_data_handle_t data, enum s

int _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum starpu_data_access_mode mode, int me, int xrank, int do_execute, int prio, MPI_Comm comm)
{
if ((mode & STARPU_REDUX || mode & _STARPU_MPI_REDUX) && data)
if ((mode & STARPU_REDUX || mode & STARPU_MPI_REDUX) && data)
{
struct _starpu_mpi_data *mpi_data = (struct _starpu_mpi_data *) data->mpi_data;
int rrank = starpu_mpi_data_get_rank(data);
Expand Down Expand Up @@ -245,7 +245,7 @@ int _starpu_mpi_exchange_data_after_execution(starpu_data_handle_t data, enum st
}
}
}
if (mode & STARPU_W && !(mode & _STARPU_MPI_REDUX))
if (mode & STARPU_W && !(mode & STARPU_MPI_REDUX))
{
int mpi_rank = starpu_mpi_data_get_rank(data);
starpu_mpi_tag_t data_tag = starpu_mpi_data_get_tag(data);
Expand Down Expand Up @@ -292,7 +292,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
{
if (_starpu_cache_enabled)
{
if ((mode & STARPU_W && !(mode & _STARPU_MPI_REDUX)) || mode & STARPU_REDUX)
if ((mode & STARPU_W && !(mode & STARPU_MPI_REDUX)) || mode & STARPU_REDUX)
{
/* The data has been modified, it MUST be removed from the cache */
starpu_mpi_cached_send_clear(data);
Expand All @@ -302,7 +302,7 @@ void _starpu_mpi_clear_data_after_execution(starpu_data_handle_t data, enum star
else
{
/* We allocated a temporary buffer for the received data, now drop it */
if ((mode & STARPU_R && !(mode & _STARPU_MPI_REDUX)) && do_execute)
if ((mode & STARPU_R && !(mode & STARPU_MPI_REDUX)) && do_execute)
{
int mpi_rank = starpu_mpi_data_get_rank(data);
if (mpi_rank == STARPU_MPI_PER_NODE)
Expand Down Expand Up @@ -343,7 +343,7 @@ int _starpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_nod
{
int arg_type_nocommute = arg_type & ~STARPU_COMMUTE;

if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & _STARPU_MPI_REDUX)
if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & STARPU_MPI_REDUX)
{
starpu_data_handle_t data = va_arg(varg_list_copy, starpu_data_handle_t);
enum starpu_data_access_mode mode = (enum starpu_data_access_mode) arg_type;
Expand Down
2 changes: 1 addition & 1 deletion mpi/src/starpu_mpi_task_insert_fortran.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ int _fstarpu_mpi_task_decode_v(struct starpu_codelet *codelet, int me, int nb_no
int arg_type = (int)(intptr_t)arglist[arg_i];
int arg_type_nocommute = arg_type & ~STARPU_COMMUTE;

if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & _STARPU_MPI_REDUX)
if (arg_type_nocommute & STARPU_R || arg_type_nocommute & STARPU_W || arg_type_nocommute & STARPU_RW || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & STARPU_MPI_REDUX)
{
arg_i++;
starpu_data_handle_t data = arglist[arg_i];
Expand Down
10 changes: 8 additions & 2 deletions src/util/starpu_task_insert_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ void starpu_task_insert_data_process_arg(struct starpu_codelet *cl, struct starp

enum starpu_data_access_mode arg_mode = (enum starpu_data_access_mode) arg_type & ~STARPU_SSEND & ~STARPU_NOFOOTPRINT;

/* MPI_REDUX should be interpreted as RW|COMMUTE by the "ground" StarPU layer.*/
if (arg_mode & STARPU_MPI_REDUX)
{
arg_mode = STARPU_RW|STARPU_COMMUTE;
}
if (cl->nbuffers == STARPU_VARIABLE_NBUFFERS || (cl->nbuffers > STARPU_NMAXBUFS && !cl->dyn_modes))
{
STARPU_TASK_SET_MODE(task, arg_mode,* current_buffer);
Expand Down Expand Up @@ -271,7 +276,7 @@ int _starpu_task_insert_create(struct starpu_codelet *cl, struct starpu_task *ta

while((arg_type = va_arg(varg_list, int)) != 0)
{
if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX)
if (arg_type & STARPU_R || arg_type & STARPU_W || arg_type & STARPU_SCRATCH || arg_type & STARPU_REDUX || arg_type & STARPU_MPI_REDUX)
{
starpu_data_handle_t handle = va_arg(varg_list, starpu_data_handle_t);
starpu_task_insert_data_process_arg(cl, task, &allocated_buffers, &current_buffer, arg_type, handle);
Expand Down Expand Up @@ -678,7 +683,8 @@ int _fstarpu_task_insert_create(struct starpu_codelet *cl, struct starpu_task *t
if (arg_type & STARPU_R
|| arg_type & STARPU_W
|| arg_type & STARPU_SCRATCH
|| arg_type & STARPU_REDUX)
|| arg_type & STARPU_REDUX
|| arg_type & STARPU_MPI_REDUX)
{
arg_i++;
starpu_data_handle_t handle = arglist[arg_i];
Expand Down

0 comments on commit 401dc22

Please sign in to comment.