Skip to content

Commit

Permalink
Merge branch 'main' into rti-DNET
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee authored Oct 13, 2024
2 parents 2bfd833 + 184567d commit da80702
Show file tree
Hide file tree
Showing 26 changed files with 250 additions and 134 deletions.
20 changes: 16 additions & 4 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,16 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
const char* trace_file_name) {
(void)trace_file_name; // Will be used with future enclave support.

env->name = malloc(strlen(name) + 1); // +1 for the null terminator
LF_ASSERT_NON_NULL(env->name);
strcpy(env->name, name);

// Space for the name string with the null terminator.
if (name != NULL) {
size_t name_size = strlen(name) + 1; // +1 for the null terminator
env->name = (char*)malloc(name_size);
LF_ASSERT_NON_NULL(env->name);
// Use strncpy rather than strcpy to avoid compiler warnings.
strncpy(env->name, name, name_size);
} else {
env->name = NULL;
}
env->id = id;
env->stop_tag = FOREVER_TAG;

Expand Down Expand Up @@ -284,3 +290,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->initialized = true;
return 0;
}

void environment_verify(environment_t* env) {
for (int i = 0; i < env->is_present_fields_size; i++) {
LF_ASSERT_NON_NULL(env->is_present_fields[i]);
}
}
4 changes: 2 additions & 2 deletions core/federated/RTI/rti.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ARG BASEIMAGE=alpine:latest
FROM ${BASEIMAGE} as builder
FROM ${BASEIMAGE} AS builder
COPY . /lingua-franca
WORKDIR /lingua-franca/core/federated/RTI
RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
Expand All @@ -12,7 +12,7 @@ RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
WORKDIR /lingua-franca

# application stage
FROM ${BASEIMAGE} as app
FROM ${BASEIMAGE} AS app
LABEL maintainer="lf-lang"
LABEL source="https://github.com/lf-lang/reactor-c/tree/main/core/federated/RTI"
COPY --from=builder /usr/local/bin/RTI /usr/local/bin/RTI
Expand Down
20 changes: 17 additions & 3 deletions core/lf_token.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,20 @@ int _lf_count_token_allocations;
#include "platform.h" // Enter/exit critical sections
#include "port.h" // Defines lf_port_base_t.

lf_token_t* _lf_tokens_allocated_in_reactions = NULL;
/**
* @brief List of tokens created within reactions that must be freed.
*
* Tokens created by lf_writable_copy, which is automatically invoked
* when an input is mutable, must have their reference count decremented
* at the end of a tag (or the beginning of the next tag).
* Otherwise, their memory could leak. If they are passed on to
* an output or to a call to lf_schedule during the reaction, then
* those will also result in incremented reference counts, enabling
* the token to live on until used. For example, a new token created
* by lf_writable_copy could become the new template token for an output
* via a call to lf_set.
*/
static lf_token_t* _lf_tokens_allocated_in_reactions = NULL;

////////////////////////////////////////////////////////////////////
//// Global variables not visible outside this file.
Expand Down Expand Up @@ -197,6 +210,8 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) {
if (hashset_iterator_next(iterator) >= 0) {
result = hashset_iterator_value(iterator);
hashset_remove(_lf_token_recycling_bin, result);
// Make sure there isn't a previous value.
result->value = NULL;
LF_PRINT_DEBUG("_lf_new_token: Retrieved token from the recycling bin: %p", (void*)result);
}
free(iterator);
Expand Down Expand Up @@ -352,8 +367,7 @@ token_freed _lf_done_using(lf_token_t* token) {

void _lf_free_token_copies() {
while (_lf_tokens_allocated_in_reactions != NULL) {
lf_token_t* next = _lf_tokens_allocated_in_reactions->next;
_lf_done_using(_lf_tokens_allocated_in_reactions);
_lf_tokens_allocated_in_reactions = next;
_lf_tokens_allocated_in_reactions = _lf_tokens_allocated_in_reactions->next;
}
}
40 changes: 29 additions & 11 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,19 @@ void _lf_pop_events(environment_t* env) {
}
}

// Mark the trigger present.
// Mark the trigger present
event->trigger->status = present;

// If the trigger is a periodic timer, create a new event for its next execution.
if (event->trigger->is_timer && event->trigger->period > 0LL) {
// Reschedule the trigger.
lf_schedule_trigger(env, event->trigger, event->trigger->period, NULL);
} else {
// For actions, store a pointer to status field so it is reset later.
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
env->is_present_fields_abbreviated[ipfas] = (bool*)&event->trigger->status;
}
}

