diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt
index 6d938ae0c..b8f91b309 100644
--- a/core/CMakeLists.txt
+++ b/core/CMakeLists.txt
@@ -168,7 +168,9 @@ define(FEDERATED_DECENTRALIZED)
define(FEDERATED)
define(FEDERATED_AUTHENTICATED)
define(FEDERATE_ID)
+define(LF_NUMBER_OF_CORES)
define(LF_REACTION_GRAPH_BREADTH)
+define(LF_THREAD_POLICY)
define(LF_TRACE)
define(LF_SINGLE_THREADED)
define(LOG_LEVEL)
diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c
index df0f271b1..7db41bf39 100644
--- a/core/threaded/reactor_threaded.c
+++ b/core/threaded/reactor_threaded.c
@@ -823,7 +823,7 @@ void _lf_worker_invoke_reaction(environment_t* env, int worker_number, reaction_
* @param env Environment within which we are executing.
* @param worker_number The number assigned to this worker thread
*/
-void _lf_worker_do_work(environment_t* env, int worker_number) {
+static void _lf_worker_do_work(environment_t* env, int worker_number) {
assert(env != GLOBAL_ENVIRONMENT);
// Keep track of whether we have decremented the idle thread count.
@@ -874,6 +874,8 @@ void _lf_worker_do_work(environment_t* env, int worker_number) {
*/
void* worker(void* arg) {
initialize_lf_thread_id();
+ lf_sched_configure_worker();
+
environment_t* env = (environment_t*)arg;
LF_MUTEX_LOCK(&env->mutex);
@@ -984,7 +986,6 @@ void determine_number_of_workers(void) {
* at compile time.
*/
int lf_reactor_c_main(int argc, const char* argv[]) {
- initialize_lf_thread_id();
// Invoke the function that optionally provides default command-line options.
lf_set_default_command_line_options();
@@ -1075,7 +1076,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
lf_print("Environment %u: ---- Intializing start tag", env->id);
_lf_initialize_start_tag(env);
- lf_print("Environment %u: ---- Spawning %d workers.", env->id, env->num_workers);
+ lf_print("Environment %u: ---- Spawning %d workers on %d cores.", env->id, env->num_workers, LF_NUMBER_OF_CORES);
for (int j = 0; j < env->num_workers; j++) {
if (i == 0 && j == 0) {
diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c
index 84afee379..cd0886c5b 100644
--- a/core/threaded/scheduler_GEDF_NP.c
+++ b/core/threaded/scheduler_GEDF_NP.c
@@ -3,6 +3,7 @@
* @author Soroush Bateni
* @author Edward A. Lee
* @author Marten Lohstroh
+ * @author Francesco Paladino
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: BSD 2-clause
* @brief Global Earliest Deadline First (GEDF) non-preemptive scheduler for the
@@ -24,7 +25,28 @@
#define NUMBER_OF_WORKERS 1
#endif // NUMBER_OF_WORKERS
+#ifndef EDF_INITIAL_PRIO
+#define EDF_INITIAL_PRIO 50
+#endif // EDF_INITIAL_PRIO
+
+#ifndef EDF_PRIO_SPACING
+#define EDF_PRIO_SPACING 3
+#endif // EDF_PRIO_SPACING
+
+#ifndef EDF_MIN_PRIO
+#define EDF_MIN_PRIO (LF_SCHED_MIN_PRIORITY + 2)
+#endif // EDF_MIN_PRIO
+
+#ifndef EDF_MAX_PRIO
+#define EDF_MAX_PRIO (LF_SCHED_MAX_PRIORITY - 2)
+#endif // EDF_MAX_PRIO
+
+#ifndef EDF_SLEEP_PRIO
+#define EDF_SLEEP_PRIO (LF_SCHED_MAX_PRIORITY - 1)
+#endif // EDF_SLEEP_PRIO
+
#include
+#include <stdint.h> // For uint32_t
#include "low_level_platform.h"
#include "environment.h"
@@ -48,6 +70,18 @@ typedef struct custom_scheduler_data_t {
bool solo_holds_mutex; // Indicates sole thread holds the mutex.
} custom_scheduler_data_t;
+typedef struct edf_sched_node_t {
+ instant_t abs_d;
+ uint32_t pri;
+ struct edf_sched_node_t* left;
+ struct edf_sched_node_t* right;
+ lf_thread_t thread_id;
+} edf_sched_node_t;
+
+// Linked list of worker threads sorted by priority.
+static edf_sched_node_t* edf_elements = NULL;
+static edf_sched_node_t* edf_ll_head = NULL;
+
/////////////////// Scheduler Private API /////////////////////////
/**
@@ -57,6 +91,11 @@ typedef struct custom_scheduler_data_t {
*/
inline static void wait_for_reaction_queue_updates(lf_scheduler_t* scheduler, int worker_number) {
scheduler->number_of_idle_workers++;
+
+ // Set the priority to the maximum to be sure to be woken up when new reactions are available.
+ // FIXME: Should only do this if the number of threads is greater than the number of cores.
+ lf_thread_set_priority(lf_thread_self(), EDF_SLEEP_PRIO);
+
tracepoint_worker_wait_starts(scheduler->env, worker_number);
LF_COND_WAIT(&scheduler->custom_data->reaction_q_changed);
tracepoint_worker_wait_ends(scheduler->env, worker_number);
@@ -72,6 +111,14 @@ static int advance_tag(lf_scheduler_t* scheduler) {
// Set a flag in the scheduler that the lock is held by the sole executing thread.
// This prevents acquiring the mutex in lf_scheduler_trigger_reaction.
scheduler->custom_data->solo_holds_mutex = true;
+
+ // setting the priority to the maximum to be sure to be
+ // woken up as soon as the sleep time terminates (because there
+ // might be other worker threads from different enclaves having
+ // higher priority than what the current thread has)
+ // FIXME: Verify that there is no race wrt setting priority to MAX
+ lf_thread_set_priority(lf_thread_self(), LF_SCHED_MAX_PRIORITY - 1);
+
if (_lf_sched_advance_tag_locked(scheduler)) {
LF_PRINT_DEBUG("Scheduler: Reached stop tag.");
scheduler->should_stop = true;
@@ -90,6 +137,350 @@ static int advance_tag(lf_scheduler_t* scheduler) {
return 0;
}
+/**
+ * @brief Shift the priority of elements in the linked list (LL) to assign
+ * to current (freshly inserted in the LL) a priority value complying with the EDF rule.
+ *
+ * First, the algorithm tries to shift the nodes to the right of the position
+ * where current has been inserted. If this is not possible (the tail
+ * of the LL has priority 98 and there is no space to shift the previous
+ * elements), the function tries to shift the nodes to the left. If
+ * this again is not possible (the head of the LL has priority 2 and
+ * there is no space to shift the following elements), the function
+ * returns false (true otherwise).
+ *
+ * The calling thread must own the lock of the
+ * global mutex before calling this function.
+ *
+ * @param current: the node to which to assign a priority value after shifting
+ *
+ * @return true if the shift (either right or left) was possible and a priority
+ * value was assigned to current; false otherwise.
+ */
+static bool shift_edf_priorities(edf_sched_node_t* current) {
+ edf_sched_node_t* before = current->left;
+ edf_sched_node_t* after = current->right;
+ edf_sched_node_t* ptr = after;
+ // count the number of times the while loop executes, so you can distribute
+ // the priority space you find across the threads in the linked list
+ int counter = 1;
+ // set to true if we can find a shifting spot
+ bool shifted = false;
+ LF_PRINT_LOG("EDF: attempting to shift right");
+ while (ptr) {
+ counter++;
+ if (!ptr->right && ptr->pri != EDF_MAX_PRIO) {
+      // if until the tail we cannot find a shifting spot, and the tail is not at EDF_MAX_PRIO,
+ // this means the tail can be shifted to 98
+ LF_PRINT_LOG("EDF: got to the tail that is gonna be shifted to MAX");
+ shifted = true;
+
+ int diff = EDF_MAX_PRIO;
+ // in the case I haven't incremented ptr and immediately
+ // got to the tail (i.e., current == ptr->left), I need to use
+ // before-after as priority interval to compute the diff
+ if (ptr->left != current) {
+ diff -= ptr->left->pri;
+ } else {
+ diff -= ptr->left->left->pri;
+ }
+ // the formula is explained in the else-if case
+ int incr = (diff - 1) / (counter + 1) + 1;
+ LF_PRINT_LOG("EDF: shifting increment is %d: diff = %d, counter = %d", incr, diff, counter);
+
+ // change the tail's priority
+ LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, EDF_MAX_PRIO);
+ ptr->pri = EDF_MAX_PRIO;
+
+ lf_thread_set_priority(ptr->thread_id, ptr->pri);
+
+ // change the priorities of node from tail to the current thread
+ // to keep the relative priorities while shifting and system-calling
+ LF_PRINT_LOG("EDF: shifting the nodes before tail to make room for the new one");
+ do {
+ ptr = ptr->left;
+ LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->right->pri - incr);
+ ptr->pri = ptr->right->pri - incr;
+ lf_thread_set_priority(ptr->thread_id, ptr->pri);
+ } while (ptr != current);
+ LF_PRINT_LOG("EDF: finished shifting");
+ break;
+
+ } else if (ptr->right) {
+ int diff = ptr->right->pri - ptr->pri;
+ if (diff > 1) {
+ // eureka! we found shifting spot
+ LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id,
+ ptr->pri, ptr->right->thread_id, ptr->right->pri);
+ shifted = true;
+ // calculate the increment interval:
+ // # diff - 1 is the extra space to distribute equally
+ // (-1 because we guarantee a space of at least 1 between ptr and ptr->right);
+ // # counter + 1 is the number of spaces between "counter" elements in the LL;
+ // # +1 replaces the ceiling (not exactly but still ok);
+ int incr = (diff - 1) / (counter + 1) + 1;
+ // shift every node's priority from this spot to the spot
+ // we inserted the current thread, starting from the right
+ ptr = ptr->right;
+ do {
+ ptr = ptr->left;
+ LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->right->pri - incr);
+ ptr->pri = ptr->right->pri - incr;
+ lf_thread_set_priority(ptr->thread_id, ptr->pri);
+ } while (ptr != current);
+
+ // since we found a spot and shifted priorities, we don't need to continue with the while loop
+ break;
+ }
+ }
+ ptr = ptr->right;
+ }
+
+ if (!shifted) {
+ LF_PRINT_LOG("EDF: right-shift failed, attempting to shift left");
+ // if flag is still false, this means we couldn't find any shifting spot on the right side
+    // we need to check left (this is a copy of the above code with the needed edits... debatable)
+ ptr = before;
+ counter = 1;
+ while (ptr) {
+ counter++;
+ if (!ptr->left && ptr->pri != EDF_MIN_PRIO) {
+        // if until the head we cannot find a shifting spot, and the head is not at EDF_MIN_PRIO
+ // (1 is reserved for reactions w/o deadline) this means the head can be shifted
+ LF_PRINT_LOG("EDF: got to the head that is gonna be shifted to 2");
+ shifted = true;
+
+ int diff = -EDF_MIN_PRIO;
+ // in the case I haven't incremented ptr and immediately
+ // got to the head (i.e., current == ptr->right), I need to use
+ // before-after as priority interval to compute the diff
+ if (ptr->left != current) {
+ diff += ptr->right->pri;
+ } else {
+ diff += ptr->right->right->pri;
+ }
+
+ // usual formula to spread priorities
+ int incr = (diff - 1) / (counter + 1) + 1;
+ // change the head's priority
+ LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, 2);
+ ptr->pri = EDF_MIN_PRIO;
+ lf_thread_set_priority(ptr->thread_id, ptr->pri);
+
+ // change the priorities of node from head to the current thread
+
+ do {
+ ptr = ptr->right;
+ LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr);
+ ptr->pri = ptr->left->pri + incr;
+ lf_thread_set_priority(ptr->thread_id, ptr->pri);
+ } while (ptr != current);
+
+ break;
+
+ } else if (ptr->left) {
+ int diff = ptr->pri - ptr->left->pri;
+ if (diff > 1) {
+ // eureka! we found shifting spot
+ LF_PRINT_LOG("EDF: found a shifting spot between thread %ld (pri %d) and thread %ld (pri %d)", ptr->thread_id,
+ ptr->pri, ptr->left->thread_id, ptr->left->pri);
+ shifted = true;
+ // usual formula to spread priorities
+ int incr = (diff - 1) / (counter + 1) + 1;
+ // shift every node's priority from this spot to the spot
+ // we inserted the current thread, starting from the left
+
+ ptr = ptr->left;
+ do {
+ ptr = ptr->right;
+ LF_PRINT_LOG("EDF: shifting thread %ld from %d to %d", ptr->thread_id, ptr->pri, ptr->left->pri + incr);
+ ptr->pri = ptr->left->pri + incr;
+ lf_thread_set_priority(ptr->thread_id, ptr->pri);
+ } while (ptr != current);
+
+ // since we found a spot and shifted priorities, we don't need to continue with the while loop
+ break;
+ }
+ }
+ ptr = ptr->left;
+ }
+ }
+ return shifted;
+}
+
+/**
+ * @brief Assign a priority value between 1 and 98 to the current worker thread.
+ *
+ * The priority is determined by the current_reaction_to_execute using EDF.
+ * Priority 1 is assigned to worker threads executing reactions without a deadline,
+ * while EDF_SLEEP_PRIO (near the maximum) is reserved for worker threads waiting for work. The reason
+ * for the highest priority is to be sure that the awakening of these threads is not
+ * delayed by the other worker threads executing reactions (even in different enclaves).
+ *
+ * This function enters a critical section so the mutex lock should not be held on the
+ * environment when this is called.
+ *
+ * @param env The environment within which we are executing.
+ * @param current_reaction_to_execute The reaction the current worker thread is about to serve.
+ */
+static void assign_edf_priority(environment_t* env, reaction_t* current_reaction_to_execute) {
+ LF_PRINT_LOG("Assigning priority to reaction %s (thread %d, %ld)", current_reaction_to_execute->name, lf_thread_id(),
+ lf_thread_self());
+
+ // Examine the reaction's (inferred) deadline to set this thread's priority.
+ interval_t inferred_deadline = (interval_t)(current_reaction_to_execute->index >> 16);
+ // If there is no deadline, set the priority to 1.
+ if (inferred_deadline >= (FOREVER >> 16) << 16) {
+ LF_PRINT_LOG("Reaction %s has no deadline, setting its priority to 1", current_reaction_to_execute->name);
+ // All reactions without (inferred) deadlines result in a thread priority of 1.
+ lf_thread_set_priority(lf_thread_self(), 1);
+ // assuming the edf_element was not in the linked list anymore
+ // (removed on the completion of the previous served reaction)
+ } else {
+ // Get the absolute deadline to implement EDF.
+ instant_t absolute_deadline = (env->current_tag.time + inferred_deadline);
+ LF_PRINT_LOG("Reaction %s has inferred deadline " PRINTF_TIME, current_reaction_to_execute->name,
+ absolute_deadline);
+ // Need to know the priorities of running threads and the absolute deadline associated with it
+ // and choose a priority that is in between those with earlier deadlines and those with larger deadlines.
+ edf_sched_node_t* current = &edf_elements[lf_thread_id()];
+ current->abs_d = absolute_deadline;
+
+ LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", current_reaction_to_execute->name,
+ lf_thread_get_priority(lf_thread_self()));
+ lf_critical_section_enter(GLOBAL_ENVIRONMENT);
+ LF_PRINT_LOG("In the CS for reaction %s", current_reaction_to_execute->name);
+ // if there is no head -- then the current thread is the head
+ if (!edf_ll_head) {
+ edf_ll_head = current;
+ current->pri = EDF_INITIAL_PRIO;
+
+ LF_PRINT_LOG("No worker threads running, reaction %s is the head of the list and "
+ "gets priority %d",
+ current_reaction_to_execute->name, current->pri);
+ } else {
+ // there is a head in the LL
+
+ // assuming the edf_element was not in the linked list anymore
+ // (removed at the completion of the previously executed reaction)
+
+ edf_sched_node_t* ptr = edf_ll_head;
+ // find the spot that the current thread needs to insert itself
+ while (ptr) {
+ // the LL is from lowest priority to the highest priority
+ if (ptr->abs_d < absolute_deadline) {
+ LF_PRINT_LOG("Found a reaction having shorter deadline: " PRINTF_TIME, ptr->abs_d);
+ // change the pointers to insert the current thread
+ edf_sched_node_t* temp = ptr->left;
+ ptr->left = current;
+ current->right = ptr;
+ current->left = temp;
+ // if the insertion is not at the beginning of the list
+ if (temp) {
+ LF_PRINT_LOG("Insertion not at the beginning of the list");
+ temp->right = current;
+ // if there is enough space to assign a priority value between ptr and temp
+ if (ptr->pri - temp->pri >= 2) {
+            // distancing the priority by EDF_PRIO_SPACING from the lower neighbor if there is enough space
+ // (because it's likely that as time passes, newly coming deadlines will be bigger)
+ int incr = (ptr->pri - temp->pri >= EDF_PRIO_SPACING + 1) ? EDF_PRIO_SPACING : (ptr->pri - temp->pri - 1);
+ current->pri = current->left->pri + incr;
+ LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name);
+ } else {
+ // shift elements to find a proper priority value
+ LF_PRINT_LOG("Shifting");
+ if (!shift_edf_priorities(current)) {
+ lf_print_error_and_exit("More threads than priority values. Aborting.");
+ }
+ }
+ } else { // if the insertion is at the beginning of the list
+ LF_PRINT_LOG("Insertion at the beginning of the list");
+ edf_ll_head = current;
+
+ if (ptr->pri == EDF_MIN_PRIO) {
+ // need to shift right
+ if (!shift_edf_priorities(current)) {
+ lf_print_error_and_exit("More threads than priority values. Aborting.");
+ }
+ } else {
+            // distancing the priority by EDF_PRIO_SPACING from ptr (if there is enough space)
+ current->pri =
+ (ptr->pri - EDF_PRIO_SPACING >= EDF_MIN_PRIO) ? (ptr->pri - EDF_PRIO_SPACING) : EDF_MIN_PRIO;
+ }
+ }
+
+ break;
+ } else if (ptr->right == NULL) {
+ // this reaction has the earliest deadline in the list =>
+ // it needs to be added as the tail of the LL
+ LF_PRINT_LOG("No reactions having shorter deadline, adding %s as the tail of the LL",
+ current_reaction_to_execute->name);
+
+ // ptr is the current tail of the LL (cannot be null)
+ current->right = NULL;
+ current->left = ptr;
+ ptr->right = current;
+
+ if (EDF_MAX_PRIO - ptr->pri >= 1) {
+          // distancing the priority by EDF_PRIO_SPACING from the current tail if there is enough space
+ // (because it's likely that as time passes, newly coming deadlines will be bigger)
+ // the maximum priority is LF_SCHED_MAX_PRIORITY - 1 (on Linux it's 98)
+ int incr = (EDF_MAX_PRIO - ptr->pri >= EDF_PRIO_SPACING + 1) ? EDF_PRIO_SPACING : (EDF_MAX_PRIO - ptr->pri);
+ current->pri = current->left->pri + incr;
+ LF_PRINT_LOG("Assigned priority %d to reaction %s", current->pri, current_reaction_to_execute->name);
+ } else {
+ // shift elements to find a proper priority value
+ LF_PRINT_LOG("Shifting (only left)");
+ if (!shift_edf_priorities(current)) {
+ lf_print_error_and_exit("More threads than priority values. Aborting.");
+ }
+ }
+ break;
+ }
+ ptr = ptr->right;
+ }
+ }
+ lf_thread_set_priority(lf_thread_self(), edf_elements[lf_thread_id()].pri);
+ LF_PRINT_LOG("About to exit the CS for reaction %s with priority %d", current_reaction_to_execute->name,
+ lf_thread_get_priority(lf_thread_self()));
+ lf_critical_section_exit(GLOBAL_ENVIRONMENT);
+ LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", current_reaction_to_execute->name,
+ lf_thread_get_priority(lf_thread_self()));
+ }
+}
+
+/**
+ * @brief Remove the edf_sched_node_t element indexed by my_id from the LL and reset deadline and priority.
+ * @param my_id LF thread ID of the element to remove from the LL
+ */
+static void remove_from_edf_ll(lf_thread_t my_id) {
+ edf_sched_node_t* ptr = edf_ll_head;
+ // if the thread is already on the LL -- remove the old one
+ while (ptr) {
+ if (ptr->thread_id == my_id) {
+ if (ptr->left) {
+ ptr->left->right = ptr->right;
+ }
+ if (ptr->right) {
+ ptr->right->left = ptr->left;
+ }
+
+ // my node was the head of the LL
+ if (ptr == edf_ll_head) {
+ edf_ll_head = ptr->right;
+ }
+
+ ptr->abs_d = FOREVER;
+ ptr->pri = 1;
+ ptr->left = ptr->right = NULL;
+
+ break;
+ } else {
+ ptr = ptr->right;
+ }
+ }
+}
+
/**
* @brief Assuming all other workers are idle, advance to the next level.
* @param scheduler The scheduler.
@@ -106,7 +497,9 @@ static void advance_level(lf_scheduler_t* scheduler) {
lf_stall_advance_level_federation_locked(scheduler->custom_data->current_level);
#endif
}
+
///////////////////// Scheduler Init and Destroy API /////////////////////////
+
/**
* @brief Initialize the scheduler.
*
@@ -127,6 +520,15 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
// Already initialized
return;
}
+
+ // Environment 0 (top level) is responsible for allocating the array that stores the
+ // information about worker thread priorities and deadlines.
+ environment_t* top_level_env;
+ int num_envs = _lf_get_environments(&top_level_env);
+ if (top_level_env == env) {
+ edf_elements = (edf_sched_node_t*)calloc(num_envs * top_level_env->num_workers, sizeof(edf_sched_node_t));
+ }
+
lf_scheduler_t* scheduler = env->scheduler;
scheduler->custom_data = (custom_scheduler_data_t*)calloc(1, sizeof(custom_scheduler_data_t));
@@ -149,11 +551,48 @@ void lf_sched_init(environment_t* env, size_t number_of_workers, sched_params_t*
*/
void lf_sched_free(lf_scheduler_t* scheduler) {
pqueue_free((pqueue_t*)scheduler->custom_data->reaction_q);
+ // Environment 0 (top level) is responsible for freeing the array that stores the
+ // information about worker thread priorities and deadlines.
+ environment_t* top_level_env;
+ _lf_get_environments(&top_level_env);
+ if (top_level_env == scheduler->env) {
+ free(edf_elements);
+ }
free(scheduler->custom_data);
}
///////////////////// Scheduler Worker API (public) /////////////////////////
+void lf_sched_configure_worker() {
+ // Set default worker thread properties.
+ edf_elements[lf_thread_id()].abs_d = FOREVER;
+ edf_elements[lf_thread_id()].pri = 1;
+ edf_elements[lf_thread_id()].thread_id = lf_thread_self();
+ edf_elements[lf_thread_id()].left = NULL;
+ edf_elements[lf_thread_id()].right = NULL;
+
+ // Use the target property to set the policy.
+ lf_scheduling_policy_t policy = {.priority = EDF_SLEEP_PRIO, .policy = LF_THREAD_POLICY};
+ LF_PRINT_LOG("Setting thread policy to %d to thread %d", LF_THREAD_POLICY, lf_thread_id());
+ int ret = lf_thread_set_scheduling_policy(lf_thread_self(), &policy);
+ if (ret != 0) {
+ lf_print_warning("Couldn't set the scheduling policy. Try running the program with sudo rights.");
+ }
+
+#if LF_NUMBER_OF_CORES > 0
+ // Pin the thread to cores starting from the highest numbered core using
+ // the assigned thread id: small thread id => high core number. Still,
+ // respecting the constraint on the specified number of cores the program can use.
+ int core_number = lf_available_cores() - 1 - (lf_thread_id() % LF_NUMBER_OF_CORES);
+
+ ret = lf_thread_set_cpu(lf_thread_self(), core_number);
+ if (ret != 0) {
+ lf_print_error_and_exit("Couldn't bind thread-%u to core %d.", lf_thread_id(), core_number);
+ }
+ LF_PRINT_LOG("Thread %d using core_number %d", lf_thread_id(), core_number);
+#endif // LF_NUMBER_OF_CORES > 0
+}
+
reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) {
// Need to lock the environment mutex.
LF_PRINT_DEBUG("Scheduler: Worker %d locking environment mutex.", worker_number);
@@ -186,6 +625,9 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu
LF_COND_SIGNAL(&scheduler->custom_data->reaction_q_changed);
}
LF_MUTEX_UNLOCK(&scheduler->env->mutex);
+ if (LF_THREAD_POLICY > LF_SCHED_FAIR) {
+ assign_edf_priority(scheduler->env, reaction_to_return);
+ }
return reaction_to_return;
} else {
// Found a reaction at a level other than the current level.
@@ -231,6 +673,15 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction
if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) {
lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued);
}
+ LF_PRINT_LOG("Attempt to enter the CS from reaction %s with priority %d", done_reaction->name,
+ lf_thread_get_priority(lf_thread_self()));
+ lf_critical_section_enter(GLOBAL_ENVIRONMENT);
+ LF_PRINT_LOG("In the CS from reaction %s", done_reaction->name);
+ lf_thread_t my_id = lf_thread_self();
+ remove_from_edf_ll(my_id);
+ lf_critical_section_exit(GLOBAL_ENVIRONMENT);
+ LF_PRINT_LOG("Out of the CS for reaction %s with priority %d", done_reaction->name,
+ lf_thread_get_priority(lf_thread_self()));
}
void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c
index fd0ccfb04..3964e5eda 100644
--- a/core/threaded/scheduler_NP.c
+++ b/core/threaded/scheduler_NP.c
@@ -300,17 +300,9 @@ void lf_sched_free(lf_scheduler_t* scheduler) {
}
///////////////////// Scheduler Worker API (public) /////////////////////////
-/**
- * @brief Ask the scheduler for one more reaction.
- *
- * This function blocks until it can return a ready reaction for worker thread
- * 'worker_number' or it is time for the worker thread to stop and exit (where a
- * NULL value would be returned).
- *
- * @param worker_number
- * @return reaction_t* A reaction for the worker to execute. NULL if the calling
- * worker thread should exit.
- */
+
+void lf_sched_configure_worker() {}
+
reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) {
// Iterate until the stop tag is reached or reaction vectors are empty
while (!scheduler->should_stop) {
diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c
index 5a926aba6..3d0e8859f 100644
--- a/core/threaded/scheduler_adaptive.c
+++ b/core/threaded/scheduler_adaptive.c
@@ -698,6 +698,8 @@ void lf_sched_free(lf_scheduler_t* scheduler) {
///////////////////////// Scheduler Worker API ///////////////////////////////
+void lf_sched_configure_worker() {}
+
reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_number) {
assert(worker_number >= 0);
reaction_t* ret;
diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h
index 2f5463165..271df23c2 100644
--- a/include/core/threaded/reactor_threaded.h
+++ b/include/core/threaded/reactor_threaded.h
@@ -12,6 +12,27 @@
#include "lf_types.h"
+/**
+ * @brief The number of cores to use.
+ *
+ * If the target parameter number_of_cores is set, it will override this default.
+ */
+#ifndef LF_NUMBER_OF_CORES
+#define LF_NUMBER_OF_CORES 0
+#endif
+
+/**
+ * @brief The thread scheduling policy to use.
+ *
+ * This should be one of LF_SCHED_FAIR, LF_SCHED_TIMESLICE, or LF_SCHED_PRIORITY.
+ * The default is LF_SCHED_FAIR, which corresponds to the Linux SCHED_OTHER.
+ * LF_SCHED_TIMESLICE corresponds to Linux SCHED_RR, and LF_SCHED_PRIORITY corresponds
+ * to SCHED_FIFO.
+ */
+#ifndef LF_THREAD_POLICY
+#define LF_THREAD_POLICY LF_SCHED_FAIR
+#endif
+
/**
* Enqueue port absent reactions that will send a PORT_ABSENT
* message to downstream federates if a given network output port is not present.
diff --git a/include/core/threaded/scheduler.h b/include/core/threaded/scheduler.h
index f49f0bc54..ff5663aed 100644
--- a/include/core/threaded/scheduler.h
+++ b/include/core/threaded/scheduler.h
@@ -84,4 +84,9 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction
*/
void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number);
+/**
+ * @brief Initialize priority and set core binding for the calling worker thread, if appropriate.
+ */
+void lf_sched_configure_worker();
+
#endif // LF_SCHEDULER_H
diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h
index afffd2a9e..ea69900d3 100644
--- a/low_level_platform/api/low_level_platform.h
+++ b/low_level_platform/api/low_level_platform.h
@@ -121,18 +121,15 @@ int lf_available_cores();
lf_thread_t lf_thread_self();
/**
- * Create a new thread, starting with execution of lf_thread
- * getting passed arguments. The new handle is stored in thread_id.
+ * @brief Create a new thread and start execution of the function lf_thread
+ * with the specified arguments.
+ *
+ * The new handle is stored in thread_id.
*
* @return 0 on success, platform-specific error number otherwise.
*/
int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments);
-/**
- * @brief Helper function for creating a thread.
- */
-int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments);
-
/**
* Make calling thread wait for termination of the thread. The
* exit status of the thread is stored in thread_return if thread_return
@@ -174,23 +171,33 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number);
* number indicates higher priority. Setting the priority of a thread only
* makes sense if the thread is scheduled with LF_SCHED_TIMESLICE or LF_THREAD_PRIORITY
*
- * @param thread The thread.
+ * @param thread The thread ID.
* @param priority The priority.
* @return int 0 on success, platform-specific error otherwise
*/
int lf_thread_set_priority(lf_thread_t thread, int priority);
/**
- * @brief Set the scheduling policy of a thread. This is based on the scheduling
+ * @brief Get the current priority of a thread (intended for debugging).
+ */
+int lf_thread_get_priority(lf_thread_t thread);
+
+/**
+ * @brief Set the scheduling policy of a thread.
+ *
+ * This is based on the scheduling
* concept from Linux explained here: https://man7.org/linux/man-pages/man7/sched.7.html
* A scheduling policy is specific to a thread/worker. We have three policies
* LF_SCHED_PRIORITY which corresponds to SCHED_FIFO on Linux.
* LF_SCHED_TIMESLICE which corresponds to SCHED_RR on Linux.
* LF_SCHED_FAIR which corresponds to SCHED_OTHER on Linux.
*
+ * @param thread The thread ID.
+ * @param policy A pointer to the policy (this will not be used after returning, so it can be on the stack).
* @return int 0 on success, platform-specific error number otherwise.
*/
int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy);
+// FIXME: The policy pointer argument is worrisome. Can it really be on the stack?
/**
* Initialize a mutex.
diff --git a/low_level_platform/impl/src/lf_POSIX_threads_support.c b/low_level_platform/impl/src/lf_POSIX_threads_support.c
index 255f38255..ae5fc5322 100644
--- a/low_level_platform/impl/src/lf_POSIX_threads_support.c
+++ b/low_level_platform/impl/src/lf_POSIX_threads_support.c
@@ -34,6 +34,13 @@ int lf_mutex_init(lf_mutex_t* mutex) {
// of the predicate.” This seems like a bug in the implementation of
// pthreads. Maybe it has been fixed?
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
+
+ // Initialize the mutex protocol to INHERIT:
+ // a thread t1 owning the mutex, when it is preempted by a
+ // higher-priority thread t2 that tries to get the lock on the
+ // same mutex, inherits t2's priority.
+ pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);
+
return pthread_mutex_init((pthread_mutex_t*)mutex, &attr);
}
diff --git a/low_level_platform/impl/src/lf_flexpret_support.c b/low_level_platform/impl/src/lf_flexpret_support.c
index 7fb6d2a48..0acd37a41 100644
--- a/low_level_platform/impl/src/lf_flexpret_support.c
+++ b/low_level_platform/impl/src/lf_flexpret_support.c
@@ -226,6 +226,17 @@ void initialize_lf_thread_id() {
// Nothing needed here; thread ID's are already available in harware registers
// which can be fetched with `read_hartid`.
}
+
+/**
+ * Real-time scheduling API not implemented for FlexPRET.
+ */
+int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; }
+
+int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; }
+
+int lf_thread_get_priority(lf_thread_t thread) { return -1; }
+
+int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; }
#endif
#endif // PLATFORM_FLEXPRET
diff --git a/low_level_platform/impl/src/lf_linux_support.c b/low_level_platform/impl/src/lf_linux_support.c
index a8caa3d47..93b97cd97 100644
--- a/low_level_platform/impl/src/lf_linux_support.c
+++ b/low_level_platform/impl/src/lf_linux_support.c
@@ -54,6 +54,18 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) {
return pthread_setaffinity_np(thread, sizeof(cpu_set), &cpu_set);
}
+// FIXME: This does not do any translation between LF_SCHED priority range and
+// the Linux sched priority range.
+int lf_thread_get_priority(lf_thread_t thread) {
+ struct sched_param schedparam;
+ int policy;
+
+ // Get the current scheduling policy
+ pthread_getschedparam(thread, &policy, &schedparam);
+
+ return schedparam.sched_priority;
+}
+
int lf_thread_set_priority(lf_thread_t thread, int priority) {
int posix_policy, min_pri, max_pri, final_priority, res;
struct sched_param schedparam;
diff --git a/low_level_platform/impl/src/lf_macos_support.c b/low_level_platform/impl/src/lf_macos_support.c
index bf96362cb..ddc384ee0 100644
--- a/low_level_platform/impl/src/lf_macos_support.c
+++ b/low_level_platform/impl/src/lf_macos_support.c
@@ -46,6 +46,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; }
int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; }
+int lf_thread_get_priority(lf_thread_t thread) { return -1; }
+
int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; }
#endif
diff --git a/low_level_platform/impl/src/lf_windows_support.c b/low_level_platform/impl/src/lf_windows_support.c
index 61424ac7f..083508cf5 100644
--- a/low_level_platform/impl/src/lf_windows_support.c
+++ b/low_level_platform/impl/src/lf_windows_support.c
@@ -196,6 +196,8 @@ int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; }
int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; }
+int lf_thread_get_priority(lf_thread_t thread) { return -1; }
+
int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; }
int lf_mutex_init(_lf_critical_section_t* critical_section) {
diff --git a/low_level_platform/impl/src/lf_zephyr_support.c b/low_level_platform/impl/src/lf_zephyr_support.c
index 74ae9bf90..8370b7a94 100644
--- a/low_level_platform/impl/src/lf_zephyr_support.c
+++ b/low_level_platform/impl/src/lf_zephyr_support.c
@@ -190,6 +190,9 @@ int lf_thread_set_priority(lf_thread_t thread, int priority) {
return 0;
}
+// FIXME: Implement this.
+int lf_thread_get_priority(lf_thread_t thread) { return -1; }
+
int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) {
// Update the policy
switch (policy->policy) {