Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Apply pluggable security for federated execution. #515

Draft
wants to merge 50 commits into
base: networkdriver
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
fd7bd22
Perform busy wait when the wait duration is less than MIN_SLEEP_DURATION
byeonggiljun Jan 28, 2025
184b55f
Do not check MIN_SLEEP_DURATION when waiting for the physical time to…
byeonggiljun Jan 29, 2025
f9081b1
Revert changes
byeonggiljun Jan 29, 2025
98b1090
Make immediately return when we have already passed the target physi…
byeonggiljun Jan 30, 2025
09fd077
Add sst_support.
Jakio815 Jan 31, 2025
2759ae1
Merge branch 'main' of github.com:lf-lang/reactor-c into shutdown
Jakio815 Jan 31, 2025
7f0ad69
Merge branch 'shutdown' of github.com:lf-lang/reactor-c into shutdown
Jakio815 Jan 31, 2025
3fb0bf9
Add sst_priv_t struct.
Jakio815 Jan 31, 2025
27ac24e
Add initialize_netdrv for sst.
Jakio815 Jan 31, 2025
a814a07
Add free_netdrv for sst
Jakio815 Jan 31, 2025
82e63bd
Add create_server, with also passing path as global var in lf_sst_sup…
Jakio815 Jan 31, 2025
4a5f1b6
Add structure of accept_netdrv
Jakio815 Jan 31, 2025
001b5fe
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Jan 31, 2025
532922b
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Jan 31, 2025
4d28852
Add comments.
Jakio815 Jan 31, 2025
241d8da
Set handshake with client.
Jakio815 Jan 31, 2025
5be07ad
Add create_client and connect_to_netdrv for sst.
Jakio815 Jan 31, 2025
37145ff
Add user input path of sst config to federate.c
Jakio815 Jan 31, 2025
42a688c
Add get/set functions.
Jakio815 Jan 31, 2025
c374b72
Add read/write/shutdown functions.
Jakio815 Jan 31, 2025
a96d00e
Minor fix on adding void
Jakio815 Feb 1, 2025
8922c0d
Minor fix on adding `void` and new line on EOF.
Jakio815 Feb 1, 2025
ab3d92d
Enable finding the sst-c-api library, and include it in lf_sst_support.h
Jakio815 Feb 1, 2025
cedbddf
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Feb 1, 2025
2772e86
Add options to use user specified port for sst.
Jakio815 Feb 2, 2025
cb5b23b
Minor fix on including headers.
Jakio815 Feb 2, 2025
e7cea3b
Add -sst option for federate.
Jakio815 Feb 3, 2025
3fa7c48
Add usage for --sst for RTI.
Jakio815 Feb 4, 2025
a3887e9
Fix read/write to send header separately to match numbers. Fed-to-Fed…
Jakio815 Feb 6, 2025
53b2100
Minor cleanup.
Jakio815 Feb 6, 2025
af591a2
Fix read/write to match for fed2fed messages.
Jakio815 Feb 6, 2025
b92b737
Fix forwarding on port absent messages.
Jakio815 Feb 6, 2025
1e0267b
Merge pull request #514 from lf-lang/fix-wait-until
erlingrj Feb 7, 2025
d66eb3b
Avoid a double join on the sensor_simulator output thread
erlingrj Feb 10, 2025
ced9486
Fix memory bug when obtaining a port number
erlingrj Feb 10, 2025
141dee7
Merge pull request #518 from lf-lang/port-number-mem-bug
erlingrj Feb 11, 2025
8072355
Merge pull request #517 from lf-lang/avoid-double-join
erlingrj Feb 11, 2025
89f7b91
Merge branch 'main' into shutdown
edwardalee Feb 12, 2025
62bad67
Revert "Fix read/write to send header separately to match numbers. Fe…
Jakio815 Feb 12, 2025
c8dbb99
Add shutdown mutex.
Jakio815 Feb 28, 2025
4bf70ac
Add ref.
Jakio815 Feb 28, 2025
f6e8674
Minor fix.
Jakio815 Feb 28, 2025
b2846c2
Minor fix on formatting.
Jakio815 Feb 28, 2025
01a4b55
Fix names to chan.
Jakio815 Feb 28, 2025
83b91d1
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Feb 28, 2025
1c48985
Merge branch 'networkdriver' of github.com:lf-lang/reactor-c into sst
Jakio815 Feb 28, 2025
dbbc22c
Fix socket_mutex to inbound socket_mutex.
Jakio815 Mar 3, 2025
bf79fe9
Add return before goto.
Jakio815 Mar 3, 2025
618989d
Minor fix on descriptions.
Jakio815 Mar 4, 2025
c10017d
Merge branch 'shutdown' of github.com:lf-lang/reactor-c into sst
Jakio815 Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" -a, --auth Turn on HMAC authentication options.\n");
lf_print(" -t, --tracing Turn on tracing.\n");
lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n");
lf_print(" -sst, --sst SST config path for RTI.\n");