// Copy the token pointer into the trigger struct so that the
Expand All @@ -323,9 +329,6 @@ void _lf_pop_events(environment_t* env) {
// freed prematurely.
_lf_done_using(token);

// Mark the trigger present.
event->trigger->status = present;

lf_recycle_event(env, event);

// Peek at the next event in the event queue.
Expand Down Expand Up @@ -383,12 +386,16 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) {

// Get an event_t struct to put on the event queue.
// Recycle event_t structs, if possible.
event_t* e = lf_get_new_event(env);
e->trigger = timer;
e->base.tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0};
// NOTE: No lock is being held. Assuming this only happens at startup.
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e);
tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called.
tag_t next_tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0};
// Do not schedule the next event if it is after the timeout.
if (!lf_is_tag_after_stop_tag(env, next_tag)) {
event_t* e = lf_get_new_event(env);
e->trigger = timer;
e->base.tag = next_tag;
// NOTE: No lock is being held. Assuming this only happens at startup.
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e);
tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called.
}
}

void _lf_initialize_timers(environment_t* env) {
Expand Down Expand Up @@ -603,8 +610,12 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t*
// for which we decrement the reference count.
_lf_replace_template_token((token_template_t*)trigger, token);

// Mark the trigger present.
// Mark the trigger present and store a pointer to it for marking it as absent later.
trigger->status = present;
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
env->is_present_fields_abbreviated[ipfas] = (bool*)&trigger->status;
}

// Push the corresponding reactions for this trigger
// onto the reaction queue.
Expand Down Expand Up @@ -1096,6 +1107,13 @@ void initialize_global(void) {
// Call the code-generated function to initialize all actions, timers, and ports
// This is done for all environments/enclaves at the same time.
_lf_initialize_trigger_objects();

#if !defined(LF_SINGLE_THREADED) && !defined(NDEBUG)
// If we are testing, verify that environment with pointers is correctly set up.
for (int i = 0; i < num_envs; i++) {
environment_verify(&envs[i]);
}
#endif
}

