From 2c3aec3b729ff03dc58d473b6255bcf0f10b88b3 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 20 Apr 2024 12:19:35 -0700 Subject: [PATCH 01/14] Start towards core and scheduling policy --- core/threaded/reactor_threaded.c | 17 +++++++++++++++++ include/core/threaded/reactor_threaded.h | 9 +++++++++ low_level_platform/api/low_level_platform.h | 9 +++++++-- 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 57f888fc2..3d9e9889e 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -991,6 +991,23 @@ void start_threads(environment_t* env) { if (lf_thread_create(&env->thread_ids[i], worker, env) != 0) { lf_print_error_and_exit("Could not start thread-%u", i); } + // FIXME: Use the target property to set the policy. + lf_scheduling_policy_t policy = { + .priority = 80, // FIXME: determine good priority + .policy = LF_SCHED_PRIORITY + }; + lf_thread_set_scheduling_policy(env->thread_ids[i], &policy); + + int number_of_cores = _LF_NUMBER_OF_CORES; + if (number_of_cores > 0) { + // Pin the thread to cores starting at the highest numbered core. + static int core_number = -1; + if (core_number < 0) core_number = lf_available_cores() - 1; + lf_thread_set_cpu(env->thread_ids[i], core_number); + printf("***** FIXME: core_number %d\n", core_number); + core_number--; + if (core_number < lf_available_cores() - _LF_NUMBER_OF_CORES) core_number = lf_available_cores() - 1; + } } } diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 96de7ac49..08a352b8c 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -88,6 +88,15 @@ void _lf_increment_tag_barrier_locked(environment_t* env, tag_t future_tag); */ void _lf_decrement_tag_barrier_locked(environment_t* env); +/** + * @brief The number of cores to use. + * + * If the target parameter number_of_cores is set, it will override this default. + */ +#ifndef _LF_NUMBER_OF_CORES +#define _LF_NUMBER_OF_CORES 1 +#endif + int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); void lf_synchronize_with_other_federates(void); bool wait_until(instant_t logical_time_ns, lf_cond_t* condition); diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index 2867aa0f4..500efee94 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -170,23 +170,28 @@ int lf_thread_set_cpu(lf_thread_t thread, int cpu_number); * number indicates higher priority. Setting the priority of a thread only * makes sense if the thread is scheduled with LF_SCHED_TIMESLICE or LF_THREAD_PRIORITY * - * @param thread The thread. + * @param thread The thread ID. * @param priority The priority. * @return int 0 on success, platform-specific error otherwise */ int lf_thread_set_priority(lf_thread_t thread, int priority); /** - * @brief Set the scheduling policy of a thread. This is based on the scheduling + * @brief Set the scheduling policy of a thread. + * + * This is based on the scheduling * concept from Linux explained here: https://man7.org/linux/man-pages/man7/sched.7.html * A scheduling policy is specific to a thread/worker. We have three policies * LF_SCHED_PRIORITY which corresponds to SCHED_FIFO on Linux. * LF_SCHED_TIMESLICE which corresponds to SCHED_RR on Linux. * LF_SCHED_FAIR which corresponds to SCHED_OTHER on Linux. * + * @param thread The thread ID. + * @param policy A pointer to the policy (this will not be used after returning, so it can be on the stack). * @return int 0 on success, platform-specific error number otherwise. */ int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy); +// FIXME: The policy pointer argument is worrisome. Can it really be on the stack? /** * Initialize a mutex. From 5665c837838f83863d520041dc3050dad136f49d Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sat, 20 Apr 2024 15:13:39 -0700 Subject: [PATCH 02/14] Support cores target property --- core/CMakeLists.txt | 6 ++++-- core/threaded/reactor_threaded.c | 4 ++-- include/core/threaded/reactor_threaded.h | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index e4a9f1b6c..6c31343ae 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -128,8 +128,9 @@ target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=${INITIAL_ target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=${INITIAL_REACT_QUEUE_SIZE}) target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME}) -# Macro for translating a command-line argument into compile definition for -# reactor-c lib +# If variable X is defined in cMake (set using SET()) or passed in as a command-line +# argument using -DX=, then make it a compiler flag for reactor-c so that X +# is also defined in the C code for reactor-c. macro(define X) if(DEFINED ${X}) message(STATUS ${X}=${${X}}) @@ -159,6 +160,7 @@ define(LOG_LEVEL) define(MODAL_REACTORS) define(NUMBER_OF_FEDERATES) define(NUMBER_OF_WORKERS) +define(LF_NUMBER_OF_CORES) define(NUMBER_OF_WATCHDOGS) define(USER_THREADS) define(SCHEDULER) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 3d9e9889e..d0856b667 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -998,7 +998,7 @@ void start_threads(environment_t* env) { }; lf_thread_set_scheduling_policy(env->thread_ids[i], &policy); - int number_of_cores = _LF_NUMBER_OF_CORES; + int number_of_cores = LF_NUMBER_OF_CORES; if (number_of_cores > 0) { // Pin the thread to cores starting at the highest numbered core. static int core_number = -1; @@ -1006,7 +1006,7 @@ void start_threads(environment_t* env) { lf_thread_set_cpu(env->thread_ids[i], core_number); printf("***** FIXME: core_number %d\n", core_number); core_number--; - if (core_number < lf_available_cores() - _LF_NUMBER_OF_CORES) core_number = lf_available_cores() - 1; + if (core_number < lf_available_cores() - LF_NUMBER_OF_CORES) core_number = lf_available_cores() - 1; } } } diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 08a352b8c..7f0d80fc5 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -93,8 +93,8 @@ void _lf_decrement_tag_barrier_locked(environment_t* env); * * If the target parameter number_of_cores is set, it will override this default. */ -#ifndef _LF_NUMBER_OF_CORES -#define _LF_NUMBER_OF_CORES 1 +#ifndef LF_NUMBER_OF_CORES +#define LF_NUMBER_OF_CORES 0 #endif int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); From 508a156db5f0c6de4fcfef84083b140c188c7b39 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Sun, 21 Apr 2024 09:58:02 -0700 Subject: [PATCH 03/14] Support thread policy in target property --- core/CMakeLists.txt | 3 ++- core/threaded/reactor_threaded.c | 6 ++++-- include/core/threaded/reactor_threaded.h | 12 ++++++++++++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 6c31343ae..5524050ad 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -153,14 +153,15 @@ define(FEDERATED_DECENTRALIZED) define(FEDERATED) define(FEDERATED_AUTHENTICATED) define(FEDERATE_ID) +define(LF_NUMBER_OF_CORES) define(LF_REACTION_GRAPH_BREADTH) +define(LF_THREAD_POLICY) define(LF_TRACE) define(LF_SINGLE_THREADED) define(LOG_LEVEL) define(MODAL_REACTORS) define(NUMBER_OF_FEDERATES) define(NUMBER_OF_WORKERS) -define(LF_NUMBER_OF_CORES) define(NUMBER_OF_WATCHDOGS) define(USER_THREADS) define(SCHEDULER) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index d0856b667..7e20b476f 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -994,17 +994,19 @@ void start_threads(environment_t* env) { // FIXME: Use the target property to set the policy. lf_scheduling_policy_t policy = { .priority = 80, // FIXME: determine good priority - .policy = LF_SCHED_PRIORITY + .policy = LF_THREAD_POLICY }; + LF_PRINT_LOG("Setting thread policy to %d", LF_THREAD_POLICY); lf_thread_set_scheduling_policy(env->thread_ids[i], &policy); int number_of_cores = LF_NUMBER_OF_CORES; + LF_PRINT_LOG("Using %d cores", number_of_cores); if (number_of_cores > 0) { // Pin the thread to cores starting at the highest numbered core. static int core_number = -1; if (core_number < 0) core_number = lf_available_cores() - 1; lf_thread_set_cpu(env->thread_ids[i], core_number); - printf("***** FIXME: core_number %d\n", core_number); + LF_PRINT_LOG("Using core_number %d", core_number); core_number--; if (core_number < lf_available_cores() - LF_NUMBER_OF_CORES) core_number = lf_available_cores() - 1; } diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 7f0d80fc5..a42d5e247 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -97,6 +97,18 @@ void _lf_decrement_tag_barrier_locked(environment_t* env); #define LF_NUMBER_OF_CORES 0 #endif +/** + * @brief The thread scheduling policy to use. + * + * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY. + * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER. + * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds + * to SCHED_FIFO. + */ +#ifndef LF_THREAD_POLICY +#define LF_THREAD_POLICY LF_SCHED_FAIR +#endif + int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); void lf_synchronize_with_other_federates(void); bool wait_until(instant_t logical_time_ns, lf_cond_t* condition); From ac69b82ad0679e0ca99ae6cc7b3709892a8f0f5a Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 24 Apr 2024 16:51:19 -0700 Subject: [PATCH 04/14] Rough start on EDF priority assignment --- core/threaded/reactor_threaded.c | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 7e20b476f..392f6990a 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -867,7 +867,7 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) * @param env Environment within which we are executing. * @param worker_number The number assigned to this worker thread */ -void _lf_worker_do_work(environment_t* env, int worker_number) { +static void _lf_worker_do_work(environment_t* env, int worker_number) { assert(env != GLOBAL_ENVIRONMENT); // Keep track of whether we have decremented the idle thread count. @@ -887,6 +887,25 @@ void _lf_worker_do_work(environment_t* env, int worker_number) { current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id, current_reaction_to_execute->deadline); +#if defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR + // Examine the reaction's (inferred) deadline to set this thread's priority. + interval_t inferred_deadline = (interval_t) (current_reaction_to_execute->index >> 16); + // If there is no deadline, set the priority to 1. + if (inferred_deadline >= (FOREVER >> 16) << 16) { + // All reactions without (inferred) deadlines result in a thread priority of 1. + lf_thread_set_priority(lf_thread_self(), 1); + // FIXME: Remove this thread from the data structure containing assigned priorities. + } else { + // Get the absolute deadline to implement EDF. + instant_t absolute_deadline = (env->current_tag.time + inferred_deadline); + // FIXME: This is the hard part. + // Need to know the priorities of running threads and the absolute deadline associated with it + // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. + lf_critical_section_enter(); + lf_critical_section_exit(); + } +#endif // defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR + bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); if (!violation) { From d0f60c1ec4bc24ee859db50ab4733a9593f496af Mon Sep 17 00:00:00 2001 From: Efsane Soyer Date: Tue, 30 Apr 2024 17:58:39 -0700 Subject: [PATCH 05/14] implementation with LL and some shifting priority algorithm --- core/threaded/reactor_threaded.c | 154 ++++++++++++++++++++++- include/core/threaded/reactor_threaded.h | 12 ++ 2 files changed, 161 insertions(+), 5 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 392f6990a..8b3481da6 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -868,6 +868,12 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) * @param worker_number The number assigned to this worker thread */ static void _lf_worker_do_work(environment_t* env, int worker_number) { + //TODO: add if defined + // environment_t* envs; + // int num_envs = _lf_get_environments(&envs); + // static int a[1]; + // static edf_sched_node a[2][3]; + // edf_sched_node b[num_envs][2]; assert(env != GLOBAL_ENVIRONMENT); // Keep track of whether we have decremented the idle thread count. @@ -886,14 +892,19 @@ static void _lf_worker_do_work(environment_t* env, int worker_number) { worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index), current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id, current_reaction_to_execute->deadline); - -#if defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR +//#if defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR // Examine the reaction's (inferred) deadline to set this thread's priority. interval_t inferred_deadline = (interval_t) (current_reaction_to_execute->index >> 16); // If there is no deadline, set the priority to 1. if (inferred_deadline >= (FOREVER >> 16) << 16) { // All reactions without (inferred) deadlines result in a thread priority of 1. lf_thread_set_priority(lf_thread_self(), 1); + lf_critical_section_enter(GLOBAL_ENVIRONMENT); + if (edf_LL_head) { + lf_thread_t my_id = lf_thread_self(); + remove_from_LL(my_id); + } + lf_critical_section_exit(GLOBAL_ENVIRONMENT); // FIXME: Remove this thread from the data structure containing assigned priorities. } else { // Get the absolute deadline to implement EDF. @@ -901,10 +912,52 @@ static void _lf_worker_do_work(environment_t* env, int worker_number) { // FIXME: This is the hard part. // Need to know the priorities of running threads and the absolute deadline associated with it // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. - lf_critical_section_enter(); - lf_critical_section_exit(); + edf_sched_node* current = &edf_elements[lf_thread_id()]; + lf_critical_section_enter(GLOBAL_ENVIRONMENT); + //if there is no head -- then the current thread is the head + if (!edf_LL_head) { + edf_LL_head = current; + current->abs_d = absolute_deadline; + current->pri = 50; + } else { //if there is a head + lf_thread_t my_id = lf_thread_self(); + remove_from_LL(my_id); + + edf_elements[lf_thread_id()].abs_d = absolute_deadline; + edf_sched_node* ptr = edf_LL_head; + //find the spot that the current thread needs to insert itself + while (ptr) { + //the LL is from lowest priority to the highest priority + if (ptr->abs_d < absolute_deadline) { + //change the pointers to insert the current thread + edf_sched_node* temp = ptr->left; + ptr->left = current; + current->right = ptr; + current->left = temp; + //if the insertion is not at the beginning of the list + if (temp) { + temp->right = current; + if (ptr->pri - temp->pri >= 2) { + //assign somehow + } else { + //shift stuff + shift_priorities(current); + } + } else { //if the insertion is at the beginning of the list + edf_LL_head = current; + //TODO: check if -5 still gives a positive priority + current->pri = ptr->pri - 5; + } + + + } + } + + } + lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); + lf_critical_section_exit(GLOBAL_ENVIRONMENT); } -#endif // defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR +//#endif // defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); @@ -919,6 +972,81 @@ static void _lf_worker_do_work(environment_t* env, int worker_number) { } } +void remove_from_LL(lf_thread_t my_id) { + edf_sched_node* ptr = edf_LL_head; + //if the thread is already on the LL -- remove the old one + while (ptr) { + if (ptr->thread_id == my_id) { + ptr->left->right = ptr->right; + ptr->right->left = ptr->left; + break; + } else { + ptr = ptr->right; + } + } +} + +bool shift_priorities(edf_sched_node* current) { + edf_sched_node* before = current->left; + edf_sched_node* after = current->right; + edf_sched_node* ptr = after; + //count the number of times the while loop executes, so you can distribute the priority space you find + // across the threads in the linked list + uint counter = 1; + //set to true if we can find a shifting spot + bool flag = false; + while (ptr) { + counter++; + if (!ptr->right && ptr->pri != 99) { + //if until the tail we cannot find a shifting spot, and the last element is not 99 + //this means last element can be shifted + flag = true; + uint diff = 99 - ptr->left->pri; + uint incr = diff / counter + 1; + //change the tail's priority + ptr->pri = 99; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + ptr = ptr->left; + //change the priorities of node from tail to the current thread + while (ptr != current) { + ptr->pri+=incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + ptr = ptr->left; + } + //change the priority of the current thread + current->pri = current->right->pri-incr; + lf_thread_set_priority(current->thread_id, current->pri); + break; + } else if (ptr->right) { + uint diff = ptr->right->pri - ptr->pri; + if (diff > 1) { + //eureka! we found shifting spot + flag = true; + //calculate the increment interval + uint incr = diff / counter + 1; + //shift every node's priority from this spot to the spot we inserted the current thread + while (ptr != current) { + ptr->pri+=incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + ptr = ptr->left; + } + //assign the current thread's priority + current->pri = current->right->pri-incr; + lf_thread_set_priority(current->thread_id, current->pri); + //since we found a spot and shifted priorities, we don't need to continue with the while loop + break; + } + } + ptr = ptr->right; + } + if (!flag) { + // if flag is still false, this mean we couldn't find any shifting spot on the right side + //we need to check left + } + return flag; + +} + /** * Worker thread for the thread pool. Its argument is the environment within which is working * The very first worker per environment/enclave is in charge of synchronizing with @@ -930,6 +1058,11 @@ static void _lf_worker_do_work(environment_t* env, int worker_number) { void* worker(void* arg) { initialize_lf_thread_id(); environment_t* env = (environment_t*)arg; + edf_elements[lf_thread_id()].abs_d = FOREVER; + edf_elements[lf_thread_id()].pri = 1; + edf_elements[lf_thread_id()].thread_id = lf_thread_self(); + edf_elements[lf_thread_id()].left = NULL; + edf_elements[lf_thread_id()].right = NULL; LF_MUTEX_LOCK(&env->mutex); int worker_number = env->worker_thread_count++; @@ -1126,6 +1259,16 @@ int lf_reactor_c_main(int argc, const char* argv[]) { environment_t* envs; int num_envs = _lf_get_environments(&envs); + //TODO: if defined + environment_t* env_0 = &envs[0]; + edf_elements = (edf_sched_node *) malloc(sizeof(edf_sched_node) * num_envs * env_0->num_workers); + edf_elements[lf_thread_id()].abs_d = FOREVER; + edf_elements[lf_thread_id()].pri = 1; + edf_elements[lf_thread_id()].thread_id = lf_thread_self(); + edf_elements[lf_thread_id()].left = NULL; + edf_elements[lf_thread_id()].right = NULL; + + #if defined LF_ENCLAVES initialize_local_rti(envs, num_envs); #endif @@ -1161,6 +1304,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // Unlock mutex and allow threads proceed LF_MUTEX_UNLOCK(&env->mutex); } + for (int i = 0; i < num_envs; i++) { // Wait for the worker threads to exit. diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index a42d5e247..7ddbe86e2 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -12,6 +12,18 @@ #include "lf_types.h" + +typedef struct edf_sched_node { + instant_t abs_d; + uint pri; + edf_sched_node* left; + edf_sched_node* right; + lf_thread_t thread_id; +} edf_sched_node; + +edf_sched_node* edf_elements = NULL; +edf_sched_node* edf_LL_head = NULL; +// edf_sched_node* edf_LL_tail = NULL; /** * @brief Advance to the next level. * For federated runtimes, this function should From b407667b6e159830717a8c71531ac1863c3ea05d Mon Sep 17 00:00:00 2001 From: fra-p Date: Thu, 23 May 2024 17:55:51 -0700 Subject: [PATCH 06/14] Initial implementation of EDF scheduler 1. Implemented left-shifting of priorities when the right shift fails 2. Assigned maximum priority to worker threads waiting on the semaphone to be woken up as soon as a thread signals on the semaphore 3. Slightly re-factored the EDF code 4. Added missing case of scheduling a reaction with shorter deadline than the others currently being executed 5. Added print of error when the scheduling policy is not LF_SCHED_FAIR but the program was not launched with sudo rights (cannot change the scheduling policy) --- core/threaded/reactor_threaded.c | 433 +++++++++++++++-------- core/threaded/scheduler_GEDF_NP.c | 3 + include/core/threaded/reactor_threaded.h | 7 +- 3 files changed, 296 insertions(+), 147 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 8b3481da6..8ddf3f56c 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -47,6 +47,9 @@ extern instant_t start_time; */ lf_mutex_t global_mutex; +edf_sched_node* edf_elements = NULL; +edf_sched_node* edf_ll_head = NULL; + void _lf_increment_tag_barrier_locked(environment_t* env, tag_t future_tag) { assert(env != GLOBAL_ENVIRONMENT); @@ -861,124 +864,32 @@ void try_advance_level(environment_t* env, volatile size_t* next_reaction_level) } /** - * The main looping logic of each LF worker thread. - * This function assumes the caller holds the mutex lock. - * - * @param env Environment within which we are executing. - * @param worker_number The number assigned to this worker thread - */ -static void _lf_worker_do_work(environment_t* env, int worker_number) { - //TODO: add if defined - // environment_t* envs; - // int num_envs = _lf_get_environments(&envs); - // static int a[1]; - // static edf_sched_node a[2][3]; - // edf_sched_node b[num_envs][2]; - assert(env != GLOBAL_ENVIRONMENT); - - // Keep track of whether we have decremented the idle thread count. - // Obtain a reaction from the scheduler that is ready to execute - // (i.e., it is not blocked by concurrently executing reactions - // that it depends on). - // lf_print_snapshot(); // This is quite verbose (but very useful in debugging reaction deadlocks). - reaction_t* current_reaction_to_execute = NULL; -#ifdef FEDERATED - lf_stall_advance_level_federation(env, 0); -#endif - while ((current_reaction_to_execute = lf_sched_get_ready_reaction(env->scheduler, worker_number)) != NULL) { - // Got a reaction that is ready to run. - LF_PRINT_DEBUG("Worker %d: Got from scheduler reaction %s: " - "level: %lld, is input reaction: %d, chain ID: %llu, and deadline " PRINTF_TIME ".", - worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index), - current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id, - current_reaction_to_execute->deadline); -//#if defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR - // Examine the reaction's (inferred) deadline to set this thread's priority. - interval_t inferred_deadline = (interval_t) (current_reaction_to_execute->index >> 16); - // If there is no deadline, set the priority to 1. - if (inferred_deadline >= (FOREVER >> 16) << 16) { - // All reactions without (inferred) deadlines result in a thread priority of 1. - lf_thread_set_priority(lf_thread_self(), 1); - lf_critical_section_enter(GLOBAL_ENVIRONMENT); - if (edf_LL_head) { - lf_thread_t my_id = lf_thread_self(); - remove_from_LL(my_id); + * Function to remove the edf_sched_node element indexed + * by my_id from the LL and reset deadline and priority + * @param my_id LF thread ID of the element to remove from the LL +*/ +void remove_from_edf_ll(lf_thread_t my_id) { + LF_PRINT_LOG("Removing thread %ld from the EDF LL.", my_id); + edf_sched_node* ptr = edf_ll_head; + // if the thread is already on the LL -- remove the old one + while (ptr) { + if (ptr->thread_id == my_id) { + if (ptr->left) { + ptr->left->right = ptr->right; } - lf_critical_section_exit(GLOBAL_ENVIRONMENT); - // FIXME: Remove this thread from the data structure containing assigned priorities. - } else { - // Get the absolute deadline to implement EDF. - instant_t absolute_deadline = (env->current_tag.time + inferred_deadline); - // FIXME: This is the hard part. - // Need to know the priorities of running threads and the absolute deadline associated with it - // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. - edf_sched_node* current = &edf_elements[lf_thread_id()]; - lf_critical_section_enter(GLOBAL_ENVIRONMENT); - //if there is no head -- then the current thread is the head - if (!edf_LL_head) { - edf_LL_head = current; - current->abs_d = absolute_deadline; - current->pri = 50; - } else { //if there is a head - lf_thread_t my_id = lf_thread_self(); - remove_from_LL(my_id); - - edf_elements[lf_thread_id()].abs_d = absolute_deadline; - edf_sched_node* ptr = edf_LL_head; - //find the spot that the current thread needs to insert itself - while (ptr) { - //the LL is from lowest priority to the highest priority - if (ptr->abs_d < absolute_deadline) { - //change the pointers to insert the current thread - edf_sched_node* temp = ptr->left; - ptr->left = current; - current->right = ptr; - current->left = temp; - //if the insertion is not at the beginning of the list - if (temp) { - temp->right = current; - if (ptr->pri - temp->pri >= 2) { - //assign somehow - } else { - //shift stuff - shift_priorities(current); - } - } else { //if the insertion is at the beginning of the list - edf_LL_head = current; - //TODO: check if -5 still gives a positive priority - current->pri = ptr->pri - 5; - } - - - } - } - + if (ptr->right) { + ptr->right->left = ptr->left; } - lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); - lf_critical_section_exit(GLOBAL_ENVIRONMENT); - } -//#endif // defined(LF_THREAD_POLICY) && LF_THREAD_POLICY != LF_SCHED_FAIR - bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); - - if (!violation) { - // Invoke the reaction function. - _lf_worker_invoke_reaction(env, worker_number, current_reaction_to_execute); - } - - LF_PRINT_DEBUG("Worker %d: Done with reaction %s.", worker_number, current_reaction_to_execute->name); + // my node was the head of the LL + if (ptr == edf_ll_head) { + edf_ll_head = ptr->right; + } - lf_sched_done_with_reaction(worker_number, current_reaction_to_execute); - } -} + ptr->abs_d = FOREVER; + ptr->pri = 1; + ptr->left = ptr->right = NULL; -void remove_from_LL(lf_thread_t my_id) { - edf_sched_node* ptr = edf_LL_head; - //if the thread is already on the LL -- remove the old one - while (ptr) { - if (ptr->thread_id == my_id) { - ptr->left->right = ptr->right; - ptr->right->left = ptr->left; break; } else { ptr = ptr->right; @@ -986,65 +897,297 @@ void remove_from_LL(lf_thread_t my_id) { } } -bool shift_priorities(edf_sched_node* current) { +/** + * Function that shifts the priority of elements in the LL to assign + * to current (freshly inserted in the LL) a priority value complying + * with the EDF rule. + * First, the algorithm tries to shift the nodes to the right of the position + * where current has been inserted. If this is not possible (the tail + * of the LL has priority 99 and there is no space to shift the previous + * elements), the function tries to shift the nodes to the left. If + * this again is not possible (the head of the LL has priority 2 and + * there is no space to shift the following elements), the function + * returns false (true otherwise). + * + * @param current: the node to which to assign a priority value after shifting + * + * @return true if the shift (either right or left) was possible and a priority + * value was assigned to current; false otherwise. +*/ +bool shift_edf_priorities(edf_sched_node* current) { edf_sched_node* before = current->left; edf_sched_node* after = current->right; edf_sched_node* ptr = after; - //count the number of times the while loop executes, so you can distribute the priority space you find - // across the threads in the linked list + // count the number of times the while loop executes, so you can distribute + // the priority space you find across the threads in the linked list uint counter = 1; - //set to true if we can find a shifting spot + // set to true if we can find a shifting spot bool flag = false; while (ptr) { counter++; if (!ptr->right && ptr->pri != 99) { - //if until the tail we cannot find a shifting spot, and the last element is not 99 - //this means last element can be shifted + // if until the tail we cannot find a shifting spot, and the tail is not 99 + // this means the tail can be shifted to 99 flag = true; uint diff = 99 - ptr->left->pri; - uint incr = diff / counter + 1; - //change the tail's priority + // the formula is explained in the else-if case + uint incr = (diff - 1) / (counter + 1) + 1; + // change the tail's priority ptr->pri = 99; lf_thread_set_priority(ptr->thread_id, ptr->pri); + + // change the priorities of node from tail to the current thread + // to keep the relative priorities while shifting and system-calling ptr = ptr->left; - //change the priorities of node from tail to the current thread - while (ptr != current) { - ptr->pri+=incr; + do { + ptr->pri = ptr->right->pri - incr; lf_thread_set_priority(ptr->thread_id, ptr->pri); ptr = ptr->left; - } - //change the priority of the current thread - current->pri = current->right->pri-incr; - lf_thread_set_priority(current->thread_id, current->pri); + } while (ptr != current); + break; + } else if (ptr->right) { uint diff = ptr->right->pri - ptr->pri; if (diff > 1) { - //eureka! we found shifting spot + // eureka! we found shifting spot flag = true; - //calculate the increment interval - uint incr = diff / counter + 1; - //shift every node's priority from this spot to the spot we inserted the current thread - while (ptr != current) { - ptr->pri+=incr; + // calculate the increment interval: + // # diff - 1 is the extra space to distribute equally + // (-1 because we guarantee a space of at least 1 between ptr and ptr->right); + // # counter + 1 is the number of spaces between "counter" elements in the LL; + // # +1 replaces the ceiling (not exactly but still ok); + uint incr = (diff - 1) / (counter + 1) + 1; + // shift every node's priority from this spot to the spot + // we inserted the current thread, starting from the right + do { + ptr->pri = ptr->right->pri - incr; lf_thread_set_priority(ptr->thread_id, ptr->pri); ptr = ptr->left; - } - //assign the current thread's priority - current->pri = current->right->pri-incr; - lf_thread_set_priority(current->thread_id, current->pri); - //since we found a spot and shifted priorities, we don't need to continue with the while loop + } while (ptr != current); + + // since we found a spot and shifted priorities, we don't need to continue with the while loop break; } } ptr = ptr->right; } + if (!flag) { // if flag is still false, this mean we couldn't find any shifting spot on the right side - //we need to check left + // we need to check left (this is a copy of the above code with the needed edits... debeatable) + ptr = before; + counter = 1; + while (ptr) { + counter++; + if (!ptr->left && ptr->pri != 2) { + // if until the head we cannot find a shifting spot, and the head is not 2 + // (1 is reserved for reactions w/o deadline) this means the head can be shifted + flag = true; + uint diff = ptr->right->pri - 2; + // usual formula to spread priorities + uint incr = (diff - 1) / (counter + 1) + 1; + // change the head's priority + ptr->pri = 2; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + + // change the priorities of node from head to the current thread + ptr = ptr->right; + do { + ptr->pri = ptr->left->pri + incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + ptr = ptr->right; + } while (ptr != current); + + break; + + } else if (ptr->left) { + uint diff = ptr->pri - ptr->left->pri; + if (diff > 1) { + // eureka! we found shifting spot + flag = true; + // usual formula to spread priorities + uint incr = (diff - 1) / (counter + 1) + 1; + // shift every node's priority from this spot to the spot + // we inserted the current thread, starting from the left + do { + ptr->pri = ptr->left->pri + incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + ptr = ptr->right; + } while (ptr != current); + + // since we found a spot and shifted priorities, we don't need to continue with the while loop + break; + } + } + ptr = ptr->left; + } } return flag; +} + +/** + * Function to assign a priority value to the worker thread serving + * current_reaction_to_execute. The value is determined by EDF. + * @param env the environment within which we are executing. + * @param current_reaction_to_execute the reaction the current worker thread is about to serve. +*/ +void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute) { + LF_PRINT_LOG("Assigning priority to reaction %s", current_reaction_to_execute->name); + + // Examine the reaction's (inferred) deadline to set this thread's priority. + interval_t inferred_deadline = (interval_t) (current_reaction_to_execute->index >> 16); + // If there is no deadline, set the priority to 1. + if (inferred_deadline >= (FOREVER >> 16) << 16) { + LF_PRINT_LOG("Reaction %s has no deadline, setting its priority to 1", current_reaction_to_execute->name); + // All reactions without (inferred) deadlines result in a thread priority of 1. + lf_thread_set_priority(lf_thread_self(), 1); + // assuming the edf_element was not in the linked list anymore + // (removed on the completion of the previous served reaction) + } else { + // Get the absolute deadline to implement EDF. + instant_t absolute_deadline = (env->current_tag.time + inferred_deadline); + LF_PRINT_LOG("Reaction %s has inferred deadline %ld", current_reaction_to_execute->name, absolute_deadline); + // Need to know the priorities of running threads and the absolute deadline associated with it + // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. + edf_sched_node* current = &edf_elements[lf_thread_id()]; + current->abs_d = absolute_deadline; + + lf_critical_section_enter(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("In the CS for reaction %s", current_reaction_to_execute->name); + // if there is no head -- then the current thread is the head + if (!edf_ll_head) { + edf_ll_head = current; + current->pri = 50; + + LF_PRINT_LOG("No worker threads running, reaction %s is the head of the list and " + "gets priority %d", current_reaction_to_execute->name, current->pri); + } else { + // there is a head in the LL + + // assuming the edf_element was not in the linked list anymore + // (removed on the completion of the previous served reaction) + + edf_sched_node* ptr = edf_ll_head; + // find the spot that the current thread needs to insert itself + while (ptr) { + // the LL is from lowest priority to the highest priority + if (ptr->abs_d < absolute_deadline) { + LF_PRINT_LOG("Found a reaction having shorter deadline: %ld", ptr->abs_d); + // change the pointers to insert the current thread + edf_sched_node* temp = ptr->left; + ptr->left = current; + current->right = ptr; + current->left = temp; + // if the insertion is not at the beginning of the list + if (temp) { + LF_PRINT_LOG("Insertion not at the beginning of the list"); + temp->right = current; + if (ptr->pri - temp->pri >= 2) { + // distancing the priority by 3 from the lowest if there is enough space + // (because it's likely that as time passes, newly coming deadlines will be bigger) + uint incr = (ptr->pri - temp->pri >= 4) ? 3 : (ptr->pri - temp->pri - 1); + current->pri = current->left->pri + incr; + LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); + } else { + // shift elements to find a proper priority value + LF_PRINT_LOG("Shifting"); + shift_edf_priorities(current); + } + } else { // if the insertion is at the beginning of the list + LF_PRINT_LOG("Insertion at the beginning of the list"); + edf_ll_head = current; + current->pri = (ptr->pri - 5 >= 2) ? (ptr->pri - 5) : 2; + } + + break; + } else if (ptr->right == NULL) { + // this node needs to be added as the tail of the LL + LF_PRINT_LOG("No reactions having shorter deadline, adding %s as the tail of the LL", current_reaction_to_execute->name); + + // ptr is the current tail of the LL (cannot be null) + current->right = NULL; + current->left = ptr; + ptr->right = current; + + if (99 - ptr->pri >= 2) { + // distancing the priority by 3 from the lowest if there is enough space + // (because it's likely that as time passes, newly coming deadlines will be bigger) + uint incr = (99 - ptr->pri >= 4) ? 3 : (99 - ptr->pri - 1); + current->pri = current->left->pri + incr; + LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); + } else { + // shift elements to find a proper priority value + LF_PRINT_LOG("Shifting (only left)"); + shift_edf_priorities(current); + } + + break; + } + + ptr = ptr->right; + } + + } + // TODO: evaluate the possibility to set the priority outside the cs + lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); + lf_critical_section_exit(GLOBAL_ENVIRONMENT); + } +} + +/** + * The main looping logic of each LF worker thread. + * This function assumes the caller holds the mutex lock. + * + * @param env Environment within which we are executing. + * @param worker_number The number assigned to this worker thread + */ +static void _lf_worker_do_work(environment_t* env, int worker_number) { + //TODO: add if defined + // environment_t* envs; + // int num_envs = _lf_get_environments(&envs); + // static int a[1]; + // static edf_sched_node a[2][3]; + // edf_sched_node b[num_envs][2]; + assert(env != GLOBAL_ENVIRONMENT); + + // Keep track of whether we have decremented the idle thread count. + // Obtain a reaction from the scheduler that is ready to execute + // (i.e., it is not blocked by concurrently executing reactions + // that it depends on). + // lf_print_snapshot(); // This is quite verbose (but very useful in debugging reaction deadlocks). + reaction_t* current_reaction_to_execute = NULL; +#ifdef FEDERATED + lf_stall_advance_level_federation(env, 0); +#endif + while ((current_reaction_to_execute = lf_sched_get_ready_reaction(env->scheduler, worker_number)) != NULL) { + // Got a reaction that is ready to run. + LF_PRINT_DEBUG("Worker %d: Got from scheduler reaction %s: " + "level: %lld, is input reaction: %d, chain ID: %llu, and deadline " PRINTF_TIME ".", + worker_number, current_reaction_to_execute->name, LF_LEVEL(current_reaction_to_execute->index), + current_reaction_to_execute->is_an_input_reaction, current_reaction_to_execute->chain_id, + current_reaction_to_execute->deadline); + + if (LF_THREAD_POLICY > LF_SCHED_FAIR) { + assign_edf_priority(env, current_reaction_to_execute); + } + + bool violation = _lf_worker_handle_violations(env, worker_number, current_reaction_to_execute); + if (!violation) { + // Invoke the reaction function. + _lf_worker_invoke_reaction(env, worker_number, current_reaction_to_execute); + } + + lf_critical_section_enter(GLOBAL_ENVIRONMENT); + lf_thread_t my_id = lf_thread_self(); + remove_from_edf_ll(my_id); + lf_critical_section_exit(GLOBAL_ENVIRONMENT); + + LF_PRINT_DEBUG("Worker %d: Done with reaction %s.", worker_number, current_reaction_to_execute->name); + + lf_sched_done_with_reaction(worker_number, current_reaction_to_execute); + } } /** @@ -1149,7 +1292,10 @@ void start_threads(environment_t* env) { .policy = LF_THREAD_POLICY }; LF_PRINT_LOG("Setting thread policy to %d", LF_THREAD_POLICY); - lf_thread_set_scheduling_policy(env->thread_ids[i], &policy); + int ret = lf_thread_set_scheduling_policy(env->thread_ids[i], &policy); + if (ret != 0) { + lf_print_error_and_exit("Couldn't set the scheduling policy. Try running the program with sudo rights."); + } int number_of_cores = LF_NUMBER_OF_CORES; LF_PRINT_LOG("Using %d cores", number_of_cores); @@ -1157,7 +1303,10 @@ void start_threads(environment_t* env) { // Pin the thread to cores starting at the highest numbered core. static int core_number = -1; if (core_number < 0) core_number = lf_available_cores() - 1; - lf_thread_set_cpu(env->thread_ids[i], core_number); + ret = lf_thread_set_cpu(env->thread_ids[i], core_number); + if (ret != 0) { + lf_print_error_and_exit("Couldn't bind thread-%u to core %d.", i, core_number); + } LF_PRINT_LOG("Using core_number %d", core_number); core_number--; if (core_number < lf_available_cores() - LF_NUMBER_OF_CORES) core_number = lf_available_cores() - 1; diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index d590adecb..2771914d0 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -167,6 +167,9 @@ void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { LF_PRINT_DEBUG("Scheduler: Worker %zu is trying to acquire the scheduling " "semaphore.", worker_number); + // setting the priority to the maximum to be sure to be + // woken up when new reactions are available + lf_thread_set_priority(lf_thread_self(), 99); lf_semaphore_acquire(scheduler->semaphore); LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number); } diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 7ddbe86e2..7c75e1be4 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -16,14 +16,11 @@ typedef struct edf_sched_node { instant_t abs_d; uint pri; - edf_sched_node* left; - edf_sched_node* right; + struct edf_sched_node* left; + struct edf_sched_node* right; lf_thread_t thread_id; } edf_sched_node; -edf_sched_node* edf_elements = NULL; -edf_sched_node* edf_LL_head = NULL; -// edf_sched_node* edf_LL_tail = NULL; /** * @brief Advance to the next level. * For federated runtimes, this function should From e6af03356ae48832ecc6fe218915d5f16719964f Mon Sep 17 00:00:00 2001 From: fra-p Date: Fri, 24 May 2024 17:09:36 -0700 Subject: [PATCH 07/14] Fixes to the EDF scheduler 1. Implemented abort when the number of threads exceeds the number of priority values 2. Set the priority to 99 when sleeping with wait_until function --- core/threaded/reactor_threaded.c | 82 +++++++++++++++++++------------ core/threaded/scheduler_GEDF_NP.c | 2 +- 2 files changed, 52 insertions(+), 32 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 8ddf3f56c..704066b53 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -245,6 +245,12 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) { return true; } + // setting the priority to the maximum to be sure to be + // woken up as soon as the sleep time terminates (because there + // might be other worker threads from different enclaves having + // higher priority than what the current thread has) + lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY); + // We do the sleep on the cond var so we can be awakened by the // asynchronous scheduling of a physical action. lf_clock_cond_timedwait // returns 0 if it is awakened before the timeout. Hence, we want to run @@ -903,7 +909,7 @@ void remove_from_edf_ll(lf_thread_t my_id) { * with the EDF rule. * First, the algorithm tries to shift the nodes to the right of the position * where current has been inserted. If this is not possible (the tail - * of the LL has priority 99 and there is no space to shift the previous + * of the LL has priority 98 and there is no space to shift the previous * elements), the function tries to shift the nodes to the left. If * this again is not possible (the head of the LL has priority 2 and * there is no space to shift the following elements), the function @@ -920,20 +926,20 @@ bool shift_edf_priorities(edf_sched_node* current) { edf_sched_node* ptr = after; // count the number of times the while loop executes, so you can distribute // the priority space you find across the threads in the linked list - uint counter = 1; + int counter = 1; // set to true if we can find a shifting spot - bool flag = false; + bool shifted = false; while (ptr) { counter++; - if (!ptr->right && ptr->pri != 99) { - // if until the tail we cannot find a shifting spot, and the tail is not 99 - // this means the tail can be shifted to 99 - flag = true; - uint diff = 99 - ptr->left->pri; + if (!ptr->right && ptr->pri != LF_SCHED_MAX_PRIORITY - 1) { + // if until the tail we cannot find a shifting spot, and the tail is not 98 + // this means the tail can be shifted to 98 + shifted = true; + int diff = LF_SCHED_MAX_PRIORITY - 1 - ptr->left->pri; // the formula is explained in the else-if case - uint incr = (diff - 1) / (counter + 1) + 1; + int incr = (diff - 1) / (counter + 1) + 1; // change the tail's priority - ptr->pri = 99; + ptr->pri = LF_SCHED_MAX_PRIORITY - 1; lf_thread_set_priority(ptr->thread_id, ptr->pri); // change the priorities of node from tail to the current thread @@ -948,16 +954,16 @@ bool shift_edf_priorities(edf_sched_node* current) { break; } else if (ptr->right) { - uint diff = ptr->right->pri - ptr->pri; + int diff = ptr->right->pri - ptr->pri; if (diff > 1) { // eureka! we found shifting spot - flag = true; + shifted = true; // calculate the increment interval: // # diff - 1 is the extra space to distribute equally // (-1 because we guarantee a space of at least 1 between ptr and ptr->right); // # counter + 1 is the number of spaces between "counter" elements in the LL; // # +1 replaces the ceiling (not exactly but still ok); - uint incr = (diff - 1) / (counter + 1) + 1; + int incr = (diff - 1) / (counter + 1) + 1; // shift every node's priority from this spot to the spot // we inserted the current thread, starting from the right do { @@ -973,7 +979,7 @@ bool shift_edf_priorities(edf_sched_node* current) { ptr = ptr->right; } - if (!flag) { + if (!shifted) { // if flag is still false, this mean we couldn't find any shifting spot on the right side // we need to check left (this is a copy of the above code with the needed edits... debeatable) ptr = before; @@ -983,10 +989,10 @@ bool shift_edf_priorities(edf_sched_node* current) { if (!ptr->left && ptr->pri != 2) { // if until the head we cannot find a shifting spot, and the head is not 2 // (1 is reserved for reactions w/o deadline) this means the head can be shifted - flag = true; - uint diff = ptr->right->pri - 2; + shifted = true; + int diff = ptr->right->pri - 2; // usual formula to spread priorities - uint incr = (diff - 1) / (counter + 1) + 1; + int incr = (diff - 1) / (counter + 1) + 1; // change the head's priority ptr->pri = 2; lf_thread_set_priority(ptr->thread_id, ptr->pri); @@ -1002,12 +1008,12 @@ bool shift_edf_priorities(edf_sched_node* current) { break; } else if (ptr->left) { - uint diff = ptr->pri - ptr->left->pri; + int diff = ptr->pri - ptr->left->pri; if (diff > 1) { // eureka! we found shifting spot - flag = true; + shifted = true; // usual formula to spread priorities - uint incr = (diff - 1) / (counter + 1) + 1; + int incr = (diff - 1) / (counter + 1) + 1; // shift every node's priority from this spot to the spot // we inserted the current thread, starting from the left do { @@ -1023,12 +1029,17 @@ bool shift_edf_priorities(edf_sched_node* current) { ptr = ptr->left; } } - return flag; + return shifted; } /** - * Function to assign a priority value to the worker thread serving - * current_reaction_to_execute. The value is determined by EDF. + * Function to assign a priority value between 1 and 98 to the worker thread + * serving current_reaction_to_execute. The value is determined by EDF. + * Priority 1 is assigned to worker threads executing reactions without deadline, + * while priority 99 is reserved for worker threads waiting on the semaphore + * (for the scheduler to advance tag) or sleeping with wait_until. The reason + * for the highest priority is to be sure that the awakening of these threads is not + * delayed by the other worker threads executing reactions (even in different enclaves). * @param env the environment within which we are executing. * @param current_reaction_to_execute the reaction the current worker thread is about to serve. */ @@ -1058,6 +1069,7 @@ void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_exe // if there is no head -- then the current thread is the head if (!edf_ll_head) { edf_ll_head = current; + // FIXME: make sure this is an appropriate value current->pri = 50; LF_PRINT_LOG("No worker threads running, reaction %s is the head of the list and " @@ -1066,7 +1078,7 @@ void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_exe // there is a head in the LL // assuming the edf_element was not in the linked list anymore - // (removed on the completion of the previous served reaction) + // (removed at the completion of the previously executed reaction) edf_sched_node* ptr = edf_ll_head; // find the spot that the current thread needs to insert itself @@ -1083,26 +1095,31 @@ void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_exe if (temp) { LF_PRINT_LOG("Insertion not at the beginning of the list"); temp->right = current; + // if there is enough space to assign a priority value between ptr and temp if (ptr->pri - temp->pri >= 2) { // distancing the priority by 3 from the lowest if there is enough space // (because it's likely that as time passes, newly coming deadlines will be bigger) - uint incr = (ptr->pri - temp->pri >= 4) ? 3 : (ptr->pri - temp->pri - 1); + int incr = (ptr->pri - temp->pri >= 4) ? 3 : (ptr->pri - temp->pri - 1); current->pri = current->left->pri + incr; LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); } else { // shift elements to find a proper priority value LF_PRINT_LOG("Shifting"); - shift_edf_priorities(current); + if (!shift_edf_priorities(current)) { + lf_print_error_and_exit("More threads than priority values. Aborting."); + } } } else { // if the insertion is at the beginning of the list LF_PRINT_LOG("Insertion at the beginning of the list"); edf_ll_head = current; + // distancing the priority by 5 from ptr (if there is enough space) current->pri = (ptr->pri - 5 >= 2) ? (ptr->pri - 5) : 2; } break; } else if (ptr->right == NULL) { - // this node needs to be added as the tail of the LL + // this reaction has the earliest deadline in the list => + // it needs to be added as the tail of the LL LF_PRINT_LOG("No reactions having shorter deadline, adding %s as the tail of the LL", current_reaction_to_execute->name); // ptr is the current tail of the LL (cannot be null) @@ -1110,16 +1127,19 @@ void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_exe current->left = ptr; ptr->right = current; - if (99 - ptr->pri >= 2) { + if (LF_SCHED_MAX_PRIORITY - ptr->pri >= 2) { // distancing the priority by 3 from the lowest if there is enough space // (because it's likely that as time passes, newly coming deadlines will be bigger) - uint incr = (99 - ptr->pri >= 4) ? 3 : (99 - ptr->pri - 1); + // the maximum priority is LF_SCHED_MAX_PRIORITY - 1 (on Linux it's 98) + int incr = (LF_SCHED_MAX_PRIORITY - 1 - ptr->pri >= 4) ? 3 : (LF_SCHED_MAX_PRIORITY - 1 - ptr->pri - 1); current->pri = current->left->pri + incr; LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); } else { // shift elements to find a proper priority value LF_PRINT_LOG("Shifting (only left)"); - shift_edf_priorities(current); + if (!shift_edf_priorities(current)) { + lf_print_error_and_exit("More threads than priority values. Aborting."); + } } break; @@ -1129,7 +1149,7 @@ void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_exe } } - // TODO: evaluate the possibility to set the priority outside the cs + lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); lf_critical_section_exit(GLOBAL_ENVIRONMENT); } diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 2771914d0..84ac60f6c 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -169,7 +169,7 @@ void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { worker_number); // setting the priority to the maximum to be sure to be // woken up when new reactions are available - lf_thread_set_priority(lf_thread_self(), 99); + lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY); lf_semaphore_acquire(scheduler->semaphore); LF_PRINT_DEBUG("Scheduler: Worker %zu acquired the scheduling semaphore.", worker_number); } From 77e03afc5ac88d2a4d03f2df4d5debb8c196c030 Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Thu, 30 May 2024 17:45:21 +0200 Subject: [PATCH 08/14] Formatting --- core/threaded/reactor_threaded.c | 4 +- core/threaded/scheduler_GEDF_NP.c | 91 +++++++++++---------- include/core/threaded/reactor_threaded.h | 4 +- include/core/threaded/scheduler.h | 2 +- low_level_platform/api/low_level_platform.h | 4 +- 5 files changed, 53 insertions(+), 52 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index dc6434325..af532c999 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -1110,8 +1110,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { lf_print("Environment %u: ---- Intializing start tag", env->id); _lf_initialize_start_tag(env); - lf_print("Environment %u: ---- Spawning %d workers on %d cores.", - env->id, env->num_workers, LF_NUMBER_OF_CORES); + lf_print("Environment %u: ---- Spawning %d workers on %d cores.", env->id, env->num_workers, LF_NUMBER_OF_CORES); for (int j = 0; j < env->num_workers; j++) { if (i == 0 && j == 0) { @@ -1132,7 +1131,6 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // Unlock mutex and allow threads proceed LF_MUTEX_UNLOCK(&env->mutex); } - // main thread worker (first worker thread of first environment) void* main_thread_exit_status = NULL; diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index d6f89fab3..8dfd0ca8d 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -51,15 +51,15 @@ typedef struct custom_scheduler_data_t { } custom_scheduler_data_t; typedef struct edf_sched_node_t { - instant_t abs_d; - uint32_t pri; - struct edf_sched_node_t* left; - struct edf_sched_node_t* right; - lf_thread_t thread_id; + instant_t abs_d; + uint32_t pri; + struct edf_sched_node_t* left; + struct edf_sched_node_t* right; + lf_thread_t thread_id; } edf_sched_node_t; // Linked list of worker threads sorted by priority. -static edf_sched_node_t* edf_elements = NULL; +static edf_sched_node_t* edf_elements = NULL; static edf_sched_node_t* edf_ll_head = NULL; /////////////////// Scheduler Private API ///////////////////////// @@ -112,7 +112,7 @@ static int advance_tag(lf_scheduler_t* scheduler) { /** * @brief Shift the priority of elements in the linked list (LL) to assign * to current (freshly inserted in the LL) a priority value complying with the EDF rule. - * + * * First, the algorithm tries to shift the nodes to the right of the position * where current has been inserted. If this is not possible (the tail * of the LL has priority 98 and there is no space to shift the previous @@ -120,16 +120,16 @@ static int advance_tag(lf_scheduler_t* scheduler) { * this again is not possible (the head of the LL has priority 2 and * there is no space to shift the following elements), the function * returns false (true otherwise). - * + * * @param current: the node to which to assign a priority value after shifting - * + * * @return true if the shift (either right or left) was possible and a priority * value was assigned to current; false otherwise. -*/ + */ static bool shift_edf_priorities(edf_sched_node_t* current) { edf_sched_node_t* before = current->left; edf_sched_node_t* after = current->right; - edf_sched_node_t* ptr = after; + edf_sched_node_t* ptr = after; // count the number of times the while loop executes, so you can distribute // the priority space you find across the threads in the linked list int counter = 1; @@ -147,7 +147,7 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { // change the tail's priority ptr->pri = LF_SCHED_MAX_PRIORITY - 1; lf_thread_set_priority(ptr->thread_id, ptr->pri); - + // change the priorities of node from tail to the current thread // to keep the relative priorities while shifting and system-calling ptr = ptr->left; @@ -156,7 +156,7 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { lf_thread_set_priority(ptr->thread_id, ptr->pri); ptr = ptr->left; } while (ptr != current); - + break; } else if (ptr->right) { @@ -177,14 +177,14 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { lf_thread_set_priority(ptr->thread_id, ptr->pri); ptr = ptr->left; } while (ptr != current); - + // since we found a spot and shifted priorities, we don't need to continue with the while loop break; } - } + } ptr = ptr->right; } - + if (!shifted) { // if flag is still false, this mean we couldn't find any shifting spot on the right side // we need to check left (this is a copy of the above code with the needed edits... debeatable) @@ -202,7 +202,7 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { // change the head's priority ptr->pri = 2; lf_thread_set_priority(ptr->thread_id, ptr->pri); - + // change the priorities of node from head to the current thread ptr = ptr->right; do { @@ -210,9 +210,9 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { lf_thread_set_priority(ptr->thread_id, ptr->pri); ptr = ptr->right; } while (ptr != current); - + break; - + } else if (ptr->left) { int diff = ptr->pri - ptr->left->pri; if (diff > 1) { @@ -227,38 +227,38 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { lf_thread_set_priority(ptr->thread_id, ptr->pri); ptr = ptr->right; } while (ptr != current); - + // since we found a spot and shifted priorities, we don't need to continue with the while loop break; } - } + } ptr = ptr->left; } } - return shifted; + return shifted; } /** * @brief Assign a priority value between 1 and 98 to the current worker thread. - * + * * The priority is determined by the current_reaction_to_execute using EDF. * Priority 1 is assigned to worker threads executing reactions without a deadline, * while priority 99 is reserved for worker threads waiting for work. The reason * for the highest priority is to be sure that the awakening of these threads is not * delayed by the other worker threads executing reactions (even in different enclaves). - * + * * This function enters a critical section so the mutex lock should not be held on the * environment when this is called. - * + * * @param env The environment within which we are executing. * @param current_reaction_to_execute The reaction the current worker thread is about to serve. * @param worker_number The worker number of the current worker thread. -*/ + */ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute, int worker_number) { LF_PRINT_LOG("Assigning priority to reaction %s", current_reaction_to_execute->name); // Examine the reaction's (inferred) deadline to set this thread's priority. - interval_t inferred_deadline = (interval_t) (current_reaction_to_execute->index >> 16); + interval_t inferred_deadline = (interval_t)(current_reaction_to_execute->index >> 16); // If there is no deadline, set the priority to 1. if (inferred_deadline >= (FOREVER >> 16) << 16) { LF_PRINT_LOG("Reaction %s has no deadline, setting its priority to 1", current_reaction_to_execute->name); @@ -269,7 +269,8 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction } else { // Get the absolute deadline to implement EDF. instant_t absolute_deadline = (env->current_tag.time + inferred_deadline); - LF_PRINT_LOG("Reaction %s has inferred deadline " PRINTF_TIME, current_reaction_to_execute->name, absolute_deadline); + LF_PRINT_LOG("Reaction %s has inferred deadline " PRINTF_TIME, current_reaction_to_execute->name, + absolute_deadline); // Need to know the priorities of running threads and the absolute deadline associated with it // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. edf_sched_node_t* current = &edf_elements[worker_number]; @@ -284,10 +285,11 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction current->pri = 50; LF_PRINT_LOG("No worker threads running, reaction %s is the head of the list and " - "gets priority %d", current_reaction_to_execute->name, current->pri); - } else { + "gets priority %d", + current_reaction_to_execute->name, current->pri); + } else { // there is a head in the LL - + // assuming the edf_element was not in the linked list anymore // (removed at the completion of the previously executed reaction) @@ -331,8 +333,9 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction } else if (ptr->right == NULL) { // this reaction has the earliest deadline in the list => // it needs to be added as the tail of the LL - LF_PRINT_LOG("No reactions having shorter deadline, adding %s as the tail of the LL", current_reaction_to_execute->name); - + LF_PRINT_LOG("No reactions having shorter deadline, adding %s as the tail of the LL", + current_reaction_to_execute->name); + // ptr is the current tail of the LL (cannot be null) current->right = NULL; current->left = ptr; @@ -365,9 +368,9 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction /** * @brief Remove the edf_sched_node_t element indexed by my_id from the LL and reset deadline and priority. * @param my_id LF thread ID of the element to remove from the LL -*/ + */ static void remove_from_edf_ll(lf_thread_t my_id) { - edf_sched_node_t* ptr = edf_ll_head; + edf_sched_node_t* ptr = edf_ll_head; // if the thread is already on the LL -- remove the old one while (ptr) { if (ptr->thread_id == my_id) { @@ -378,7 +381,7 @@ static void remove_from_edf_ll(lf_thread_t my_id) { ptr->right->left = ptr->left; } - // my node was the head of the LL + // my node was the head of the LL if (ptr == edf_ll_head) { edf_ll_head = ptr->right; } @@ -433,7 +436,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* environment_t* top_level_env; int num_envs = _lf_get_environments(&top_level_env); if (top_level_env == env) { - edf_elements = (edf_sched_node_t *) calloc(num_envs * top_level_env->num_workers, sizeof(edf_sched_node_t)); + edf_elements = (edf_sched_node_t*)calloc(num_envs * top_level_env->num_workers, sizeof(edf_sched_node_t)); } LF_PRINT_DEBUG("Scheduler: Initializing with %zu workers", number_of_workers); if (!init_sched_instance(env, &env->scheduler, number_of_workers, params)) { @@ -483,10 +486,8 @@ void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_number) { edf_elements[worker_number].right = NULL; // FIXME: Use the target property to set the policy. - lf_scheduling_policy_t policy = { - .priority = 80, // FIXME: determine good priority - .policy = LF_THREAD_POLICY - }; + lf_scheduling_policy_t policy = {.priority = 80, // FIXME: determine good priority + .policy = LF_THREAD_POLICY}; LF_PRINT_LOG("Setting thread policy to %d", LF_THREAD_POLICY); int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); if (ret != 0) { @@ -496,14 +497,16 @@ void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_number) { #if LF_NUMBER_OF_CORES > 0 // Pin the thread to cores starting at the highest numbered core. static int core_number = -1; - if (core_number < 0) core_number = lf_available_cores() - 1; + if (core_number < 0) + core_number = lf_available_cores() - 1; ret = lf_thread_set_cpu(env->thread_ids[worker_number], core_number); if (ret != 0) { lf_print_error_and_exit("Couldn't bind thread-%u to core %d.", i, core_number); } LF_PRINT_LOG("Using core_number %d", core_number); core_number--; - if (core_number < lf_available_cores() - LF_NUMBER_OF_CORES) core_number = lf_available_cores() - 1; + if (core_number < lf_available_cores() - LF_NUMBER_OF_CORES) + core_number = lf_available_cores() - 1; #endif // LF_NUMBER_OF_CORES > 0 } @@ -540,7 +543,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu } LF_MUTEX_UNLOCK(&scheduler->env->mutex); if (LF_THREAD_POLICY > LF_SCHED_FAIR) { - assign_edf_priority(scheduler->env, reaction_to_return, worker_number); + assign_edf_priority(scheduler->env, reaction_to_return, worker_number); } return reaction_to_return; } else { diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 433ac6127..7706a323b 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -80,7 +80,7 @@ void _lf_decrement_tag_barrier_locked(environment_t* env); /** * @brief The number of cores to use. - * + * * If the target parameter number_of_cores is set, it will override this default. */ #ifndef LF_NUMBER_OF_CORES @@ -89,7 +89,7 @@ void _lf_decrement_tag_barrier_locked(environment_t* env); /** * @brief The thread scheduling policy to use. - * + * * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY. * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER. * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h index 86f81e553..69c0b59b6 100644 --- a/include/core/threaded/scheduler.h +++ b/include/core/threaded/scheduler.h @@ -86,7 +86,7 @@ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reacti /** * @brief Set priorities and core bindings for the specified worker thread, if appropriate. - * + * * @param scheduler The scheduler. * @param thread_index The worker thread index, from 0 to number_of_workers - 1. */ diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index 14a94433f..30756ef7e 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -121,7 +121,7 @@ lf_thread_t lf_thread_self(); /** * @brief Create a new thread and start execution of the function lf_thread * with the specified arguments. - * + * * The new handle is stored in thread_id. * * @return 0 on success, platform-specific error number otherwise. @@ -177,7 +177,7 @@ int lf_thread_set_priority(lf_thread_t thread, int priority); /** * @brief Set the scheduling policy of a thread. - * + * * This is based on the scheduling * concept from Linux explained here: https://man7.org/linux/man-pages/man7/sched.7.html * A scheduling policy is specific to a thread/worker. We have three policies From ea41b753560afcfa502c758b7dc2a79591082a0e Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Thu, 30 May 2024 18:03:15 +0200 Subject: [PATCH 09/14] Suppress unused variable warnings --- core/threaded/scheduler_NP.c | 5 ++++- include/core/threaded/scheduler.h | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 146a2ece8..d246e38db 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -301,7 +301,10 @@ void lf_sched_free(lf_scheduler_t* scheduler) { ///////////////////// Scheduler Worker API (public) ///////////////////////// -void lf_sched_configure_worker(lf_scheduler_t* scheduler, int thread_index) {} +void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_index) { + (void)scheduler; // Suppress compiler warning about unused parameter. + (void)worker_index; // Suppress compiler warning about unused parameter. +} reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { // Iterate until the stop tag is reached or reaction vectors are empty diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h index 69c0b59b6..108658d0c 100644 --- a/include/core/threaded/scheduler.h +++ b/include/core/threaded/scheduler.h @@ -88,8 +88,8 @@ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reacti * @brief Set priorities and core bindings for the specified worker thread, if appropriate. * * @param scheduler The scheduler. - * @param thread_index The worker thread index, from 0 to number_of_workers - 1. + * @param worker_index The worker thread index, from 0 to number_of_workers - 1. */ -void lf_sched_configure_worker(lf_scheduler_t* scheduler, int thread_index); +void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_index); #endif // LF_SCHEDULER_H From ab9d7082bbe4f3ce30e088866cef3b7c51f6b229 Mon Sep 17 00:00:00 2001 From: fra-p Date: Wed, 5 Jun 2024 16:05:35 -0700 Subject: [PATCH 10/14] Fixed GEDF scheduler after refactoring 1. Fixed bug with thread ids not corresponding to the indices of the EDF data structure 2. Fixed bug where initialize_lf_thread_id was called twice in the main thread --- core/threaded/reactor_threaded.c | 5 +- core/threaded/scheduler_GEDF_NP.c | 73 +++++++++++++++--------------- core/threaded/scheduler_NP.c | 5 +- core/threaded/scheduler_adaptive.c | 2 +- include/core/threaded/scheduler.h | 7 +-- 5 files changed, 42 insertions(+), 50 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index af532c999..85a4c95d0 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -909,6 +909,8 @@ static void _lf_worker_do_work(environment_t* env, int worker_number) { */ void* worker(void* arg) { initialize_lf_thread_id(); + lf_sched_configure_worker(); + environment_t* env = (environment_t*)arg; LF_MUTEX_LOCK(&env->mutex); @@ -1019,7 +1021,6 @@ void determine_number_of_workers(void) { * at compile time. */ int lf_reactor_c_main(int argc, const char* argv[]) { - initialize_lf_thread_id(); // Invoke the function that optionally provides default command-line options. lf_set_default_command_line_options(); @@ -1119,13 +1120,11 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // This is important for bare-metal platforms, who can't // afford to have the main thread sit idle. env->thread_ids[0] = lf_thread_self(); - lf_sched_configure_worker(env->scheduler, 0); continue; } if (lf_thread_create(&env->thread_ids[j], worker, env) != 0) { lf_print_error_and_exit("Could not start thread-%u", j); } - lf_sched_configure_worker(env->scheduler, j); } // Unlock mutex and allow threads proceed diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 8dfd0ca8d..8ff6a08d8 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -252,10 +252,9 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { * * @param env The environment within which we are executing. * @param current_reaction_to_execute The reaction the current worker thread is about to serve. - * @param worker_number The worker number of the current worker thread. */ -static void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute, int worker_number) { - LF_PRINT_LOG("Assigning priority to reaction %s", current_reaction_to_execute->name); +static void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute) { + LF_PRINT_LOG("Assigning priority to reaction %s (thread %d, %ld)", current_reaction_to_execute->name, lf_thread_id(), lf_thread_self()); // Examine the reaction's (inferred) deadline to set this thread's priority. interval_t inferred_deadline = (interval_t)(current_reaction_to_execute->index >> 16); @@ -273,7 +272,7 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction absolute_deadline); // Need to know the priorities of running threads and the absolute deadline associated with it // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. - edf_sched_node_t* current = &edf_elements[worker_number]; + edf_sched_node_t* current = &edf_elements[lf_thread_id()]; current->abs_d = absolute_deadline; lf_critical_section_enter(GLOBAL_ENVIRONMENT); @@ -360,7 +359,7 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction ptr = ptr->right; } } - lf_thread_set_priority(lf_thread_self(), edf_elements[worker_number].pri); + lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); lf_critical_section_exit(GLOBAL_ENVIRONMENT); } } @@ -431,6 +430,12 @@ static void advance_level(lf_scheduler_t* scheduler) { void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* params) { assert(env != GLOBAL_ENVIRONMENT); + LF_PRINT_DEBUG("Scheduler: Initializing with %zu workers", number_of_workers); + if (!init_sched_instance(env, &env->scheduler, number_of_workers, params)) { + // Already initialized + return; + } + // Environment 0 (top level) is responsible for allocating the array that stores the // information about worker thread priorities and deadlines. environment_t* top_level_env; @@ -438,11 +443,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* if (top_level_env == env) { edf_elements = (edf_sched_node_t*)calloc(num_envs * top_level_env->num_workers, sizeof(edf_sched_node_t)); } - LF_PRINT_DEBUG("Scheduler: Initializing with %zu workers", number_of_workers); - if (!init_sched_instance(env, &env->scheduler, number_of_workers, params)) { - // Already initialized - return; - } + lf_scheduler_t* scheduler = env->scheduler; scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); @@ -477,36 +478,34 @@ void lf_sched_free(lf_scheduler_t* scheduler) { ///////////////////// Scheduler Worker API (public) ///////////////////////// -void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_number) { - // Set default worker thread properties. - edf_elements[worker_number].abs_d = FOREVER; - edf_elements[worker_number].pri = 1; - edf_elements[worker_number].thread_id = lf_thread_self(); - edf_elements[worker_number].left = NULL; - edf_elements[worker_number].right = NULL; - - // FIXME: Use the target property to set the policy. - lf_scheduling_policy_t policy = {.priority = 80, // FIXME: determine good priority - .policy = LF_THREAD_POLICY}; - LF_PRINT_LOG("Setting thread policy to %d", LF_THREAD_POLICY); - int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); - if (ret != 0) { - lf_print_warning("Couldn't set the scheduling policy. Try running the program with sudo rights."); - } +void lf_sched_configure_worker() { +// Set default worker thread properties. +edf_elements[lf_thread_id()].abs_d = FOREVER; +edf_elements[lf_thread_id()].pri = 1; +edf_elements[lf_thread_id()].thread_id = lf_thread_self(); +edf_elements[lf_thread_id()].left = NULL; +edf_elements[lf_thread_id()].right = NULL; + +// Use the target property to set the policy. +lf_scheduling_policy_t policy = {.priority = 80, // FIXME: determine good priority + .policy = LF_THREAD_POLICY}; +LF_PRINT_LOG("Setting thread policy to %d to thread %d", LF_THREAD_POLICY, lf_thread_id()); +int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); +if (ret != 0) { + lf_print_warning("Couldn't set the scheduling policy. Try running the program with sudo rights."); +} #if LF_NUMBER_OF_CORES > 0 - // Pin the thread to cores starting at the highest numbered core. - static int core_number = -1; - if (core_number < 0) - core_number = lf_available_cores() - 1; - ret = lf_thread_set_cpu(env->thread_ids[worker_number], core_number); + // Pin the thread to cores starting from the highest numbered core using + // the assigned thread id: small thread id => high core number. Still, + // respecting the constraint on the specified number of cores the program can use. + int core_number = lf_available_cores() - 1 - (lf_thread_id() % LF_NUMBER_OF_CORES); + + ret = lf_thread_set_cpu(lf_thread_self(), core_number); if (ret != 0) { - lf_print_error_and_exit("Couldn't bind thread-%u to core %d.", i, core_number); + lf_print_error_and_exit("Couldn't bind thread-%u to core %d.", lf_thread_id(), core_number); } - LF_PRINT_LOG("Using core_number %d", core_number); - core_number--; - if (core_number < lf_available_cores() - LF_NUMBER_OF_CORES) - core_number = lf_available_cores() - 1; + LF_PRINT_LOG("Thread %d using core_number %d", lf_thread_id(), core_number); #endif // LF_NUMBER_OF_CORES > 0 } @@ -543,7 +542,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu } LF_MUTEX_UNLOCK(&scheduler->env->mutex); if (LF_THREAD_POLICY > LF_SCHED_FAIR) { - assign_edf_priority(scheduler->env, reaction_to_return, worker_number); + assign_edf_priority(scheduler->env, reaction_to_return); } return reaction_to_return; } else { diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index d246e38db..3388a1ab0 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -301,10 +301,7 @@ void lf_sched_free(lf_scheduler_t* scheduler) { ///////////////////// Scheduler Worker API (public) ///////////////////////// -void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_index) { - (void)scheduler; // Suppress compiler warning about unused parameter. - (void)worker_index; // Suppress compiler warning about unused parameter. -} +void lf_sched_configure_worker() {} reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { // Iterate until the stop tag is reached or reaction vectors are empty diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index 806359267..7653c0eec 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -693,7 +693,7 @@ void lf_sched_free(lf_scheduler_t* scheduler) { ///////////////////////// Scheduler Worker API /////////////////////////////// -void lf_sched_configure_worker(lf_scheduler_t* scheduler, int thread_index) {} +void lf_sched_configure_worker() {} reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { assert(worker_number >= 0); diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h index 108658d0c..ff5663aed 100644 --- a/include/core/threaded/scheduler.h +++ b/include/core/threaded/scheduler.h @@ -85,11 +85,8 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number); /** - * @brief Set priorities and core bindings for the specified worker thread, if appropriate. - * - * @param scheduler The scheduler. - * @param worker_index The worker thread index, from 0 to number_of_workers - 1. + * @brief Initialize priority and set core binding for the calling worker thread, if appropriate. */ -void lf_sched_configure_worker(lf_scheduler_t* scheduler, int worker_index); +void lf_sched_configure_worker(); #endif // LF_SCHEDULER_H From bdf5d36a6a14a5f8a6317f2566ec232d35096dad Mon Sep 17 00:00:00 2001 From: fra-p Date: Wed, 12 Jun 2024 10:11:01 -0700 Subject: [PATCH 11/14] Fixes to the GEDF scheduler 1. Set the mutex protocol to INHERIT to avoid unbounded blocking time of the worker threads when operating on the EDF data structure 2. Lowered the maximum priority of worker threads to 98 to reserve 99 for watchdogs 3. Fixes to the shifting algorithm 4. More LOG prints for debugging --- core/threaded/reactor_threaded.c | 3 +- core/threaded/scheduler_GEDF_NP.c | 121 ++++++++++++++---- low_level_platform/api/low_level_platform.h | 5 + .../impl/src/lf_POSIX_threads_support.c | 7 + .../impl/src/lf_linux_support.c | 10 ++ 5 files changed, 121 insertions(+), 25 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 85a4c95d0..06aef08cd 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -246,7 +246,8 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) { // woken up as soon as the sleep time terminates (because there // might be other worker threads from different enclaves having // higher priority than what the current thread has) - lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY); + // FIXME: use the same constant defined for the GEDF scheduler + lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY - 1); // We do the sleep on the cond var so we can be awakened by the // asynchronous scheduling of a physical action. lf_clock_cond_timedwait diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index 8ff6a08d8..4ffe6e98a 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -25,6 +25,26 @@ #define NUMBER_OF_WORKERS 1 #endif // NUMBER_OF_WORKERS +#ifndef EDF_INITIAL_PRIO +#define EDF_INITIAL_PRIO 50 +#endif // EDF_INITIAL_PRIO + +#ifndef EDF_PRIO_SPACING +#define EDF_PRIO_SPACING 3 +#endif // EDF_PRIO_SPACING + +#ifndef EDF_MIN_PRIO +#define EDF_MIN_PRIO (LF_SCHED_MIN_PRIORITY + 2) +#endif // EDF_MIN_PRIO + +#ifndef EDF_MAX_PRIO +#define EDF_MAX_PRIO (LF_SCHED_MAX_PRIORITY - 2) +#endif // EDF_MAX_PRIO + +#ifndef EDF_SLEEP_PRIO +#define EDF_SLEEP_PRIO (LF_SCHED_MAX_PRIORITY - 1) +#endif // EDF_SLEEP_PRIO + #include #include // For uint32_t @@ -74,7 +94,7 @@ inline static void wait_for_reaction_queue_updates(lf_scheduler_t* scheduler, in // Set the priority to the maximum to be sure to be woken up when new reactions are available. // FIXME: Should only do this if the number of threads is greater than the number of cores. - lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY); + lf_thread_set_priority(lf_thread_self(), EDF_SLEEP_PRIO); tracepoint_worker_wait_starts(scheduler->env, worker_number); LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); @@ -119,7 +139,10 @@ static int advance_tag(lf_scheduler_t* scheduler) { * elements), the function tries to shift the nodes to the left. If * this again is not possible (the head of the LL has priority 2 and * there is no space to shift the following elements), the function - * returns false (true otherwise). + * returns false (true otherwise). + * + * The calling thread must own the lock of the + * global mutex before calling this function. * * @param current: the node to which to assign a priority value after shifting * @@ -135,34 +158,51 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { int counter = 1; // set to true if we can find a shifting spot bool shifted = false; + LF_PRINT_LOG("EDF: attempting to shift right"); while (ptr) { counter++; - if (!ptr->right && ptr->pri != LF_SCHED_MAX_PRIORITY - 1) { + if (!ptr->right && ptr->pri != EDF_MAX_PRIO) { // if until the tail we cannot find a shifting spot, and the tail is not 98 // this means the tail can be shifted to 98 + LF_PRINT_LOG("EDF: got to the tail that is gonna be shifted to MAX"); shifted = true; - int diff = LF_SCHED_MAX_PRIORITY - 1 - ptr->left->pri; + + int diff = EDF_MAX_PRIO; + // in the case I haven't incremented ptr and immediately + // got to the tail (i.e., current == ptr->left), I need to use + // before-after as priority interval to compute the diff + if (ptr->left != current) { + diff -= ptr->left->pri; + } else { + diff -= ptr->left->left->pri; + } // the formula is explained in the else-if case int incr = (diff - 1) / (counter + 1) + 1; + LF_PRINT_LOG("EDF: shifting increment is %d: diff = %d, counter = %d", incr, diff, counter); + // change the tail's priority - ptr->pri = LF_SCHED_MAX_PRIORITY - 1; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, EDF_MAX_PRIO); + ptr->pri = EDF_MAX_PRIO; + lf_thread_set_priority(ptr->thread_id, ptr->pri); // change the priorities of node from tail to the current thread // to keep the relative priorities while shifting and system-calling - ptr = ptr->left; + LF_PRINT_LOG("EDF: shifting the nodes before tail to make room for the new one"); do { + ptr = ptr->left; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->right->pri - incr); ptr->pri = ptr->right->pri - incr; lf_thread_set_priority(ptr->thread_id, ptr->pri); - ptr = ptr->left; } while (ptr != current); - + LF_PRINT_LOG("EDF: finished shifting"); break; } else if (ptr->right) { int diff = ptr->right->pri - ptr->pri; if (diff > 1) { // eureka! we found shifting spot + LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, ptr->pri, ptr->right->thread_id, ptr->right->pri); shifted = true; // calculate the increment interval: // # diff - 1 is the extra space to distribute equally @@ -172,10 +212,12 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { int incr = (diff - 1) / (counter + 1) + 1; // shift every node's priority from this spot to the spot // we inserted the current thread, starting from the right + ptr = ptr->right; do { + ptr = ptr->left; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->right->pri - incr); ptr->pri = ptr->right->pri - incr; lf_thread_set_priority(ptr->thread_id, ptr->pri); - ptr = ptr->left; } while (ptr != current); // since we found a spot and shifted priorities, we don't need to continue with the while loop @@ -186,29 +228,43 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { } if (!shifted) { - // if flag is still false, this mean we couldn't find any shifting spot on the right side + LF_PRINT_LOG("EDF: right-shift failed, attempting to shift left"); + // if flag is still false, this means we couldn't find any shifting spot on the right side // we need to check left (this is a copy of the above code with the needed edits... debeatable) ptr = before; counter = 1; while (ptr) { counter++; - if (!ptr->left && ptr->pri != 2) { + if (!ptr->left && ptr->pri != EDF_MIN_PRIO) { // if until the head we cannot find a shifting spot, and the head is not 2 // (1 is reserved for reactions w/o deadline) this means the head can be shifted + LF_PRINT_LOG("EDF: got to the head that is gonna be shifted to 2"); shifted = true; - int diff = ptr->right->pri - 2; + + int diff = - EDF_MIN_PRIO; + // in the case I haven't incremented ptr and immediately + // got to the head (i.e., current == ptr->right), I need to use + // before-after as priority interval to compute the diff + if (ptr->left != current) { + diff += ptr->right->pri; + } else { + diff += ptr->right->right->pri; + } + // usual formula to spread priorities int incr = (diff - 1) / (counter + 1) + 1; // change the head's priority - ptr->pri = 2; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, 2); + ptr->pri = EDF_MIN_PRIO; lf_thread_set_priority(ptr->thread_id, ptr->pri); // change the priorities of node from head to the current thread - ptr = ptr->right; + do { + ptr = ptr->right; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr); ptr->pri = ptr->left->pri + incr; lf_thread_set_priority(ptr->thread_id, ptr->pri); - ptr = ptr->right; } while (ptr != current); break; @@ -217,15 +273,19 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { int diff = ptr->pri - ptr->left->pri; if (diff > 1) { // eureka! we found shifting spot + LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, ptr->pri, ptr->left->thread_id, ptr->left->pri); shifted = true; // usual formula to spread priorities int incr = (diff - 1) / (counter + 1) + 1; // shift every node's priority from this spot to the spot // we inserted the current thread, starting from the left + + ptr = ptr->left; do { + ptr = ptr->right; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr); ptr->pri = ptr->left->pri + incr; lf_thread_set_priority(ptr->thread_id, ptr->pri); - ptr = ptr->right; } while (ptr != current); // since we found a spot and shifted priorities, we don't need to continue with the while loop @@ -275,13 +335,13 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction edf_sched_node_t* current = &edf_elements[lf_thread_id()]; current->abs_d = absolute_deadline; + LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); lf_critical_section_enter(GLOBAL_ENVIRONMENT); LF_PRINT_LOG("In the CS for reaction %s", current_reaction_to_execute->name); // if there is no head -- then the current thread is the head if (!edf_ll_head) { edf_ll_head = current; - // FIXME: make sure this is an appropriate value - current->pri = 50; + current->pri = EDF_INITIAL_PRIO; LF_PRINT_LOG("No worker threads running, reaction %s is the head of the list and " "gets priority %d", @@ -311,7 +371,7 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction if (ptr->pri - temp->pri >= 2) { // distancing the priority by 3 from the lowest if there is enough space // (because it's likely that as time passes, newly coming deadlines will be bigger) - int incr = (ptr->pri - temp->pri >= 4) ? 3 : (ptr->pri - temp->pri - 1); + int incr = (ptr->pri - temp->pri >= EDF_PRIO_SPACING + 1) ? EDF_PRIO_SPACING : (ptr->pri - temp->pri - 1); current->pri = current->left->pri + incr; LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); } else { @@ -324,8 +384,16 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction } else { // if the insertion is at the beginning of the list LF_PRINT_LOG("Insertion at the beginning of the list"); edf_ll_head = current; - // distancing the priority by 5 from ptr (if there is enough space) - current->pri = (ptr->pri - 5 >= 2) ? (ptr->pri - 5) : 2; + + if (ptr->pri == EDF_MIN_PRIO) { + // need to shift right + if (!shift_edf_priorities(current)) { + lf_print_error_and_exit("More threads than priority values. Aborting."); + } + } else { + // distancing the priority by 3 from ptr (if there is enough space) + current->pri = (ptr->pri - EDF_PRIO_SPACING >= EDF_MIN_PRIO) ? (ptr->pri - EDF_PRIO_SPACING) : EDF_MIN_PRIO; + } } break; @@ -340,11 +408,11 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction current->left = ptr; ptr->right = current; - if (LF_SCHED_MAX_PRIORITY - ptr->pri >= 2) { + if (EDF_MAX_PRIO - ptr->pri >= 1) { // distancing the priority by 3 from the lowest if there is enough space // (because it's likely that as time passes, newly coming deadlines will be bigger) // the maximum priority is LF_SCHED_MAX_PRIORITY - 1 (on Linux it's 98) - int incr = (LF_SCHED_MAX_PRIORITY - 1 - ptr->pri >= 4) ? 3 : (LF_SCHED_MAX_PRIORITY - 1 - ptr->pri - 1); + int incr = (EDF_MAX_PRIO - ptr->pri >= EDF_PRIO_SPACING + 1) ? EDF_PRIO_SPACING : (EDF_MAX_PRIO - ptr->pri); current->pri = current->left->pri + incr; LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); } else { @@ -360,7 +428,9 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction } } lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); + LF_PRINT_LOG("About to exit the CS for reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); lf_critical_section_exit(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); } } @@ -487,7 +557,7 @@ edf_elements[lf_thread_id()].left = NULL; edf_elements[lf_thread_id()].right = NULL; // Use the target property to set the policy. -lf_scheduling_policy_t policy = {.priority = 80, // FIXME: determine good priority +lf_scheduling_policy_t policy = {.priority = EDF_SLEEP_PRIO, .policy = LF_THREAD_POLICY}; LF_PRINT_LOG("Setting thread policy to %d to thread %d", LF_THREAD_POLICY, lf_thread_id()); int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); @@ -589,10 +659,13 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) { lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued); } + LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", done_reaction->name, lf_thread_get_priority(lf_thread_self())); lf_critical_section_enter(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("In the CS from reaction %s", done_reaction->name); lf_thread_t my_id = lf_thread_self(); remove_from_edf_ll(my_id); lf_critical_section_exit(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", done_reaction->name, lf_thread_get_priority(lf_thread_self())); } void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index 30756ef7e..4d527dc26 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -175,6 +175,11 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number); */ int lf_thread_set_priority(lf_thread_t thread, int priority); +/** + * DEBUGGING + */ +int lf_thread_get_priority(lf_thread_t thread); + /** * @brief Set the scheduling policy of a thread. * diff --git a/low_level_platform/impl/src/lf_POSIX_threads_support.c b/low_level_platform/impl/src/lf_POSIX_threads_support.c index 255f38255..ae5fc5322 100644 --- a/low_level_platform/impl/src/lf_POSIX_threads_support.c +++ b/low_level_platform/impl/src/lf_POSIX_threads_support.c @@ -34,6 +34,13 @@ int lf_mutex_init(lf_mutex_t* mutex) { // of the predicate.” This seems like a bug in the implementation of // pthreads. Maybe it has been fixed? pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + + // Initialize the mutex protocol to INHERIT: + // a thread t1 owning the mutex, when it is preempted by a + // higher-priority thread t2 that tries to get the lock on the + // same mutex, inherits t2's priority. + pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT); + return pthread_mutex_init((pthread_mutex_t*)mutex, &attr); } diff --git a/low_level_platform/impl/src/lf_linux_support.c b/low_level_platform/impl/src/lf_linux_support.c index a8caa3d47..e8e5f464b 100644 --- a/low_level_platform/impl/src/lf_linux_support.c +++ b/low_level_platform/impl/src/lf_linux_support.c @@ -54,6 +54,16 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return pthread_setaffinity_np(thread, sizeof(cpu_set), &cpu_set); } +int lf_thread_get_priority(lf_thread_t thread) { + struct sched_param schedparam; + int policy; + + // Get the current scheduling policy + pthread_getschedparam(thread, &policy, &schedparam); + + return schedparam.sched_priority; +} + int lf_thread_set_priority(lf_thread_t thread, int priority) { int posix_policy, min_pri, max_pri, final_priority, res; struct sched_param schedparam; From 6e2df757ea26e4bc789159b0a13190c1a8456190 Mon Sep 17 00:00:00 2001 From: erlingrj Date: Fri, 27 Sep 2024 14:53:32 -0700 Subject: [PATCH 12/14] Run formatter --- core/threaded/scheduler_GEDF_NP.c | 74 ++++++++++--------- .../impl/src/lf_linux_support.c | 2 +- 2 files changed, 42 insertions(+), 34 deletions(-) diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index ffac159bf..f6b723633 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -139,8 +139,8 @@ static int advance_tag(lf_scheduler_t* scheduler) { * elements), the function tries to shift the nodes to the left. If * this again is not possible (the head of the LL has priority 2 and * there is no space to shift the following elements), the function - * returns false (true otherwise). - * + * returns false (true otherwise). + * * The calling thread must own the lock of the * global mutex before calling this function. * @@ -179,11 +179,11 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { // the formula is explained in the else-if case int incr = (diff - 1) / (counter + 1) + 1; LF_PRINT_LOG("EDF: shifting increment is %d: diff = %d, counter = %d", incr, diff, counter); - + // change the tail's priority LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, EDF_MAX_PRIO); ptr->pri = EDF_MAX_PRIO; - + lf_thread_set_priority(ptr->thread_id, ptr->pri); // change the priorities of node from tail to the current thread @@ -202,7 +202,8 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { int diff = ptr->right->pri - ptr->pri; if (diff > 1) { // eureka! we found shifting spot - LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, ptr->pri, ptr->right->thread_id, ptr->right->pri); + LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, + ptr->pri, ptr->right->thread_id, ptr->right->pri); shifted = true; // calculate the increment interval: // # diff - 1 is the extra space to distribute equally @@ -241,7 +242,7 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { LF_PRINT_LOG("EDF: got to the head that is gonna be shifted to 2"); shifted = true; - int diff = - EDF_MIN_PRIO; + int diff = -EDF_MIN_PRIO; // in the case I haven't incremented ptr and immediately // got to the head (i.e., current == ptr->right), I need to use // before-after as priority interval to compute the diff @@ -259,7 +260,7 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { lf_thread_set_priority(ptr->thread_id, ptr->pri); // change the priorities of node from head to the current thread - + do { ptr = ptr->right; LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr); @@ -273,13 +274,14 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { int diff = ptr->pri - ptr->left->pri; if (diff > 1) { // eureka! we found shifting spot - LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, ptr->pri, ptr->left->thread_id, ptr->left->pri); + LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, + ptr->pri, ptr->left->thread_id, ptr->left->pri); shifted = true; // usual formula to spread priorities int incr = (diff - 1) / (counter + 1) + 1; // shift every node's priority from this spot to the spot // we inserted the current thread, starting from the left - + ptr = ptr->left; do { ptr = ptr->right; @@ -314,7 +316,8 @@ static bool shift_edf_priorities(edf_sched_node_t* current) { * @param current_reaction_to_execute The reaction the current worker thread is about to serve. */ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute) { - LF_PRINT_LOG("Assigning priority to reaction %s (thread %d, %ld)", current_reaction_to_execute->name, lf_thread_id(), lf_thread_self()); + LF_PRINT_LOG("Assigning priority to reaction %s (thread %d, %ld)", current_reaction_to_execute->name, lf_thread_id(), + lf_thread_self()); // Examine the reaction's (inferred) deadline to set this thread's priority. interval_t inferred_deadline = (interval_t)(current_reaction_to_execute->index >> 16); @@ -335,7 +338,8 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction edf_sched_node_t* current = &edf_elements[lf_thread_id()]; current->abs_d = absolute_deadline; - LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); + LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", current_reaction_to_execute->name, + lf_thread_get_priority(lf_thread_self())); lf_critical_section_enter(GLOBAL_ENVIRONMENT); LF_PRINT_LOG("In the CS for reaction %s", current_reaction_to_execute->name); // if there is no head -- then the current thread is the head @@ -392,7 +396,8 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction } } else { // distancing the priority by 3 from ptr (if there is enough space) - current->pri = (ptr->pri - EDF_PRIO_SPACING >= EDF_MIN_PRIO) ? (ptr->pri - EDF_PRIO_SPACING) : EDF_MIN_PRIO; + current->pri = + (ptr->pri - EDF_PRIO_SPACING >= EDF_MIN_PRIO) ? (ptr->pri - EDF_PRIO_SPACING) : EDF_MIN_PRIO; } } @@ -428,9 +433,11 @@ static void assign_edf_priority(environment_t* env, reaction_t* current_reaction } } lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); - LF_PRINT_LOG("About to exit the CS for reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); + LF_PRINT_LOG("About to exit the CS for reaction %s with priority %d", current_reaction_to_execute->name, + lf_thread_get_priority(lf_thread_self())); lf_critical_section_exit(GLOBAL_ENVIRONMENT); - LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); + LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", current_reaction_to_execute->name, + lf_thread_get_priority(lf_thread_self())); } } @@ -505,7 +512,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* // Already initialized return; } - + // Environment 0 (top level) is responsible for allocating the array that stores the // information about worker thread priorities and deadlines. environment_t* top_level_env; @@ -513,7 +520,7 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* if (top_level_env == env) { edf_elements = (edf_sched_node_t*)calloc(num_envs * top_level_env->num_workers, sizeof(edf_sched_node_t)); } - + lf_scheduler_t* scheduler = env->scheduler; scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); @@ -549,21 +556,20 @@ void lf_sched_free(lf_scheduler_t* scheduler) { ///////////////////// Scheduler Worker API (public) ///////////////////////// void lf_sched_configure_worker() { -// Set default worker thread properties. -edf_elements[lf_thread_id()].abs_d = FOREVER; -edf_elements[lf_thread_id()].pri = 1; -edf_elements[lf_thread_id()].thread_id = lf_thread_self(); -edf_elements[lf_thread_id()].left = NULL; -edf_elements[lf_thread_id()].right = NULL; - -// Use the target property to set the policy. -lf_scheduling_policy_t policy = {.priority = EDF_SLEEP_PRIO, - .policy = LF_THREAD_POLICY}; -LF_PRINT_LOG("Setting thread policy to %d to thread %d", LF_THREAD_POLICY, lf_thread_id()); -int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); -if (ret != 0) { - lf_print_warning("Couldn't set the scheduling policy. Try running the program with sudo rights."); -} + // Set default worker thread properties. + edf_elements[lf_thread_id()].abs_d = FOREVER; + edf_elements[lf_thread_id()].pri = 1; + edf_elements[lf_thread_id()].thread_id = lf_thread_self(); + edf_elements[lf_thread_id()].left = NULL; + edf_elements[lf_thread_id()].right = NULL; + + // Use the target property to set the policy. + lf_scheduling_policy_t policy = {.priority = EDF_SLEEP_PRIO, .policy = LF_THREAD_POLICY}; + LF_PRINT_LOG("Setting thread policy to %d to thread %d", LF_THREAD_POLICY, lf_thread_id()); + int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); + if (ret != 0) { + lf_print_warning("Couldn't set the scheduling policy. Try running the program with sudo rights."); + } #if LF_NUMBER_OF_CORES > 0 // Pin the thread to cores starting from the highest numbered core using @@ -659,13 +665,15 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) { lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued); } - LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", done_reaction->name, lf_thread_get_priority(lf_thread_self())); + LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", done_reaction->name, + lf_thread_get_priority(lf_thread_self())); lf_critical_section_enter(GLOBAL_ENVIRONMENT); LF_PRINT_LOG("In the CS from reaction %s", done_reaction->name); lf_thread_t my_id = lf_thread_self(); remove_from_edf_ll(my_id); lf_critical_section_exit(GLOBAL_ENVIRONMENT); - LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", done_reaction->name, lf_thread_get_priority(lf_thread_self())); + LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", done_reaction->name, + lf_thread_get_priority(lf_thread_self())); } void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { diff --git a/low_level_platform/impl/src/lf_linux_support.c b/low_level_platform/impl/src/lf_linux_support.c index e8e5f464b..178b81de5 100644 --- a/low_level_platform/impl/src/lf_linux_support.c +++ b/low_level_platform/impl/src/lf_linux_support.c @@ -60,7 +60,7 @@ int lf_thread_get_priority(lf_thread_t thread) { // Get the current scheduling policy pthread_getschedparam(thread, &policy, &schedparam); - + return schedparam.sched_priority; } From caf49ee4a01bafc9b0c5bc5abb812e70317b1bde Mon Sep 17 00:00:00 2001 From: erlingrj Date: Fri, 27 Sep 2024 15:04:18 -0700 Subject: [PATCH 13/14] Move macros to the top --- include/core/threaded/reactor_threaded.h | 42 ++++++++++++------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index b9081c10c..271df23c2 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -12,6 +12,27 @@ #include "lf_types.h" +/** + * @brief The number of cores to use. + * + * If the target parameter number_of_cores is set, it will override this default. + */ +#ifndef LF_NUMBER_OF_CORES +#define LF_NUMBER_OF_CORES 0 +#endif + +/** + * @brief The thread scheduling policy to use. + * + * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY. + * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER. + * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds + * to SCHED_FIFO. + */ +#ifndef LF_THREAD_POLICY +#define LF_THREAD_POLICY LF_SCHED_FAIR +#endif + /** * Enqueue port absent reactions that will send a PORT_ABSENT * message to downstream federates if a given network output port is not present. @@ -78,27 +99,6 @@ void _lf_increment_tag_barrier_locked(environment_t* env, tag_t future_tag); */ void _lf_decrement_tag_barrier_locked(environment_t* env); -/** - * @brief The number of cores to use. - * - * If the target parameter number_of_cores is set, it will override this default. - */ -#ifndef LF_NUMBER_OF_CORES -#define LF_NUMBER_OF_CORES 0 -#endif - -/** - * @brief The thread scheduling policy to use. - * - * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY. - * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER. - * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds - * to SCHED_FIFO. - */ -#ifndef LF_THREAD_POLICY -#define LF_THREAD_POLICY LF_SCHED_FAIR -#endif - int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); void lf_synchronize_with_other_federates(void); From 11deb848f268875c86f908a4acb5c316da6d04ff Mon Sep 17 00:00:00 2001 From: erlingrj Date: Fri, 27 Sep 2024 15:25:09 -0700 Subject: [PATCH 14/14] Address linker errors on various platforms --- core/threaded/reactor_threaded.c | 7 ------- core/threaded/scheduler_GEDF_NP.c | 8 ++++++++ low_level_platform/impl/src/lf_flexpret_support.c | 11 +++++++++++ low_level_platform/impl/src/lf_linux_support.c | 2 ++ low_level_platform/impl/src/lf_macos_support.c | 2 ++ low_level_platform/impl/src/lf_windows_support.c | 2 ++ low_level_platform/impl/src/lf_zephyr_support.c | 3 +++ 7 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index c70802813..7db41bf39 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -205,13 +205,6 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { return true; } - // setting the priority to the maximum to be sure to be - // woken up as soon as the sleep time terminates (because there - // might be other worker threads from different enclaves having - // higher priority than what the current thread has) - // FIXME: use the same constant defined for the GEDF scheduler - lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY - 1); - // We do the sleep on the cond var so we can be awakened by the // asynchronous scheduling of a physical action. lf_clock_cond_timedwait // returns 0 if it is awakened before the timeout. Hence, we want to run diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index f6b723633..cd0886c5b 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -111,6 +111,14 @@ static int advance_tag(lf_scheduler_t* scheduler) { // Set a flag in the scheduler that the lock is held by the sole executing thread. // This prevents acquiring the mutex in lf_scheduler_trigger_reaction. scheduler->custom_data->solo_holds_mutex = true; + + // setting the priority to the maximum to be sure to be + // woken up as soon as the sleep time terminates (because there + // might be other worker threads from different enclaves having + // higher priority than what the current thread has) + // FIXME: Verify that there is no race wrt setting priority to MAX + lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY - 1); + if (_lf_sched_advance_tag_locked(scheduler)) { LF_PRINT_DEBUG("Scheduler: Reached stop tag."); scheduler->should_stop = true; diff --git a/low_level_platform/impl/src/lf_flexpret_support.c b/low_level_platform/impl/src/lf_flexpret_support.c index 7fb6d2a48..0acd37a41 100644 --- a/low_level_platform/impl/src/lf_flexpret_support.c +++ b/low_level_platform/impl/src/lf_flexpret_support.c @@ -226,6 +226,17 @@ void initialize_lf_thread_id() { // Nothing needed here; thread ID's are already available in harware registers // which can be fetched with `read_hartid`. } + +/** + * Real-time scheduling API not implemented for FlexPRET. + */ +int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; } + +int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; } + +int lf_thread_get_priority(lf_thread_t thread) { return -1; } + +int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; } #endif #endif // PLATFORM_FLEXPRET diff --git a/low_level_platform/impl/src/lf_linux_support.c b/low_level_platform/impl/src/lf_linux_support.c index 178b81de5..93b97cd97 100644 --- a/low_level_platform/impl/src/lf_linux_support.c +++ b/low_level_platform/impl/src/lf_linux_support.c @@ -54,6 +54,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return pthread_setaffinity_np(thread, sizeof(cpu_set), &cpu_set); } +// FIXME: This does not do any translation between LF_SCHED priority range and +// the Linux sched priority range. int lf_thread_get_priority(lf_thread_t thread) { struct sched_param schedparam; int policy; diff --git a/low_level_platform/impl/src/lf_macos_support.c b/low_level_platform/impl/src/lf_macos_support.c index bf96362cb..ddc384ee0 100644 --- a/low_level_platform/impl/src/lf_macos_support.c +++ b/low_level_platform/impl/src/lf_macos_support.c @@ -46,6 +46,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; } int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; } +int lf_thread_get_priority(lf_thread_t thread) { return -1; } + int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; } #endif diff --git a/low_level_platform/impl/src/lf_windows_support.c b/low_level_platform/impl/src/lf_windows_support.c index 61424ac7f..083508cf5 100644 --- a/low_level_platform/impl/src/lf_windows_support.c +++ b/low_level_platform/impl/src/lf_windows_support.c @@ -196,6 +196,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; } int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; } +int lf_thread_get_priority(lf_thread_t thread) { return -1; } + int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; } int lf_mutex_init(_lf_critical_section_t* critical_section) { diff --git a/low_level_platform/impl/src/lf_zephyr_support.c b/low_level_platform/impl/src/lf_zephyr_support.c index 74ae9bf90..8370b7a94 100644 --- a/low_level_platform/impl/src/lf_zephyr_support.c +++ b/low_level_platform/impl/src/lf_zephyr_support.c @@ -190,6 +190,9 @@ int lf_thread_set_priority(lf_thread_t thread, int priority) { return 0; } +// FIXME: Implement this. +int lf_thread_get_priority(lf_thread_t thread) { return -1; } + int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { // Update the policy switch (policy->policy) {