diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index ca51fd50c..c66142dac 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -166,7 +166,9 @@ define(FEDERATED_DECENTRALIZED) define(FEDERATED) define(FEDERATED_AUTHENTICATED) define(FEDERATE_ID) +define(LF_NUMBER_OF_CORES) define(LF_REACTION_GRAPH_BREADTH) +define(LF_THREAD_POLICY) define(LF_TRACE) define(LF_SINGLE_THREADED) define(LOG_LEVEL) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 392f49c31..06aef08cd 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -242,6 +242,13 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) { return true; } + // setting the priority to the maximum to be sure to be + // woken up as soon as the sleep time terminates (because there + // might be other worker threads from different enclaves having + // higher priority than what the current thread has) + // FIXME: use the same constant defined for the GEDF scheduler + lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY - 1); + // We do the sleep on the cond var so we can be awakened by the // asynchronous scheduling of a physical action. lf_clock_cond_timedwait // returns 0 if it is awakened before the timeout. Hence, we want to run @@ -861,7 +868,7 @@ void _lf_worker_invoke_reaction(environment_t* env, int worker_number, reaction_ * @param env Environment within which we are executing. * @param worker_number The number assigned to this worker thread */ -void _lf_worker_do_work(environment_t* env, int worker_number) { +static void _lf_worker_do_work(environment_t* env, int worker_number) { assert(env != GLOBAL_ENVIRONMENT); // Keep track of whether we have decremented the idle thread count. @@ -903,6 +910,8 @@ void _lf_worker_do_work(environment_t* env, int worker_number) { */ void* worker(void* arg) { initialize_lf_thread_id(); + lf_sched_configure_worker(); + environment_t* env = (environment_t*)arg; LF_MUTEX_LOCK(&env->mutex); @@ -1013,7 +1022,6 @@ void determine_number_of_workers(void) { * at compile time. */ int lf_reactor_c_main(int argc, const char* argv[]) { - initialize_lf_thread_id(); // Invoke the function that optionally provides default command-line options. lf_set_default_command_line_options(); @@ -1104,7 +1112,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { lf_print("Environment %u: ---- Intializing start tag", env->id); _lf_initialize_start_tag(env); - lf_print("Environment %u: ---- Spawning %d workers.", env->id, env->num_workers); + lf_print("Environment %u: ---- Spawning %d workers on %d cores.", env->id, env->num_workers, LF_NUMBER_OF_CORES); for (int j = 0; j < env->num_workers; j++) { if (i == 0 && j == 0) { @@ -1112,6 +1120,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { // run on the main thread, rather than creating a new thread. // This is important for bare-metal platforms, who can't // afford to have the main thread sit idle. + env->thread_ids[0] = lf_thread_self(); continue; } if (lf_thread_create(&env->thread_ids[j], worker, env) != 0) { diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index e77257209..4ffe6e98a 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -3,6 +3,7 @@ * @author Soroush Bateni * @author Edward A. Lee * @author Marten Lohstroh + * @author Francesco Paladino * @copyright (c) 2020-2024, The University of California at Berkeley. * License: BSD 2-clause * @brief Global Earliest Deadline First (GEDF) non-preemptive scheduler for the @@ -24,7 +25,28 @@ #define NUMBER_OF_WORKERS 1 #endif // NUMBER_OF_WORKERS +#ifndef EDF_INITIAL_PRIO +#define EDF_INITIAL_PRIO 50 +#endif // EDF_INITIAL_PRIO + +#ifndef EDF_PRIO_SPACING +#define EDF_PRIO_SPACING 3 +#endif // EDF_PRIO_SPACING + +#ifndef EDF_MIN_PRIO +#define EDF_MIN_PRIO (LF_SCHED_MIN_PRIORITY + 2) +#endif // EDF_MIN_PRIO + +#ifndef EDF_MAX_PRIO +#define EDF_MAX_PRIO (LF_SCHED_MAX_PRIORITY - 2) +#endif // EDF_MAX_PRIO + +#ifndef EDF_SLEEP_PRIO +#define EDF_SLEEP_PRIO (LF_SCHED_MAX_PRIORITY - 1) +#endif // EDF_SLEEP_PRIO + #include +#include // For uint32_t #include "low_level_platform.h" #include "environment.h" @@ -48,6 +70,18 @@ typedef struct custom_scheduler_data_t { bool solo_holds_mutex; // Indicates sole thread holds the mutex. } custom_scheduler_data_t; +typedef struct edf_sched_node_t { + instant_t abs_d; + uint32_t pri; + struct edf_sched_node_t* left; + struct edf_sched_node_t* right; + lf_thread_t thread_id; +} edf_sched_node_t; + +// Linked list of worker threads sorted by priority. +static edf_sched_node_t* edf_elements = NULL; +static edf_sched_node_t* edf_ll_head = NULL; + /////////////////// Scheduler Private API ///////////////////////// /** @@ -57,6 +91,11 @@ typedef struct custom_scheduler_data_t { */ inline static void wait_for_reaction_queue_updates(lf_scheduler_t* scheduler, int worker_number) { scheduler->number_of_idle_workers++; + + // Set the priority to the maximum to be sure to be woken up when new reactions are available. + // FIXME: Should only do this if the number of threads is greater than the number of cores. + lf_thread_set_priority(lf_thread_self(), EDF_SLEEP_PRIO); + tracepoint_worker_wait_starts(scheduler->env, worker_number); LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed); tracepoint_worker_wait_ends(scheduler->env, worker_number); @@ -90,6 +129,343 @@ static int advance_tag(lf_scheduler_t* scheduler) { return 0; } +/** + * @brief Shift the priority of elements in the linked list (LL) to assign + * to current (freshly inserted in the LL) a priority value complying with the EDF rule. + * + * First, the algorithm tries to shift the nodes to the right of the position + * where current has been inserted. If this is not possible (the tail + * of the LL has priority 98 and there is no space to shift the previous + * elements), the function tries to shift the nodes to the left. If + * this again is not possible (the head of the LL has priority 2 and + * there is no space to shift the following elements), the function + * returns false (true otherwise). + * + * The calling thread must own the lock of the + * global mutex before calling this function. + * + * @param current: the node to which to assign a priority value after shifting + * + * @return true if the shift (either right or left) was possible and a priority + * value was assigned to current; false otherwise. + */ +static bool shift_edf_priorities(edf_sched_node_t* current) { + edf_sched_node_t* before = current->left; + edf_sched_node_t* after = current->right; + edf_sched_node_t* ptr = after; + // count the number of times the while loop executes, so you can distribute + // the priority space you find across the threads in the linked list + int counter = 1; + // set to true if we can find a shifting spot + bool shifted = false; + LF_PRINT_LOG("EDF: attempting to shift right"); + while (ptr) { + counter++; + if (!ptr->right && ptr->pri != EDF_MAX_PRIO) { + // if until the tail we cannot find a shifting spot, and the tail is not 98 + // this means the tail can be shifted to 98 + LF_PRINT_LOG("EDF: got to the tail that is gonna be shifted to MAX"); + shifted = true; + + int diff = EDF_MAX_PRIO; + // in the case I haven't incremented ptr and immediately + // got to the tail (i.e., current == ptr->left), I need to use + // before-after as priority interval to compute the diff + if (ptr->left != current) { + diff -= ptr->left->pri; + } else { + diff -= ptr->left->left->pri; + } + // the formula is explained in the else-if case + int incr = (diff - 1) / (counter + 1) + 1; + LF_PRINT_LOG("EDF: shifting increment is %d: diff = %d, counter = %d", incr, diff, counter); + + // change the tail's priority + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, EDF_MAX_PRIO); + ptr->pri = EDF_MAX_PRIO; + + lf_thread_set_priority(ptr->thread_id, ptr->pri); + + // change the priorities of node from tail to the current thread + // to keep the relative priorities while shifting and system-calling + LF_PRINT_LOG("EDF: shifting the nodes before tail to make room for the new one"); + do { + ptr = ptr->left; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->right->pri - incr); + ptr->pri = ptr->right->pri - incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + } while (ptr != current); + LF_PRINT_LOG("EDF: finished shifting"); + break; + + } else if (ptr->right) { + int diff = ptr->right->pri - ptr->pri; + if (diff > 1) { + // eureka! we found shifting spot + LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, ptr->pri, ptr->right->thread_id, ptr->right->pri); + shifted = true; + // calculate the increment interval: + // # diff - 1 is the extra space to distribute equally + // (-1 because we guarantee a space of at least 1 between ptr and ptr->right); + // # counter + 1 is the number of spaces between "counter" elements in the LL; + // # +1 replaces the ceiling (not exactly but still ok); + int incr = (diff - 1) / (counter + 1) + 1; + // shift every node's priority from this spot to the spot + // we inserted the current thread, starting from the right + ptr = ptr->right; + do { + ptr = ptr->left; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->right->pri - incr); + ptr->pri = ptr->right->pri - incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + } while (ptr != current); + + // since we found a spot and shifted priorities, we don't need to continue with the while loop + break; + } + } + ptr = ptr->right; + } + + if (!shifted) { + LF_PRINT_LOG("EDF: right-shift failed, attempting to shift left"); + // if flag is still false, this means we couldn't find any shifting spot on the right side + // we need to check left (this is a copy of the above code with the needed edits... debeatable) + ptr = before; + counter = 1; + while (ptr) { + counter++; + if (!ptr->left && ptr->pri != EDF_MIN_PRIO) { + // if until the head we cannot find a shifting spot, and the head is not 2 + // (1 is reserved for reactions w/o deadline) this means the head can be shifted + LF_PRINT_LOG("EDF: got to the head that is gonna be shifted to 2"); + shifted = true; + + int diff = - EDF_MIN_PRIO; + // in the case I haven't incremented ptr and immediately + // got to the head (i.e., current == ptr->right), I need to use + // before-after as priority interval to compute the diff + if (ptr->left != current) { + diff += ptr->right->pri; + } else { + diff += ptr->right->right->pri; + } + + // usual formula to spread priorities + int incr = (diff - 1) / (counter + 1) + 1; + // change the head's priority + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, 2); + ptr->pri = EDF_MIN_PRIO; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + + // change the priorities of node from head to the current thread + + do { + ptr = ptr->right; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr); + ptr->pri = ptr->left->pri + incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + } while (ptr != current); + + break; + + } else if (ptr->left) { + int diff = ptr->pri - ptr->left->pri; + if (diff > 1) { + // eureka! we found shifting spot + LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id, ptr->pri, ptr->left->thread_id, ptr->left->pri); + shifted = true; + // usual formula to spread priorities + int incr = (diff - 1) / (counter + 1) + 1; + // shift every node's priority from this spot to the spot + // we inserted the current thread, starting from the left + + ptr = ptr->left; + do { + ptr = ptr->right; + LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr); + ptr->pri = ptr->left->pri + incr; + lf_thread_set_priority(ptr->thread_id, ptr->pri); + } while (ptr != current); + + // since we found a spot and shifted priorities, we don't need to continue with the while loop + break; + } + } + ptr = ptr->left; + } + } + return shifted; +} + +/** + * @brief Assign a priority value between 1 and 98 to the current worker thread. + * + * The priority is determined by the current_reaction_to_execute using EDF. + * Priority 1 is assigned to worker threads executing reactions without a deadline, + * while priority 99 is reserved for worker threads waiting for work. The reason + * for the highest priority is to be sure that the awakening of these threads is not + * delayed by the other worker threads executing reactions (even in different enclaves). + * + * This function enters a critical section so the mutex lock should not be held on the + * environment when this is called. + * + * @param env The environment within which we are executing. + * @param current_reaction_to_execute The reaction the current worker thread is about to serve. + */ +static void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute) { + LF_PRINT_LOG("Assigning priority to reaction %s (thread %d, %ld)", current_reaction_to_execute->name, lf_thread_id(), lf_thread_self()); + + // Examine the reaction's (inferred) deadline to set this thread's priority. + interval_t inferred_deadline = (interval_t)(current_reaction_to_execute->index >> 16); + // If there is no deadline, set the priority to 1. + if (inferred_deadline >= (FOREVER >> 16) << 16) { + LF_PRINT_LOG("Reaction %s has no deadline, setting its priority to 1", current_reaction_to_execute->name); + // All reactions without (inferred) deadlines result in a thread priority of 1. + lf_thread_set_priority(lf_thread_self(), 1); + // assuming the edf_element was not in the linked list anymore + // (removed on the completion of the previous served reaction) + } else { + // Get the absolute deadline to implement EDF. + instant_t absolute_deadline = (env->current_tag.time + inferred_deadline); + LF_PRINT_LOG("Reaction %s has inferred deadline " PRINTF_TIME, current_reaction_to_execute->name, + absolute_deadline); + // Need to know the priorities of running threads and the absolute deadline associated with it + // and choose a priority that is in between those with earlier deadlines and those with larger deadlines. + edf_sched_node_t* current = &edf_elements[lf_thread_id()]; + current->abs_d = absolute_deadline; + + LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); + lf_critical_section_enter(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("In the CS for reaction %s", current_reaction_to_execute->name); + // if there is no head -- then the current thread is the head + if (!edf_ll_head) { + edf_ll_head = current; + current->pri = EDF_INITIAL_PRIO; + + LF_PRINT_LOG("No worker threads running, reaction %s is the head of the list and " + "gets priority %d", + current_reaction_to_execute->name, current->pri); + } else { + // there is a head in the LL + + // assuming the edf_element was not in the linked list anymore + // (removed at the completion of the previously executed reaction) + + edf_sched_node_t* ptr = edf_ll_head; + // find the spot that the current thread needs to insert itself + while (ptr) { + // the LL is from lowest priority to the highest priority + if (ptr->abs_d < absolute_deadline) { + LF_PRINT_LOG("Found a reaction having shorter deadline: " PRINTF_TIME, ptr->abs_d); + // change the pointers to insert the current thread + edf_sched_node_t* temp = ptr->left; + ptr->left = current; + current->right = ptr; + current->left = temp; + // if the insertion is not at the beginning of the list + if (temp) { + LF_PRINT_LOG("Insertion not at the beginning of the list"); + temp->right = current; + // if there is enough space to assign a priority value between ptr and temp + if (ptr->pri - temp->pri >= 2) { + // distancing the priority by 3 from the lowest if there is enough space + // (because it's likely that as time passes, newly coming deadlines will be bigger) + int incr = (ptr->pri - temp->pri >= EDF_PRIO_SPACING + 1) ? EDF_PRIO_SPACING : (ptr->pri - temp->pri - 1); + current->pri = current->left->pri + incr; + LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); + } else { + // shift elements to find a proper priority value + LF_PRINT_LOG("Shifting"); + if (!shift_edf_priorities(current)) { + lf_print_error_and_exit("More threads than priority values. Aborting."); + } + } + } else { // if the insertion is at the beginning of the list + LF_PRINT_LOG("Insertion at the beginning of the list"); + edf_ll_head = current; + + if (ptr->pri == EDF_MIN_PRIO) { + // need to shift right + if (!shift_edf_priorities(current)) { + lf_print_error_and_exit("More threads than priority values. Aborting."); + } + } else { + // distancing the priority by 3 from ptr (if there is enough space) + current->pri = (ptr->pri - EDF_PRIO_SPACING >= EDF_MIN_PRIO) ? (ptr->pri - EDF_PRIO_SPACING) : EDF_MIN_PRIO; + } + } + + break; + } else if (ptr->right == NULL) { + // this reaction has the earliest deadline in the list => + // it needs to be added as the tail of the LL + LF_PRINT_LOG("No reactions having shorter deadline, adding %s as the tail of the LL", + current_reaction_to_execute->name); + + // ptr is the current tail of the LL (cannot be null) + current->right = NULL; + current->left = ptr; + ptr->right = current; + + if (EDF_MAX_PRIO - ptr->pri >= 1) { + // distancing the priority by 3 from the lowest if there is enough space + // (because it's likely that as time passes, newly coming deadlines will be bigger) + // the maximum priority is LF_SCHED_MAX_PRIORITY - 1 (on Linux it's 98) + int incr = (EDF_MAX_PRIO - ptr->pri >= EDF_PRIO_SPACING + 1) ? EDF_PRIO_SPACING : (EDF_MAX_PRIO - ptr->pri); + current->pri = current->left->pri + incr; + LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name); + } else { + // shift elements to find a proper priority value + LF_PRINT_LOG("Shifting (only left)"); + if (!shift_edf_priorities(current)) { + lf_print_error_and_exit("More threads than priority values. Aborting."); + } + } + break; + } + ptr = ptr->right; + } + } + lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri); + LF_PRINT_LOG("About to exit the CS for reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); + lf_critical_section_exit(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", current_reaction_to_execute->name, lf_thread_get_priority(lf_thread_self())); + } +} + +/** + * @brief Remove the edf_sched_node_t element indexed by my_id from the LL and reset deadline and priority. + * @param my_id LF thread ID of the element to remove from the LL + */ +static void remove_from_edf_ll(lf_thread_t my_id) { + edf_sched_node_t* ptr = edf_ll_head; + // if the thread is already on the LL -- remove the old one + while (ptr) { + if (ptr->thread_id == my_id) { + if (ptr->left) { + ptr->left->right = ptr->right; + } + if (ptr->right) { + ptr->right->left = ptr->left; + } + + // my node was the head of the LL + if (ptr == edf_ll_head) { + edf_ll_head = ptr->right; + } + + ptr->abs_d = FOREVER; + ptr->pri = 1; + ptr->left = ptr->right = NULL; + + break; + } else { + ptr = ptr->right; + } + } +} + /** * @brief Assuming all other workers are idle, advance to the next level. * @param scheduler The scheduler. @@ -106,7 +482,9 @@ static void advance_level(lf_scheduler_t* scheduler) { lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level); #endif } + ///////////////////// Scheduler Init and Destroy API ///////////////////////// + /** * @brief Initialize the scheduler. * @@ -127,6 +505,15 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* // Already initialized return; } + + // Environment 0 (top level) is responsible for allocating the array that stores the + // information about worker thread priorities and deadlines. + environment_t* top_level_env; + int num_envs = _lf_get_environments(&top_level_env); + if (top_level_env == env) { + edf_elements = (edf_sched_node_t*)calloc(num_envs * top_level_env->num_workers, sizeof(edf_sched_node_t)); + } + lf_scheduler_t* scheduler = env->scheduler; scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t)); @@ -149,11 +536,49 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t* */ void lf_sched_free(lf_scheduler_t* scheduler) { pqueue_free((pqueue_t*)scheduler->custom_data->reaction_q); + // Environment 0 (top level) is responsible for freeing the array that stores the + // information about worker thread priorities and deadlines. + environment_t* top_level_env; + _lf_get_environments(&top_level_env); + if (top_level_env == scheduler->env) { + free(edf_elements); + } free(scheduler->custom_data); } ///////////////////// Scheduler Worker API (public) ///////////////////////// +void lf_sched_configure_worker() { +// Set default worker thread properties. +edf_elements[lf_thread_id()].abs_d = FOREVER; +edf_elements[lf_thread_id()].pri = 1; +edf_elements[lf_thread_id()].thread_id = lf_thread_self(); +edf_elements[lf_thread_id()].left = NULL; +edf_elements[lf_thread_id()].right = NULL; + +// Use the target property to set the policy. +lf_scheduling_policy_t policy = {.priority = EDF_SLEEP_PRIO, + .policy = LF_THREAD_POLICY}; +LF_PRINT_LOG("Setting thread policy to %d to thread %d", LF_THREAD_POLICY, lf_thread_id()); +int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy); +if (ret != 0) { + lf_print_warning("Couldn't set the scheduling policy. Try running the program with sudo rights."); +} + +#if LF_NUMBER_OF_CORES > 0 + // Pin the thread to cores starting from the highest numbered core using + // the assigned thread id: small thread id => high core number. Still, + // respecting the constraint on the specified number of cores the program can use. + int core_number = lf_available_cores() - 1 - (lf_thread_id() % LF_NUMBER_OF_CORES); + + ret = lf_thread_set_cpu(lf_thread_self(), core_number); + if (ret != 0) { + lf_print_error_and_exit("Couldn't bind thread-%u to core %d.", lf_thread_id(), core_number); + } + LF_PRINT_LOG("Thread %d using core_number %d", lf_thread_id(), core_number); +#endif // LF_NUMBER_OF_CORES > 0 +} + reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { // Need to lock the environment mutex. LF_PRINT_DEBUG("Scheduler: Worker %d locking environment mutex.", worker_number); @@ -186,6 +611,9 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed); } LF_MUTEX_UNLOCK(&scheduler->env->mutex); + if (LF_THREAD_POLICY > LF_SCHED_FAIR) { + assign_edf_priority(scheduler->env, reaction_to_return); + } return reaction_to_return; } else { // Found a reaction at a level other than the current level. @@ -231,6 +659,13 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) { lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued); } + LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", done_reaction->name, lf_thread_get_priority(lf_thread_self())); + lf_critical_section_enter(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("In the CS from reaction %s", done_reaction->name); + lf_thread_t my_id = lf_thread_self(); + remove_from_edf_ll(my_id); + lf_critical_section_exit(GLOBAL_ENVIRONMENT); + LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", done_reaction->name, lf_thread_get_priority(lf_thread_self())); } void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 7edd41a81..3388a1ab0 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -300,17 +300,9 @@ void lf_sched_free(lf_scheduler_t* scheduler) { } ///////////////////// Scheduler Worker API (public) ///////////////////////// -/** - * @brief Ask the scheduler for one more reaction. - * - * This function blocks until it can return a ready reaction for worker thread - * 'worker_number' or it is time for the worker thread to stop and exit (where a - * NULL value would be returned). - * - * @param worker_number - * @return reaction_t* A reaction for the worker to execute. NULL if the calling - * worker thread should exit. - */ + +void lf_sched_configure_worker() {} + reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { // Iterate until the stop tag is reached or reaction vectors are empty while (!scheduler->should_stop) { diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index d2db00658..7653c0eec 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -693,6 +693,8 @@ void lf_sched_free(lf_scheduler_t* scheduler) { ///////////////////////// Scheduler Worker API /////////////////////////////// +void lf_sched_configure_worker() {} + reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) { assert(worker_number >= 0); reaction_t* ret; diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 0d58f7431..7706a323b 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -78,6 +78,27 @@ void _lf_increment_tag_barrier_locked(environment_t* env, tag_t future_tag); */ void _lf_decrement_tag_barrier_locked(environment_t* env); +/** + * @brief The number of cores to use. + * + * If the target parameter number_of_cores is set, it will override this default. + */ +#ifndef LF_NUMBER_OF_CORES +#define LF_NUMBER_OF_CORES 0 +#endif + +/** + * @brief The thread scheduling policy to use. + * + * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY. + * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER. + * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds + * to SCHED_FIFO. + */ +#ifndef LF_THREAD_POLICY +#define LF_THREAD_POLICY LF_SCHED_FAIR +#endif + int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag); void lf_synchronize_with_other_federates(void); bool wait_until(instant_t logical_time_ns, lf_cond_t* condition); diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h index f49f0bc54..ff5663aed 100644 --- a/include/core/threaded/scheduler.h +++ b/include/core/threaded/scheduler.h @@ -84,4 +84,9 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction */ void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number); +/** + * @brief Initialize priority and set core binding for the calling worker thread, if appropriate. + */ +void lf_sched_configure_worker(); + #endif // LF_SCHEDULER_H diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index 9611870cc..4d527dc26 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -119,18 +119,15 @@ int lf_available_cores(); lf_thread_t lf_thread_self(); /** - * Create a new thread, starting with execution of lf_thread - * getting passed arguments. The new handle is stored in thread_id. + * @brief Create a new thread and start execution of the function lf_thread + * with the specified arguments. + * + * The new handle is stored in thread_id. * * @return 0 on success, platform-specific error number otherwise. */ int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments); -/** - * @brief Helper function for creating a thread. - */ -int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments); - /** * Make calling thread wait for termination of the thread. The * exit status of the thread is stored in thread_return if thread_return @@ -172,23 +169,33 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number); * number indicates higher priority. Setting the priority of a thread only * makes sense if the thread is scheduled with LF_SCHED_TIMESLICE or LF_THREAD_PRIORITY * - * @param thread The thread. + * @param thread The thread ID. * @param priority The priority. * @return int 0 on success, platform-specific error otherwise */ int lf_thread_set_priority(lf_thread_t thread, int priority); /** - * @brief Set the scheduling policy of a thread. This is based on the scheduling + * DEBUGGING + */ +int lf_thread_get_priority(lf_thread_t thread); + +/** + * @brief Set the scheduling policy of a thread. + * + * This is based on the scheduling * concept from Linux explained here: https://man7.org/linux/man-pages/man7/sched.7.html * A scheduling policy is specific to a thread/worker. We have three policies * LF_SCHED_PRIORITY which corresponds to SCHED_FIFO on Linux. * LF_SCHED_TIMESLICE which corresponds to SCHED_RR on Linux. * LF_SCHED_FAIR which corresponds to SCHED_OTHER on Linux. * + * @param thread The thread ID. + * @param policy A pointer to the policy (this will not be used after returning, so it can be on the stack). * @return int 0 on success, platform-specific error number otherwise. */ int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy); +// FIXME: The policy pointer argument is worrisome. Can it really be on the stack? /** * Initialize a mutex. diff --git a/low_level_platform/impl/src/lf_POSIX_threads_support.c b/low_level_platform/impl/src/lf_POSIX_threads_support.c index 255f38255..ae5fc5322 100644 --- a/low_level_platform/impl/src/lf_POSIX_threads_support.c +++ b/low_level_platform/impl/src/lf_POSIX_threads_support.c @@ -34,6 +34,13 @@ int lf_mutex_init(lf_mutex_t* mutex) { // of the predicate.” This seems like a bug in the implementation of // pthreads. Maybe it has been fixed? pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + + // Initialize the mutex protocol to INHERIT: + // a thread t1 owning the mutex, when it is preempted by a + // higher-priority thread t2 that tries to get the lock on the + // same mutex, inherits t2's priority. + pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT); + return pthread_mutex_init((pthread_mutex_t*)mutex, &attr); } diff --git a/low_level_platform/impl/src/lf_linux_support.c b/low_level_platform/impl/src/lf_linux_support.c index a8caa3d47..e8e5f464b 100644 --- a/low_level_platform/impl/src/lf_linux_support.c +++ b/low_level_platform/impl/src/lf_linux_support.c @@ -54,6 +54,16 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return pthread_setaffinity_np(thread, sizeof(cpu_set), &cpu_set); } +int lf_thread_get_priority(lf_thread_t thread) { + struct sched_param schedparam; + int policy; + + // Get the current scheduling policy + pthread_getschedparam(thread, &policy, &schedparam); + + return schedparam.sched_priority; +} + int lf_thread_set_priority(lf_thread_t thread, int priority) { int posix_policy, min_pri, max_pri, final_priority, res; struct sched_param schedparam;