/**
Expand Down
16 changes: 11 additions & 5 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ void lf_set_present(lf_port_base_t* port) {
return;
environment_t* env = port->source_reactor->environment;
bool* is_present_field = &port->is_present;
int ipfas = lf_atomic_fetch_add32(&env->is_present_fields_abbreviated_size, 1);
int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1);
if (ipfas < env->is_present_fields_size) {
env->is_present_fields_abbreviated[ipfas] = is_present_field;
}
*is_present_field = true;

// Support for sparse destination multiports.
if (port->sparse_record && port->destination_channel >= 0 && port->sparse_record->size >= 0) {
size_t next = (size_t)lf_atomic_fetch_add32(&port->sparse_record->size, 1);
size_t next = (size_t)lf_atomic_fetch_add(&port->sparse_record->size, 1);
if (next >= port->sparse_record->capacity) {
// Buffer is full. Have to revert to the classic iteration.
port->sparse_record->size = -1;
Expand Down Expand Up @@ -1023,13 +1023,17 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
#endif

LF_PRINT_DEBUG("Start time: " PRINTF_TIME "ns", start_time);
struct timespec physical_time_timespec = {start_time / BILLION, start_time % BILLION};

#ifdef MINIMAL_STDLIB
lf_print("---- Start execution ----");
#else
lf_print("---- Start execution at time %s---- plus %ld nanoseconds", ctime(&physical_time_timespec.tv_sec),
physical_time_timespec.tv_nsec);
struct timespec physical_time_timespec = {start_time / BILLION, start_time % BILLION};
struct tm* time_info = localtime(&physical_time_timespec.tv_sec);
char buffer[80]; // Long enough to hold the formatted time string.
// Use strftime rather than ctime because as of C23, ctime is deprecated.
strftime(buffer, sizeof(buffer), "%a %b %d %H:%M:%S %Y", time_info);

lf_print("---- Start execution on %s ---- plus %ld nanoseconds", buffer, physical_time_timespec.tv_nsec);
#endif // MINIMAL_STDLIB

// Create and initialize the environments for each enclave
Expand Down Expand Up @@ -1114,6 +1118,8 @@ int lf_reactor_c_main(int argc, const char* argv[]) {
} else {
int failure = lf_thread_join(env->thread_ids[j], &worker_thread_exit_status);
if (failure) {
// Windows warns that strerror is deprecated but doesn't define strerror_r.
// There seems to be no portable replacement.
lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure));
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/threaded/scheduler_GEDF_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu

void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) {
(void)worker_number; // Suppress unused parameter warning.
if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) {
if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) {
lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued);
}
}

void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
(void)worker_number; // Suppress unused parameter warning.
if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) {
if (reaction == NULL || !lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) {
return;
}
LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index));
Expand Down
10 changes: 5 additions & 5 deletions core/threaded/scheduler_NP.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction
scheduler->indexes[reaction_level] = 0;
}
#endif
int reaction_q_level_index = lf_atomic_fetch_add32((int32_t*)&scheduler->indexes[reaction_level], 1);
int reaction_q_level_index = lf_atomic_fetch_add((int*)&scheduler->indexes[reaction_level], 1);
assert(reaction_q_level_index >= 0);
LF_PRINT_DEBUG("Scheduler: Accessing triggered reactions at the level %zu with index %d.", reaction_level,
reaction_q_level_index);
Expand Down Expand Up @@ -203,7 +203,7 @@ static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* schedul
static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) {
// Increment the number of idle workers by 1 and check if this is the last
// worker thread to become idle.
if (lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) {
if (lf_atomic_add_fetch((int*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) {
// Last thread to go idle
LF_PRINT_DEBUG("Scheduler: Worker %zu is the last idle thread.", worker_number);
// Call on the scheduler to distribute work or advance tag.
Expand Down Expand Up @@ -322,7 +322,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu
// the current level (if there is a causality loop)
LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[current_level]);
#endif
int current_level_q_index = lf_atomic_add_fetch32((int32_t*)&scheduler->indexes[current_level], -1);
int current_level_q_index = lf_atomic_add_fetch((int*)&scheduler->indexes[current_level], -1);
if (current_level_q_index >= 0) {
LF_PRINT_DEBUG("Scheduler: Worker %d popping reaction with level %zu, index "
"for level: %d.",
Expand Down Expand Up @@ -361,7 +361,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu
*/
void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) {
(void)worker_number;
if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) {
if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) {
lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued);
}
}
Expand All @@ -388,7 +388,7 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction
void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
(void)worker_number;