lf_print("Command given:");
for (int i = 0; i < argc; i++) {
Expand Down Expand Up @@ -234,7 +235,7 @@ int process_args(int argc, const char* argv[]) {
rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines
lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes);
} else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) {
#ifdef COMM_TYPE_TCP
#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST)
if (argc < i + 2) {
lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX);
usage(argc, argv);
Expand Down Expand Up @@ -266,6 +267,15 @@ int process_args(int argc, const char* argv[]) {
return 0;
#endif
rti.authentication_enabled = true;
} else if (strcmp(argv[i], "-sst") == 0 || strcmp(argv[i], "--sst") == 0) {
#ifndef COMM_TYPE_SST
lf_print_error("--sst requires the RTI to be built with the --DCOMM_TYPE=SST option.");
usage(argc, argv);
return 0;
#else
i++;
lf_set_sst_config_path(argv[i]);
#endif
} else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) {
rti.base.tracing_enabled = true;
} else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) {
Expand Down
1 change: 1 addition & 0 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,7 @@ void initialize_RTI(rti_remote_t* rti) {

// Initialize thread synchronization primitives
LF_MUTEX_INIT(&rti_mutex);
LF_MUTEX_INIT(&shutdown_mutex);
LF_COND_INIT(&received_start_times, &rti_mutex);
LF_COND_INIT(&sent_start_time, &rti_mutex);

Expand Down
31 changes: 12 additions & 19 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ extern bool _lf_termination_executed;

// Global variables references in federate.h
lf_mutex_t lf_outbound_netchan_mutex;

lf_mutex_t lf_inbound_netchan_mutex;

lf_cond_t lf_port_status_changed;

/**
Expand Down Expand Up @@ -400,12 +403,11 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen
* federate.
*/
static void close_inbound_netchan(int fed_id) {
LF_MUTEX_LOCK(&netchan_mutex);
if (_fed.netchans_for_inbound_p2p_connections[fed_id] != NULL) {
shutdown_netchan(_fed.netchans_for_inbound_p2p_connections[fed_id], false);
_fed.netchans_for_inbound_p2p_connections[fed_id] = NULL;
LF_MUTEX_LOCK(&lf_inbound_netchan_mutex);
if (_fed.netchans_for_inbound_p2p_connections[fed_id] >= 0) {
shutdown_netchan(&_fed.netchans_for_inbound_p2p_connections[fed_id], false);
}
LF_MUTEX_UNLOCK(&netchan_mutex);
LF_MUTEX_UNLOCK(&lf_inbound_netchan_mutex);
}

/**
Expand Down Expand Up @@ -722,7 +724,6 @@ static int handle_port_absent_message(netchan_t netchan, int fed_id) {
* network channel in _fed.netchans_for_inbound_p2p_connections
* to -1 and returns, terminating the thread.
* @param _args The remote federate ID (cast to void*).
* @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to.
* This procedure frees the memory pointed to before returning.
*/
static void* listen_to_federates(void* _args) {
Expand Down Expand Up @@ -1948,18 +1949,10 @@ void lf_connect_to_rti(const char* hostname, int port) {

void lf_create_server(int specified_port) {
assert(specified_port <= UINT16_MAX && specified_port >= 0);

netchan_t server_netchan = initialize_netchan();
set_my_port(server_netchan, specified_port);

if (create_server(server_netchan, false)) {
lf_print_error_system_failure("RTI failed to create server: %s.", strerror(errno));
if (create_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, TCP, false)) {
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
_fed.server_netchan = server_netchan;
// Get the final server port to send to the RTI on an MSG_TYPE_ADDRESS_ADVERTISEMENT message.
int32_t server_port = get_my_port(server_netchan);

LF_PRINT_LOG("Server for communicating with other federates started using port %d.", server_port);
LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port);

// Send the server port number to the RTI
// on an MSG_TYPE_ADDRESS_ADVERTISEMENT message (@see net_common.h).
Expand Down Expand Up @@ -2086,12 +2079,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
int result = lf_thread_create(&_fed.inbound_netchan_listeners[received_federates], listen_to_federates, fed_id_arg);
if (result != 0) {
// Failed to create a listening thread.
LF_MUTEX_LOCK(&netchan_mutex);
LF_MUTEX_LOCK(&lf_inbound_netchan_mutex);
if (_fed.netchans_for_inbound_p2p_connections[remote_fed_id] != NULL) {
shutdown_netchan(_fed.netchans_for_inbound_p2p_connections[remote_fed_id], false);
_fed.netchans_for_inbound_p2p_connections[remote_fed_id] = NULL;
}
LF_MUTEX_UNLOCK(&netchan_mutex);
LF_MUTEX_UNLOCK(&lf_inbound_netchan_mutex);
lf_print_error_and_exit("Failed to create a thread to listen for incoming physical connection. Error code: %d.",
result);
}
Expand Down
14 changes: 14 additions & 0 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,9 @@ void usage(int argc, const char* argv[]) {
printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n");
printf(" -l\n");
printf(" Send stdout to individual log files for each federate.\n\n");
#ifdef COMM_TYPE_SST
printf(" -sst, --sst <n>\n");
#endif
#endif

printf("Command given:\n");
Expand Down Expand Up @@ -1068,6 +1071,17 @@ int process_args(int argc, const char* argv[]) {
return 0;
}
}
#endif
#ifdef COMM_TYPE_SST
else if (strcmp(arg, "-sst") == 0 || strcmp(arg, "--sst") == 0) {
if (argc < i + 1) {
lf_print_error("--sst needs a string argument.");
usage(argc, argv);
return 0;
}
const char* fid = argv[i++];
lf_set_sst_config_path(fid);
}
#endif
else if (strcmp(arg, "--ros-args") == 0) {
// FIXME: Ignore ROS arguments for now
Expand Down
5 changes: 2 additions & 3 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) {
LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time);
// Check whether we actually need to wait, or if we have already passed the timepoint.
interval_t wait_duration = wait_until_time - lf_time_physical();
if (wait_duration < MIN_SLEEP_DURATION) {
LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than MIN_SLEEP_DURATION " PRINTF_TIME ". Skipping wait.",
wait_duration, MIN_SLEEP_DURATION);
if (wait_duration < 0) {
LF_PRINT_DEBUG("We have already passed " PRINTF_TIME ". Skipping wait.", wait_until_time);
return true;
}

Expand Down
7 changes: 6 additions & 1 deletion include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,15 @@ typedef enum parse_rti_code_t { SUCCESS, INVALID_PORT, INVALID_HOST, INVALID_USE
// Global variables

/**
* Mutex lock held while performing network channel write and close operations.
* Mutex lock held while performing outbound network channel write and close operations.
*/
extern lf_mutex_t lf_outbound_netchan_mutex;

/**
* Mutex lock held while performing inbound network channel write and close operations.
*/
extern lf_mutex_t lf_inbound_netchan_mutex;

/**
* Condition variable for blocking on unkonwn federate input ports.
*/
Expand Down
4 changes: 2 additions & 2 deletions include/core/threaded/reactor_threaded.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ void lf_synchronize_with_other_federates(void);
* if that event time matches or exceeds the specified time.
*
* The mutex lock associated with the condition argument is assumed to be held by
* the calling thread. This mutex is released while waiting. If the wait time is
* too small to actually wait (less than MIN_SLEEP_DURATION), then this function
* the calling thread. This mutex is released while waiting. If the current physical
* time has already passed the specified time, then this function
* immediately returns true and the mutex is not released.
*
* @param env Environment within which we are executing.
Expand Down
2 changes: 1 addition & 1 deletion logging/api/logging_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG;
} \
} while (0)
#endif // NDEBUG
#endif // LOGGING_MACROS_H
#endif // LOGGING_MACROS_H
8 changes: 4 additions & 4 deletions low_level_platform/api/low_level_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ int lf_mutex_lock(lf_mutex_t* mutex);
/**
* @brief Get the number of cores on the host machine.
*/
int lf_available_cores();
int lf_available_cores(void);

/**
* @brief Return the lf_thread_t of the calling thread.
*/
lf_thread_t lf_thread_self();
lf_thread_t lf_thread_self(void);

/**
* Create a new thread, starting with execution of lf_thread
Expand Down Expand Up @@ -273,12 +273,12 @@ int _lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time);
* @brief The ID of the current thread. The only guarantee is that these IDs will be a contiguous range of numbers
* starting at 0.
*/
int lf_thread_id();
int lf_thread_id(void);

/**
* @brief Initialize the thread ID for the current thread.
*/
void initialize_lf_thread_id();
void initialize_lf_thread_id(void);
#endif // !defined(LF_SINGLE_THREADED)

/**
Expand Down
15 changes: 15 additions & 0 deletions network/api/lf_sst_support.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef LF_SST_SUPPORT_H
#define LF_SST_SUPPORT_H

#include "socket_common.h"
#include <sst-c-api/c_api.h>

typedef struct sst_priv_t {
socket_priv_t* socket_priv;
SST_ctx_t* sst_ctx;
SST_session_ctx_t* session_ctx;
} sst_priv_t;

void lf_set_sst_config_path(const char* config_path);

#endif /* LF_SST_SUPPORT_H */
6 changes: 5 additions & 1 deletion network/api/net_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

#include "socket_common.h"

#if defined(COMM_TYPE_SST)
#include "lf_sst_support.h"
#endif

typedef void* netchan_t;

/**
* Allocate memory for the network channel.
* @return netchan_t Initialized network channel.
*/
netchan_t initialize_netchan();
netchan_t initialize_netchan(void);

/**
* Create a netchannel server. This is such as a server socket which accepts connections.
Expand Down
4 changes: 2 additions & 2 deletions network/api/socket_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@
typedef enum socket_type_t { TCP, UDP } socket_type_t;

/**
* Mutex protecting socket close operations.
* Mutex protecting socket shutdown operations.
*/
extern lf_mutex_t netchan_mutex;
extern lf_mutex_t shutdown_mutex;

typedef struct socket_priv_t {
int socket_descriptor;
Expand Down
4 changes: 4 additions & 0 deletions network/impl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ target_sources(lf-network-impl PUBLIC

if(COMM_TYPE MATCHES TCP)
target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c)
elseif(COMM_TYPE MATCHES SST)
find_package(sst-lib REQUIRED)
target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c)
target_link_libraries(lf-network-impl PUBLIC sst-lib::sst-c-api)
else()
message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.")
endif()
Expand Down
Loading
Loading