if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) {
if (reaction == NULL || !lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) {
return;
}
LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index));
Expand Down
14 changes: 7 additions & 7 deletions core/threaded/scheduler_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ static void worker_assignments_free(lf_scheduler_t* scheduler) {
static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
#ifndef FEDERATED
int index = lf_atomic_add_fetch32((int32_t*)(worker_assignments->num_reactions_by_worker + worker), -1);
int index = lf_atomic_add_fetch(worker_assignments->num_reactions_by_worker + worker, -1);
if (index >= 0) {
return worker_assignments->reactions_by_worker[worker][index];
}
Expand All @@ -223,9 +223,9 @@ static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) {
old_num_reactions = current_num_reactions;
if (old_num_reactions <= 0)
return NULL;
} while ((current_num_reactions = lf_atomic_val_compare_and_swap32(
(int32_t*)(worker_assignments->num_reactions_by_worker + worker), old_num_reactions,
(index = old_num_reactions - 1))) != old_num_reactions);
} while ((current_num_reactions =
lf_atomic_val_compare_and_swap(worker_assignments->num_reactions_by_worker + worker, old_num_reactions,
(index = old_num_reactions - 1))) != old_num_reactions);
return worker_assignments->reactions_by_worker[worker][index];
#endif
}
Expand Down Expand Up @@ -282,7 +282,7 @@ static void worker_assignments_put(lf_scheduler_t* scheduler, reaction_t* reacti
hash = hash ^ (hash >> 31);
size_t worker = hash % worker_assignments->num_workers_by_level[level];
size_t num_preceding_reactions =
lf_atomic_fetch_add32((int32_t*)&worker_assignments->num_reactions_by_worker_by_level[level][worker], 1);
lf_atomic_fetch_add(&worker_assignments->num_reactions_by_worker_by_level[level][worker], 1);
worker_assignments->reactions_by_worker_by_level[level][worker][num_preceding_reactions] = reaction;
}

Expand Down Expand Up @@ -383,7 +383,7 @@ static bool worker_states_finished_with_level_locked(lf_scheduler_t* scheduler,
assert(((int64_t)worker_assignments->num_reactions_by_worker[worker]) <= 0);
// Why use an atomic operation when we are supposed to be "as good as locked"? Because I took a
// shortcut, and the shortcut was imperfect.
size_t ret = lf_atomic_add_fetch32((int32_t*)&worker_states->num_loose_threads, -1);
size_t ret = lf_atomic_add_fetch(&worker_states->num_loose_threads, -1);
assert(ret <= worker_assignments->max_num_workers); // Check for underflow
return !ret;
}
Expand Down Expand Up @@ -726,7 +726,7 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction

void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) {
LF_ASSERT(worker_number >= -1, "Sched: Invalid worker number");
if (!lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued))
if (!lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued))
return;
worker_assignments_put(scheduler, reaction);
}
Expand Down
4 changes: 2 additions & 2 deletions core/utils/lf_semaphore.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* @param count The count to start with.
* @return lf_semaphore_t* Can be NULL on error.
*/
lf_semaphore_t* lf_semaphore_new(int count) {
lf_semaphore_t* lf_semaphore_new(size_t count) {
lf_semaphore_t* semaphore = (lf_semaphore_t*)malloc(sizeof(lf_semaphore_t));
LF_MUTEX_INIT(&semaphore->mutex);
LF_COND_INIT(&semaphore->cond, &semaphore->mutex);
Expand All @@ -55,7 +55,7 @@ lf_semaphore_t* lf_semaphore_new(int count) {
* @param semaphore Instance of a semaphore
* @param i The count to add.
*/
void lf_semaphore_release(lf_semaphore_t* semaphore, int i) {
void lf_semaphore_release(lf_semaphore_t* semaphore, size_t i) {
assert(semaphore != NULL);
LF_MUTEX_LOCK(&semaphore->mutex);
semaphore->count += i;
Expand Down
2 changes: 2 additions & 0 deletions core/utils/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ void lf_print_error_system_failure(const char* format, ...) {
va_start(args, format);
lf_vprint_error(format, args);
va_end(args);
// Windows warns that strerror is deprecated but doesn't define strerror_r.
// There seems to be no portable replacement.
lf_print_error_and_exit("Error %d: %s", errno, strerror(errno));
exit(EXIT_FAILURE);
}
Expand Down
7 changes: 7 additions & 0 deletions include/core/environment.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs,
const char* trace_file_name);

/**
* @brief Verify that the environment is correctly set up.
*
* @param env
*/
void environment_verify(environment_t* env);

/**
* @brief Free the dynamically allocated memory on the environment struct.
* @param env The environment in which we are executing.
Expand Down
Loading

0 comments on commit da80702

Please sign in to comment.