From fd7bd2223fa897a0212d2d93d263890e53309136 Mon Sep 17 00:00:00 2001 From: Byeonggil Jun Date: Tue, 28 Jan 2025 11:55:36 -0700 Subject: [PATCH 01/38] Perform busy wait when the wait duration is less than MIN_SLEEP_DURATION --- core/threaded/reactor_threaded.c | 7 +++++-- include/core/threaded/reactor_threaded.h | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 493bd5a3e..61c771118 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -200,9 +200,12 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); if (wait_duration < MIN_SLEEP_DURATION) { - LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than MIN_SLEEP_DURATION " PRINTF_TIME ". Skipping wait.", + LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than lf_min_sleep_duration " PRINTF_TIME + ". Performing busy wait.", wait_duration, MIN_SLEEP_DURATION); - return true; + while (lf_time_physical() < wait_until_time) { + // Busy wait + } } // We do the sleep on the cond var so we can be awakened by the diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 2f5463165..e8b0cd0c6 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -94,8 +94,8 @@ void lf_synchronize_with_other_federates(void); * * The mutex lock associated with the condition argument is assumed to be held by * the calling thread. This mutex is released while waiting. If the wait time is - * too small to actually wait (less than MIN_SLEEP_DURATION), then this function - * immediately returns true and the mutex is not released. + * too small (less than MIN_SLEEP_DURATION) to wait using lf_clock_cond_timedwait, + * then this function performs busy wait and the mutex is not released. * * @param env Environment within which we are executing. * @param wait_until_time The time to wait until physical time matches it. From 184b55f0b5a846ae839f57bc4e3e319268eeb15a Mon Sep 17 00:00:00 2001 From: Byeonggil Jun Date: Wed, 29 Jan 2025 09:04:17 -0700 Subject: [PATCH 02/38] Do not check MIN_SLEEP_DURATION when waiting for the physical time to exceed the next tag --- core/threaded/reactor_threaded.c | 8 -------- include/core/threaded/reactor_threaded.h | 4 +--- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 61c771118..9f6f8cd35 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -199,14 +199,6 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); - if (wait_duration < MIN_SLEEP_DURATION) { - LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than lf_min_sleep_duration " PRINTF_TIME - ". Performing busy wait.", - wait_duration, MIN_SLEEP_DURATION); - while (lf_time_physical() < wait_until_time) { - // Busy wait - } - } // We do the sleep on the cond var so we can be awakened by the // asynchronous scheduling of a physical action. lf_clock_cond_timedwait diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index e8b0cd0c6..fe14beb8e 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -93,9 +93,7 @@ void lf_synchronize_with_other_federates(void); * if that event time matches or exceeds the specified time. * * The mutex lock associated with the condition argument is assumed to be held by - * the calling thread. This mutex is released while waiting. If the wait time is - * too small (less than MIN_SLEEP_DURATION) to wait using lf_clock_cond_timedwait, - * then this function performs busy wait and the mutex is not released. + * the calling thread. * * @param env Environment within which we are executing. * @param wait_until_time The time to wait until physical time matches it. From f9081b172cc47f8a0e35cf8bd0f46258bd599a5b Mon Sep 17 00:00:00 2001 From: Byeonggil Jun Date: Wed, 29 Jan 2025 16:30:54 -0700 Subject: [PATCH 03/38] Revert changes --- core/threaded/reactor_threaded.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 9f6f8cd35..493bd5a3e 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -199,6 +199,11 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); + if (wait_duration < MIN_SLEEP_DURATION) { + LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than MIN_SLEEP_DURATION " PRINTF_TIME ". Skipping wait.", + wait_duration, MIN_SLEEP_DURATION); + return true; + } // We do the sleep on the cond var so we can be awakened by the // asynchronous scheduling of a physical action. lf_clock_cond_timedwait From 98b1090c0edd70abe2f7f257ebf3ec55f8843888 Mon Sep 17 00:00:00 2001 From: Byeonggil Jun Date: Thu, 30 Jan 2025 10:29:45 -0700 Subject: [PATCH 04/38] Make immediately return when we have already passed the target physical time --- core/threaded/reactor_threaded.c | 5 ++--- include/core/threaded/reactor_threaded.h | 4 +++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 493bd5a3e..7872bf9db 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -199,9 +199,8 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); - if (wait_duration < MIN_SLEEP_DURATION) { - LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than MIN_SLEEP_DURATION " PRINTF_TIME ". Skipping wait.", - wait_duration, MIN_SLEEP_DURATION); + if (wait_duration < 0) { + LF_PRINT_DEBUG("We have already passed " PRINTF_TIME ". Skipping wait.", wait_until_time); return true; } diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index fe14beb8e..727b3839e 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -93,7 +93,9 @@ void lf_synchronize_with_other_federates(void); * if that event time matches or exceeds the specified time. * * The mutex lock associated with the condition argument is assumed to be held by - * the calling thread. + * the calling thread. This mutex is released while waiting. If the current physical + * time has already passed the specified time, then this function + * immediately returns true and the mutex is not released. * * @param env Environment within which we are executing. * @param wait_until_time The time to wait until physical time matches it. From 09fd0779e8e3e86257a4a9b05ec2601271261faa Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 30 Jan 2025 20:08:46 -0700 Subject: [PATCH 05/38] Add sst_support. --- network/impl/CMakeLists.txt | 2 ++ network/impl/src/lf_sst_support.c | 1 + 2 files changed, 3 insertions(+) create mode 100644 network/impl/src/lf_sst_support.c diff --git a/network/impl/CMakeLists.txt b/network/impl/CMakeLists.txt index 225edf3d5..60b61157c 100644 --- a/network/impl/CMakeLists.txt +++ b/network/impl/CMakeLists.txt @@ -10,6 +10,8 @@ target_sources(lf-network-impl PUBLIC if(COMM_TYPE MATCHES TCP) target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c) +elseif(COMM_TYPE MATCHES SST) + target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c) else() message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.") endif() diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c new file mode 100644 index 000000000..2f74ed84f --- /dev/null +++ b/network/impl/src/lf_sst_support.c @@ -0,0 +1 @@ +#include "net_driver.h" From 3fb0bf916f99c4416212831250df4ed2118129ae Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 14:55:35 -0700 Subject: [PATCH 06/38] Add sst_priv_t struct. --- network/api/lf_sst_support.h | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 network/api/lf_sst_support.h diff --git a/network/api/lf_sst_support.h b/network/api/lf_sst_support.h new file mode 100644 index 000000000..13cc8b166 --- /dev/null +++ b/network/api/lf_sst_support.h @@ -0,0 +1,12 @@ +#ifndef LF_SST_SUPPORT_H +#define LF_SST_SUPPORT_H + +#include "socket_common.h" + +typedef struct sst_priv_t { + socket_priv_t* socket_priv; + SST_ctx_t* sst_ctx; + SST_session_ctx_t* session_ctx; +} sst_priv_t; + +#endif /* LF_SST_SUPPORT_H */ From 27ac24e66e82f1b18793786c2aec85cc724b0a16 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 14:55:47 -0700 Subject: [PATCH 07/38] Add initialize_netdrv for sst. --- network/impl/src/lf_sst_support.c | 32 +++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 2f74ed84f..65c9fac6d 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -1 +1,33 @@ #include "net_driver.h" +#include "lf_sst_support.h" +#include "util.h" + +netdrv_t initialize_netdrv() { + // Initialize sst_priv. + sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t)); + if (sst_priv == NULL) { + lf_print_error_and_exit("Falied to malloc sst_priv_t."); + } + // Initialize socket_priv. + socket_priv_t* socket_priv = malloc(sizeof(socket_priv_t)); + if (socket_priv == NULL) { + lf_print_error_and_exit("Falied to malloc socket_priv_t."); + } + + // Server initialization. + socket_priv->port = 0; + socket_priv->user_specified_port = 0; + socket_priv->socket_descriptor = -1; + + // Federate initialization + strncpy(socket_priv->server_hostname, "localhost", INET_ADDRSTRLEN); + socket_priv->server_ip_addr.s_addr = 0; + socket_priv->server_port = -1; + + // SST initialization. Only set pointers to NULL. + sst_priv->sst_ctx = NULL; + sst_priv->session_ctx = NULL; + + return (netdrv_t)sst_priv; +} + From a814a07e847f28370b21a6ea677c3c9319cb04ab Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 14:59:39 -0700 Subject: [PATCH 08/38] Add free_netdrv for sst --- network/impl/src/lf_sst_support.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 65c9fac6d..29684e71c 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -2,6 +2,14 @@ #include "lf_sst_support.h" #include "util.h" +static sst_priv_t* get_sst_priv_t(netdrv_t drv) { + if (drv == NULL) { + lf_print_error("Network driver is already closed."); + return NULL; + } + return (sst_priv_t*)drv; +} + netdrv_t initialize_netdrv() { // Initialize sst_priv. sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t)); @@ -24,6 +32,8 @@ netdrv_t initialize_netdrv() { socket_priv->server_ip_addr.s_addr = 0; socket_priv->server_port = -1; + sst_priv->socket_priv = socket_priv; + // SST initialization. Only set pointers to NULL. sst_priv->sst_ctx = NULL; sst_priv->session_ctx = NULL; @@ -31,3 +41,9 @@ netdrv_t initialize_netdrv() { return (netdrv_t)sst_priv; } +void free_netdrv(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + free(priv->socket_priv); + free(priv); +} + From 82e63bd3cf3dca29aa6603cef23b3cbd63fa4942 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 15:10:02 -0700 Subject: [PATCH 09/38] Add create_server, with also passing path as global var in lf_sst_support.h --- core/federated/RTI/main.c | 9 +++++++++ network/api/lf_sst_support.h | 2 ++ network/api/net_driver.h | 4 ++++ network/impl/src/lf_sst_support.c | 10 ++++++++++ 4 files changed, 25 insertions(+) diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index b7507615c..63a847696 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -266,6 +266,15 @@ int process_args(int argc, const char* argv[]) { return 0; #endif rti.authentication_enabled = true; + } else if (strcmp(argv[i], "-sst") == 0 || strcmp(argv[i], "--sst") == 0) { +#ifndef COMM_TYPE_SST + lf_print_error("--sst requires the RTI to be built with the --DCOMM_TYPE=SST option."); + usage(argc, argv); + return 0; +#else + i++; + lf_set_sst_config_path(argv[i]); +#endif } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; } else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) { diff --git a/network/api/lf_sst_support.h b/network/api/lf_sst_support.h index 13cc8b166..c37bf5b77 100644 --- a/network/api/lf_sst_support.h +++ b/network/api/lf_sst_support.h @@ -9,4 +9,6 @@ typedef struct sst_priv_t { SST_session_ctx_t* session_ctx; } sst_priv_t; +void lf_set_sst_config_path(const char* config_path); + #endif /* LF_SST_SUPPORT_H */ diff --git a/network/api/net_driver.h b/network/api/net_driver.h index b29e8b108..d01478a02 100644 --- a/network/api/net_driver.h +++ b/network/api/net_driver.h @@ -3,6 +3,10 @@ #include "socket_common.h" +#if defined(COMM_TYPE_SST) +#include "lf_sst_support.h" +#endif + typedef void* netdrv_t; /** diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 29684e71c..b196c6424 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -2,6 +2,8 @@ #include "lf_sst_support.h" #include "util.h" +const char* sst_config_path; // The SST's configuration file path. + static sst_priv_t* get_sst_priv_t(netdrv_t drv) { if (drv == NULL) { lf_print_error("Network driver is already closed."); @@ -47,3 +49,11 @@ void free_netdrv(netdrv_t drv) { free(priv); } +int create_server(netdrv_t drv, bool increment_port_on_retry) { + sst_priv_t* priv = get_sst_priv_t(drv); + SST_ctx_t* ctx = init_SST(sst_config_path); + return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, + &priv->socket_priv->port, TCP, increment_port_on_retry); +} + +void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } From 4a5f1b6a5efed81619900f6c4c04f5b85c0ab90e Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 15:20:00 -0700 Subject: [PATCH 10/38] Add structure of accept_netdrv --- network/impl/src/lf_sst_support.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index b196c6424..0f7bd8c92 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -56,4 +56,33 @@ int create_server(netdrv_t drv, bool increment_port_on_retry) { &priv->socket_priv->port, TCP, increment_port_on_retry); } +netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { + sst_priv_t* serv_priv = get_sst_priv_t(server_drv); + int rti_socket; + if (rti_drv == NULL) { + rti_socket = -1; + } else { + sst_priv_t* rti_priv = get_sst_priv_t(rti_drv); + rti_socket = rti_priv->socket_priv->socket_descriptor; + } + netdrv_t fed_netdrv = initialize_netdrv(); + sst_priv_t* fed_priv = get_sst_priv_t(fed_netdrv); + + int sock = accept_socket(serv_priv->socket_priv->socket_descriptor, rti_socket); + if (sock == -1) { + free_netdrv(fed_netdrv); + return NULL; + } + fed_priv->socket_priv->socket_descriptor = sock; + // Get the peer address from the connected socket_id. Saving this for the address query. + if (get_peer_address(fed_netdrv) != 0) { + lf_print_error("RTI failed to get peer address."); + }; + + + session_key_list_t *s_key_list = init_empty_session_key_list(); + return fed_netdrv; +} + +// Helper function. void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } From 4d28852df0d92f4f481917be002b4e3d77c64187 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 15:31:51 -0700 Subject: [PATCH 11/38] Add comments. --- network/impl/src/lf_sst_support.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 0f7bd8c92..22427bedc 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -60,6 +60,8 @@ netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { sst_priv_t* serv_priv = get_sst_priv_t(server_drv); int rti_socket; if (rti_drv == NULL) { + // Set to -1, to indicate that this accept_netdrv() call is not trying to check if the rti_drv is available, inside + // the accept_socket() function. rti_socket = -1; } else { sst_priv_t* rti_priv = get_sst_priv_t(rti_drv); @@ -79,8 +81,7 @@ netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { lf_print_error("RTI failed to get peer address."); }; - - session_key_list_t *s_key_list = init_empty_session_key_list(); + session_key_list_t* s_key_list = init_empty_session_key_list(); return fed_netdrv; } From 241d8dae06e4ae08293edbd17a2dcbc0599d201d Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 15:56:20 -0700 Subject: [PATCH 12/38] Set handshake with client. --- network/impl/src/lf_sst_support.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 22427bedc..8e65b5228 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -77,11 +77,17 @@ netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { } fed_priv->socket_priv->socket_descriptor = sock; // Get the peer address from the connected socket_id. Saving this for the address query. - if (get_peer_address(fed_netdrv) != 0) { + if (get_peer_address(fed_priv->socket_priv) != 0) { lf_print_error("RTI failed to get peer address."); }; + // TODO: Do we need to copy sst_ctx form server_drv to fed_drv? session_key_list_t* s_key_list = init_empty_session_key_list(); + SST_session_ctx_t* session_ctx = + server_secure_comm_setup(serv_priv->sst_ctx, fed_priv->socket_priv->socket_descriptor, s_key_list); + // Session key used is copied to the session_ctx. + free_session_key_list_t(s_key_list); + fed_priv->session_ctx = session_ctx; return fed_netdrv; } From 5be07ad4036c22e1c9158cc6b759be7c914bde95 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 16:13:52 -0700 Subject: [PATCH 13/38] Add create_client and connect_to_netdrv for sst. --- network/impl/src/lf_sst_support.c | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 8e65b5228..5dbd19eae 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -91,5 +91,25 @@ netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { return fed_netdrv; } +void create_client(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit(); + SST_ctx_t* ctx = init_SST(sst_config_path); + priv->sst_ctx = ctx; +} + +int connect_to_netdrv(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + int ret = connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname, + priv->socket_priv->server_port); + if (ret != 0) { + return ret; + } + session_key_list_t* s_key_list = get_session_key(priv->sst_ctx, NULL); + SST_session_ctx_t* session_ctx = secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor); + priv->session_ctx = session_ctx; + return 0; +} + // Helper function. void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } From 37145ff73f67211ecce1b3cdfa1bfdec5aee9407 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 16:14:11 -0700 Subject: [PATCH 14/38] Add user input path of sst config to federate.c --- core/reactor_common.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/reactor_common.c b/core/reactor_common.c index 7257e5132..7f3049d10 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -1068,6 +1068,17 @@ int process_args(int argc, const char* argv[]) { return 0; } } +#endif +#ifdef COMM_TYPE_SST + else if (strcmp(arg, "-sst") == 0 || strcmp(arg, "--sst") == 0) { + if (argc < i + 1) { + lf_print_error("--sst needs a string argument."); + usage(argc, argv); + return 0; + } + const char* fid = argv[i++]; + lf_set_sst_config_path(fid); + } #endif else if (strcmp(arg, "--ros-args") == 0) { // FIXME: Ignore ROS arguments for now From 42a688c36c6e06d6e03fa385f12a7331c3a2573b Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 16:18:50 -0700 Subject: [PATCH 15/38] Add get/set functions. --- network/impl/src/lf_sst_support.c | 37 +++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 5dbd19eae..934ff7bc7 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -111,5 +111,42 @@ int connect_to_netdrv(netdrv_t drv) { return 0; } +// Get/set functions. +int32_t get_my_port(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + return priv->socket_priv->port; +} + +int32_t get_server_port(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + return priv->socket_priv->server_port; +} + +struct in_addr* get_ip_addr(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + return &priv->socket_priv->server_ip_addr; +} + +char* get_server_hostname(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + return priv->socket_priv->server_hostname; +} + +void set_my_port(netdrv_t drv, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(drv); + priv->socket_priv->port = port; +} + +void set_server_port(netdrv_t drv, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(drv); + priv->socket_priv->server_port = port; +} + +void set_server_hostname(netdrv_t drv, const char* hostname) { + sst_priv_t* priv = get_sst_priv_t(drv); + memcpy(priv->socket_priv->server_hostname, hostname, INET_ADDRSTRLEN); +} + + // Helper function. void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } From c374b7225d1878ed0c2f08e8c17326fb5d17510f Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 31 Jan 2025 16:21:18 -0700 Subject: [PATCH 16/38] Add read/write/shutdown functions. --- network/impl/src/lf_sst_support.c | 98 ++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 934ff7bc7..e4ed51ebe 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -106,11 +106,106 @@ int connect_to_netdrv(netdrv_t drv) { return ret; } session_key_list_t* s_key_list = get_session_key(priv->sst_ctx, NULL); - SST_session_ctx_t* session_ctx = secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor); + SST_session_ctx_t* session_ctx = + secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor); priv->session_ctx = session_ctx; return 0; } +// TODO: +int read_from_netdrv(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(drv); + return read_from_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); +} + +int read_from_netdrv_close_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(drv); + int read_failed = read_from_netdrv(drv, num_bytes, buffer); + if (read_failed) { + // Read failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + return -1; + } + return 0; +} + +void read_from_netdrv_fail_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...) { + va_list args; + int read_failed = read_from_netdrv_close_on_error(drv, num_bytes, buffer); + if (read_failed) { + // Read failed. + if (mutex != NULL) { + LF_MUTEX_UNLOCK(mutex); + } + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error_system_failure("Failed to read from socket."); + } + } +} + +int write_to_netdrv(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(drv); + return write_to_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); +} + +int write_to_netdrv_close_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(drv); + int result = write_to_netdrv(drv, num_bytes, buffer); + if (result) { + // Write failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + } + return result; +} + +void write_to_netdrv_fail_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...) { + va_list args; + int result = write_to_netdrv_close_on_error(drv, num_bytes, buffer); + if (result) { + // Write failed. + if (mutex != NULL) { + LF_MUTEX_UNLOCK(mutex); + } + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error("Failed to write to socket. Closing it."); + } + } +} + +bool check_netdrv_closed(netdrv_t drv) { + sst_priv_t* priv = get_sst_priv_t(drv); + return check_socket_closed(priv->socket_priv->socket_descriptor); +} + +int shutdown_netdrv(netdrv_t drv, bool read_before_closing) { + if (drv == NULL) { + lf_print("Socket already closed."); + return 0; + } + sst_priv_t* priv = get_sst_priv_t(drv); + int ret = shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing); + if (ret != 0) { + lf_print_error("Failed to shutdown socket."); + } + free_netdrv(drv); + return ret; +} +// END of TODO: + // Get/set functions. int32_t get_my_port(netdrv_t drv) { sst_priv_t* priv = get_sst_priv_t(drv); @@ -147,6 +242,5 @@ void set_server_hostname(netdrv_t drv, const char* hostname) { memcpy(priv->socket_priv->server_hostname, hostname, INET_ADDRSTRLEN); } - // Helper function. void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } From a96d00e91a4a71fadbf5cf4cf9138965f2873b41 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 1 Feb 2025 15:13:32 -0700 Subject: [PATCH 17/38] Minor fix on adding void --- network/api/net_driver.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/api/net_driver.h b/network/api/net_driver.h index d01478a02..d0285935e 100644 --- a/network/api/net_driver.h +++ b/network/api/net_driver.h @@ -13,7 +13,7 @@ typedef void* netdrv_t; * Allocate memory for the network driver. * @return netdrv_t Initialized network driver. */ -netdrv_t initialize_netdrv(); +netdrv_t initialize_netdrv(void); /** * Create a netdriver server. This is such as a server socket which accepts connections. From 8922c0d374aa1fd35dc1cc67f49ce70f8f657364 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 1 Feb 2025 15:14:20 -0700 Subject: [PATCH 18/38] Minor fix on adding `void` and new line on EOF. --- logging/api/logging_macros.h | 2 +- low_level_platform/api/low_level_platform.h | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/logging/api/logging_macros.h b/logging/api/logging_macros.h index 3e22950b5..0fbabd157 100644 --- a/logging/api/logging_macros.h +++ b/logging/api/logging_macros.h @@ -107,4 +107,4 @@ static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG; } \ } while (0) #endif // NDEBUG -#endif // LOGGING_MACROS_H \ No newline at end of file +#endif // LOGGING_MACROS_H diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index afffd2a9e..3fe4d42d3 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -113,12 +113,12 @@ int lf_mutex_lock(lf_mutex_t* mutex); /** * @brief Get the number of cores on the host machine. */ -int lf_available_cores(); +int lf_available_cores(void); /** * @brief Return the lf_thread_t of the calling thread. */ -lf_thread_t lf_thread_self(); +lf_thread_t lf_thread_self(void); /** * Create a new thread, starting with execution of lf_thread @@ -273,12 +273,12 @@ int _lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time); * @brief The ID of the current thread. The only guarantee is that these IDs will be a contiguous range of numbers * starting at 0. */ -int lf_thread_id(); +int lf_thread_id(void); /** * @brief Initialize the thread ID for the current thread. */ -void initialize_lf_thread_id(); +void initialize_lf_thread_id(void); #endif // !defined(LF_SINGLE_THREADED) /** From ab3d92dd664de4ebc457f42ffe20a65ab5f1275c Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 1 Feb 2025 15:14:59 -0700 Subject: [PATCH 19/38] Enable finding the sst-c-api library, and include it in lf_sst_support.h --- network/api/lf_sst_support.h | 1 + network/impl/CMakeLists.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/network/api/lf_sst_support.h b/network/api/lf_sst_support.h index c37bf5b77..7fdb941ab 100644 --- a/network/api/lf_sst_support.h +++ b/network/api/lf_sst_support.h @@ -2,6 +2,7 @@ #define LF_SST_SUPPORT_H #include "socket_common.h" +#include typedef struct sst_priv_t { socket_priv_t* socket_priv; diff --git a/network/impl/CMakeLists.txt b/network/impl/CMakeLists.txt index 60b61157c..e078e9e86 100644 --- a/network/impl/CMakeLists.txt +++ b/network/impl/CMakeLists.txt @@ -11,7 +11,9 @@ target_sources(lf-network-impl PUBLIC if(COMM_TYPE MATCHES TCP) target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c) elseif(COMM_TYPE MATCHES SST) + find_package(sst-lib REQUIRED) target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c) + target_link_libraries(lf-network-impl PUBLIC sst-lib::sst-c-api) else() message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.") endif() From 2772e866917c008a0c10ca631bbed4834a39b141 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 1 Feb 2025 19:10:24 -0700 Subject: [PATCH 20/38] Add options to use user specified port for sst. --- core/federated/RTI/main.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 63a847696..2e0f9ae67 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -234,7 +234,7 @@ int process_args(int argc, const char* argv[]) { rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { -#ifdef COMM_TYPE_TCP +#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST) if (argc < i + 2) { lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX); usage(argc, argv); From cb5b23bc15e064f4b1b544b3a8c7ea02442a671d Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sat, 1 Feb 2025 19:10:47 -0700 Subject: [PATCH 21/38] Minor fix on including headers. --- network/impl/src/lf_sst_support.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index e4ed51ebe..982404fc0 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -1,3 +1,6 @@ +#include // malloc() +#include // strncpy() + #include "net_driver.h" #include "lf_sst_support.h" #include "util.h" @@ -52,6 +55,7 @@ void free_netdrv(netdrv_t drv) { int create_server(netdrv_t drv, bool increment_port_on_retry) { sst_priv_t* priv = get_sst_priv_t(drv); SST_ctx_t* ctx = init_SST(sst_config_path); + priv->sst_ctx = ctx; return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, &priv->socket_priv->port, TCP, increment_port_on_retry); } From e7cea3b9a81f616431f3eb134ac79f0c95c47a5b Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Sun, 2 Feb 2025 19:55:04 -0700 Subject: [PATCH 22/38] Add -sst option for federate. --- core/reactor_common.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/reactor_common.c b/core/reactor_common.c index 7f3049d10..90b972029 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -919,6 +919,9 @@ void usage(int argc, const char* argv[]) { printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n"); printf(" -l\n"); printf(" Send stdout to individual log files for each federate.\n\n"); +#ifdef COMM_TYPE_SST + printf(" -sst, --sst \n"); +#endif #endif printf("Command given:\n"); From 3fa7c48deb2ff96b8650a279f3a7820d5bb05632 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Tue, 4 Feb 2025 15:18:15 -0700 Subject: [PATCH 23/38] Add usage for --sst for RTI. --- core/federated/RTI/main.c | 1 + 1 file changed, 1 insertion(+) diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 2e0f9ae67..06818b629 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -137,6 +137,7 @@ void usage(int argc, const char* argv[]) { lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n"); + lf_print(" -sst, --sst SST config path for RTI.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { From a3887e9b578ef3612877374d23abb9ce92bdc044 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 6 Feb 2025 14:10:33 -0700 Subject: [PATCH 24/38] Fix read/write to send header separately to match numbers. Fed-to-Fed not done yet. --- core/federated/RTI/rti_remote.c | 30 +++++++++++++-------- core/federated/federate.c | 47 +++++++++++++++++++++------------ 2 files changed, 49 insertions(+), 28 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 296774338..6eb8a09e8 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -75,7 +75,8 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // This function is called in notify_advance_grant_if_safe(), which is a long // function. During this call, the network driver might close, causing the following write_to_netdrv // to fail. Consider a failure here a soft failure and update the federate's status. - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || + write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -108,7 +109,8 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // This function is called in notify_advance_grant_if_safe(), which is a long // function. During this call, the network driver might close, causing the following write_to_netdrv // to fail. Consider a failure here a soft failure and update the federate's status. - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || + write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -165,7 +167,7 @@ void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_DNET, e->id, &tag); } - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -243,9 +245,8 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char } void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer) { - size_t header_size = 1 + sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t); - // Read the header, minus the first byte which has already been read. - read_from_netdrv_fail_on_error(sending_federate->fed_netdrv, header_size - 1, &(buffer[1]), NULL, + size_t header_size = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t); + read_from_netdrv_fail_on_error(sending_federate->fed_netdrv, header_size, &(buffer[1]), NULL, "RTI failed to read the timed message header from remote federate."); // Extract the header information. of the sender uint16_t reactor_port_id; @@ -331,8 +332,9 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_TAGGED_MSG, federate_id, &intended_tag); } - - write_to_netdrv_fail_on_error(fed->fed_netdrv, bytes_read, buffer, &rti_mutex, + write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, buffer, &rti_mutex, + "RTI failed to forward message header to federate %d.", federate_id); + write_to_netdrv_fail_on_error(fed->fed_netdrv, bytes_read - 1, buffer + 1, &rti_mutex, "RTI failed to forward message to federate %d.", federate_id); // The message length may be longer than the buffer, @@ -458,7 +460,10 @@ static void broadcast_stop_time_to_federates_locked() { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_STOP_GRN, fed->enclave.id, &rti_remote->base.max_stop_tag); } - write_to_netdrv_fail_on_error(fed->fed_netdrv, MSG_TYPE_STOP_GRANTED_LENGTH, outgoing_buffer, &rti_mutex, + write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, outgoing_buffer, &rti_mutex, + "RTI failed to send MSG_TYPE_STOP_GRANTED message header to federate %d.", + fed->enclave.id); + write_to_netdrv_fail_on_error(fed->fed_netdrv, MSG_TYPE_STOP_GRANTED_LENGTH - 1, outgoing_buffer + 1, &rti_mutex, "RTI failed to send MSG_TYPE_STOP_GRANTED message to federate %d.", fed->enclave.id); } @@ -578,8 +583,11 @@ void handle_stop_request_message(federate_info_t* fed) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_STOP_REQ, f->enclave.id, &rti_remote->base.max_stop_tag); } - write_to_netdrv_fail_on_error(f->fed_netdrv, MSG_TYPE_STOP_REQUEST_LENGTH, stop_request_buffer, &rti_mutex, - "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.", + write_to_netdrv_fail_on_error(f->fed_netdrv, 1, stop_request_buffer, &rti_mutex, + "RTI failed to forward MSG_TYPE_STOP_REQUEST message header to federate %d.", + f->enclave.id); + write_to_netdrv_fail_on_error(f->fed_netdrv, MSG_TYPE_STOP_REQUEST_LENGTH - 1, stop_request_buffer + 1, + &rti_mutex, "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.", f->enclave.id); } } diff --git a/core/federated/federate.c b/core/federated/federate.c index 460f72255..c5dad4454 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -111,7 +111,9 @@ static void send_time(unsigned char type, instant_t time) { tracepoint_federate_to_rti(send_TIMESTAMP, _lf_my_fed_id, &tag); LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write, buffer, &lf_outbound_netdrv_mutex, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, + "Failed to send MSG_TYPE_TIMESTAMP header."); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write - 1, buffer + 1, &lf_outbound_netdrv_mutex, "Failed to send time " PRINTF_TIME " to the RTI.", time - start_time); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); } @@ -138,7 +140,8 @@ static void send_tag(unsigned char type, tag_t tag) { trace_event_t event_type = (type == MSG_TYPE_NEXT_EVENT_TAG) ? send_NET : send_LTC; // Trace the event when tracing is enabled tracepoint_federate_to_rti(event_type, _lf_my_fed_id, &tag); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write, buffer, &lf_outbound_netdrv_mutex, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, "Failed to send tag header."); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write - 1, buffer + 1, &lf_outbound_netdrv_mutex, "Failed to send tag " PRINTF_TAG " to the RTI.", tag.time - start_time, tag.microstep); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); } @@ -1418,7 +1421,9 @@ static void handle_stop_request_message() { // Send the current logical time to the RTI. LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH, outgoing_buffer, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, outgoing_buffer, &lf_outbound_netdrv_mutex, + "Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI."); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH - 1, outgoing_buffer + 1, &lf_outbound_netdrv_mutex, "Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI."); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); @@ -1543,12 +1548,13 @@ static void* listen_to_rti_netdrv(void* args) { case MSG_TYPE_STOP_GRANTED: handle_stop_granted_message(); break; - case MSG_TYPE_PORT_ABSENT: - if (handle_port_absent_message(_fed.netdrv_to_RTI, -1)) { - // Failures to complete the read of absent messages from the RTI are fatal. - lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); - } - break; +//TODO: Check. RTI does not send MSG_TYPE_PORT_ABSENT + // case MSG_TYPE_PORT_ABSENT: + // if (handle_port_absent_message(_fed.netdrv_to_RTI, -1)) { + // // Failures to complete the read of absent messages from the RTI are fatal. + // lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); + // } + // break; case MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG: handle_downstream_next_event_tag(); break; @@ -1711,7 +1717,9 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL); LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_netdrv_mutex, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, + "Failed to send address query header for federate %d to RTI.", remote_federate_id); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(uint16_t), buffer + 1, &lf_outbound_netdrv_mutex, "Failed to send address query for federate %d to RTI.", remote_federate_id); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); @@ -1971,7 +1979,9 @@ void lf_create_server(int specified_port) { tracepoint_federate_to_rti(send_ADR_AD, _lf_my_fed_id, NULL); // No need for a mutex because we have the only handle on this network driver. - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(int32_t) + 1, (unsigned char*)buffer, NULL, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, (unsigned char*)buffer, NULL, + "Failed to send address advertisement header."); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(int32_t), (unsigned char*)buffer + 1, NULL, "Failed to send address advertisement."); LF_PRINT_DEBUG("Sent port %d to the RTI.", server_port); @@ -2411,7 +2421,8 @@ void lf_send_port_absent_to_federate(environment_t* env, interval_t additional_d } LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - int result = write_to_netdrv_close_on_error(netdrv, message_length, buffer); + int result = write_to_netdrv_close_on_error(netdrv, 1, buffer); + result = write_to_netdrv_close_on_error(netdrv, message_length - 1, buffer + 1); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); if (result != 0) { @@ -2448,9 +2459,11 @@ int lf_send_stop_request_to_rti(tag_t stop_tag) { } // Trace the event when tracing is enabled tracepoint_federate_to_rti(send_STOP_REQ, _lf_my_fed_id, &stop_tag); - - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_LENGTH, buffer, &lf_outbound_netdrv_mutex, - "Failed to send stop time " PRINTF_TIME " to the RTI.", stop_tag.time - start_time); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, + "Failed to send stop request header."); + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_LENGTH - 1, buffer + 1, + &lf_outbound_netdrv_mutex, "Failed to send stop time " PRINTF_TIME " to the RTI.", + stop_tag.time - start_time); // Treat this sending as equivalent to having received a stop request from the RTI. _fed.received_stop_request_from_rti = true; @@ -2524,8 +2537,8 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int if (lf_tag_compare(_fed.last_DNET, current_message_intended_tag) > 0) { _fed.last_DNET = current_message_intended_tag; } - - int result = write_to_netdrv_close_on_error(netdrv, header_length, header_buffer); + int result = write_to_netdrv_close_on_error(netdrv, 1, header_buffer); + result = write_to_netdrv_close_on_error(netdrv, header_length - 1, header_buffer + 1); if (result == 0) { // Header sent successfully. Send the body. result = write_to_netdrv_close_on_error(netdrv, length, message); From 53b2100d0e315ecc91a2b1bb7257241e38d7aced Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 6 Feb 2025 14:11:23 -0700 Subject: [PATCH 25/38] Minor cleanup. --- core/federated/federate.c | 1 - 1 file changed, 1 deletion(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index c5dad4454..f660aba66 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -725,7 +725,6 @@ static int handle_port_absent_message(netdrv_t netdrv, int fed_id) { * network driver in _fed.netdrvs_for_inbound_p2p_connections * to -1 and returns, terminating the thread. * @param _args The remote federate ID (cast to void*). - * @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to. * This procedure frees the memory pointed to before returning. */ static void* listen_to_federates(void* _args) { From af591a24cbd431021621e73be05db9e7662f8d61 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 6 Feb 2025 14:14:26 -0700 Subject: [PATCH 26/38] Fix read/write to match for fed2fed messages. --- core/federated/federate.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index f660aba66..1f92ecf6e 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1547,13 +1547,13 @@ static void* listen_to_rti_netdrv(void* args) { case MSG_TYPE_STOP_GRANTED: handle_stop_granted_message(); break; -//TODO: Check. RTI does not send MSG_TYPE_PORT_ABSENT - // case MSG_TYPE_PORT_ABSENT: - // if (handle_port_absent_message(_fed.netdrv_to_RTI, -1)) { - // // Failures to complete the read of absent messages from the RTI are fatal. - // lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); - // } - // break; + // TODO: Check. RTI does not send MSG_TYPE_PORT_ABSENT + // case MSG_TYPE_PORT_ABSENT: + // if (handle_port_absent_message(_fed.netdrv_to_RTI, -1)) { + // // Failures to complete the read of absent messages from the RTI are fatal. + // lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); + // } + // break; case MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG: handle_downstream_next_event_tag(); break; @@ -2212,7 +2212,8 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa // Trace the event when tracing is enabled tracepoint_federate_to_federate(send_P2P_MSG, _lf_my_fed_id, federate, NULL); - int result = write_to_netdrv_close_on_error(netdrv, header_length, header_buffer); + int result = write_to_netdrv_close_on_error(netdrv, 1, header_buffer); + result = write_to_netdrv_close_on_error(netdrv, header_length - 1, header_buffer + 1); if (result == 0) { // Header sent successfully. Send the body. result = write_to_netdrv_close_on_error(netdrv, length, message); From b92b7373d3b73e815fd3fd47bd962dba247f723a Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 6 Feb 2025 14:31:06 -0700 Subject: [PATCH 27/38] Fix forwarding on port absent messages. --- core/federated/RTI/rti_remote.c | 7 +++++-- core/federated/federate.c | 13 ++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 6eb8a09e8..3f3651a1c 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -167,7 +167,8 @@ void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_DNET, e->id, &tag); } - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || + write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -238,7 +239,9 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char } // Forward the message. - write_to_netdrv_fail_on_error(fed->fed_netdrv, message_size + 1, buffer, &rti_mutex, + write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, buffer, &rti_mutex, "RTI failed to forward message to federate %d.", + federate_id); + write_to_netdrv_fail_on_error(fed->fed_netdrv, message_size, buffer + 1, &rti_mutex, "RTI failed to forward message to federate %d.", federate_id); LF_MUTEX_UNLOCK(&rti_mutex); diff --git a/core/federated/federate.c b/core/federated/federate.c index 1f92ecf6e..83d57a9d7 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1547,13 +1547,12 @@ static void* listen_to_rti_netdrv(void* args) { case MSG_TYPE_STOP_GRANTED: handle_stop_granted_message(); break; - // TODO: Check. RTI does not send MSG_TYPE_PORT_ABSENT - // case MSG_TYPE_PORT_ABSENT: - // if (handle_port_absent_message(_fed.netdrv_to_RTI, -1)) { - // // Failures to complete the read of absent messages from the RTI are fatal. - // lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); - // } - // break; + case MSG_TYPE_PORT_ABSENT: + if (handle_port_absent_message(_fed.netdrv_to_RTI, -1)) { + // Failures to complete the read of absent messages from the RTI are fatal. + lf_print_error_and_exit("Failed to complete the reading of an absent message from the RTI."); + } + break; case MSG_TYPE_DOWNSTREAM_NEXT_EVENT_TAG: handle_downstream_next_event_tag(); break; From d66eb3b1ab742991ed6edfd557796335644f1643 Mon Sep 17 00:00:00 2001 From: erlingrj Date: Mon, 10 Feb 2025 17:52:06 +0100 Subject: [PATCH 28/38] Avoid a double join on the sensor_simulator output thread --- util/sensor_simulator.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/util/sensor_simulator.c b/util/sensor_simulator.c index 2ad8b8ef9..4d620d8bb 100644 --- a/util/sensor_simulator.c +++ b/util/sensor_simulator.c @@ -347,13 +347,16 @@ void end_sensor_simulator() { lf_register_print_function(NULL, -1); _lf_sensor_post_message(_lf_sensor_close_windows, NULL); - void* thread_return; - lf_thread_join(_lf_sensor.output_thread_id, &thread_return); + // Join thread, if it was created and it was not already joined. + if (_lf_sensor.thread_created > 0) { + void* thread_return; + lf_thread_join(_lf_sensor.output_thread_id, &thread_return); + _lf_sensor.thread_created = 0; + } // Timeout mode should result in the input thread exiting on its own. // pthread_kill(_lf_sensor.input_thread_id, SIGINT); - _lf_sensor.thread_created = 0; if (_lf_sensor.log_file != NULL) { fclose(_lf_sensor.log_file); } From ced94868b937ccddb00ff6ad35f12aa4c569fbad Mon Sep 17 00:00:00 2001 From: erlingrj Date: Mon, 10 Feb 2025 17:53:03 +0100 Subject: [PATCH 29/38] Fix memory bug when obtaining a port number --- core/federated/federate.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index f7f52e37a..5ec66cdca 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1978,10 +1978,13 @@ void lf_connect_to_rti(const char* hostname, int port) { void lf_create_server(int specified_port) { assert(specified_port <= UINT16_MAX && specified_port >= 0); - if (create_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, TCP, false)) { + uint16_t final_port; + if (create_server(specified_port, &_fed.server_socket, &final_port, TCP, false)) { lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); }; - LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port); + + LF_PRINT_LOG("Server for communicating with other federates started using port %u.", final_port); + _fed.server_port = final_port; // Send the server port number to the RTI // on an MSG_TYPE_ADDRESS_ADVERTISEMENT message (@see net_common.h). From 62bad679a58d0a450c679b3305601bd91317fd32 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Wed, 12 Feb 2025 15:42:02 -0700 Subject: [PATCH 30/38] Revert "Fix read/write to send header separately to match numbers. Fed-to-Fed not done yet." This reverts commit a3887e9b578ef3612877374d23abb9ce92bdc044. --- core/federated/RTI/rti_remote.c | 35 +++++++++++-------------------- core/federated/federate.c | 37 +++++++++++---------------------- 2 files changed, 24 insertions(+), 48 deletions(-) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 3f3651a1c..296774338 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -75,8 +75,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // This function is called in notify_advance_grant_if_safe(), which is a long // function. During this call, the network driver might close, causing the following write_to_netdrv // to fail. Consider a failure here a soft failure and update the federate's status. - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || - write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -109,8 +108,7 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) { // This function is called in notify_advance_grant_if_safe(), which is a long // function. During this call, the network driver might close, causing the following write_to_netdrv // to fail. Consider a failure here a soft failure and update the federate's status. - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || - write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) { lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -167,8 +165,7 @@ void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_DNET, e->id, &tag); } - if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) || - write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) { + if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) { lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id); e->state = NOT_CONNECTED; } else { @@ -239,17 +236,16 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char } // Forward the message. - write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, buffer, &rti_mutex, "RTI failed to forward message to federate %d.", - federate_id); - write_to_netdrv_fail_on_error(fed->fed_netdrv, message_size, buffer + 1, &rti_mutex, + write_to_netdrv_fail_on_error(fed->fed_netdrv, message_size + 1, buffer, &rti_mutex, "RTI failed to forward message to federate %d.", federate_id); LF_MUTEX_UNLOCK(&rti_mutex); } void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer) { - size_t header_size = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t); - read_from_netdrv_fail_on_error(sending_federate->fed_netdrv, header_size, &(buffer[1]), NULL, + size_t header_size = 1 + sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t); + // Read the header, minus the first byte which has already been read. + read_from_netdrv_fail_on_error(sending_federate->fed_netdrv, header_size - 1, &(buffer[1]), NULL, "RTI failed to read the timed message header from remote federate."); // Extract the header information. of the sender uint16_t reactor_port_id; @@ -335,9 +331,8 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_TAGGED_MSG, federate_id, &intended_tag); } - write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, buffer, &rti_mutex, - "RTI failed to forward message header to federate %d.", federate_id); - write_to_netdrv_fail_on_error(fed->fed_netdrv, bytes_read - 1, buffer + 1, &rti_mutex, + + write_to_netdrv_fail_on_error(fed->fed_netdrv, bytes_read, buffer, &rti_mutex, "RTI failed to forward message to federate %d.", federate_id); // The message length may be longer than the buffer, @@ -463,10 +458,7 @@ static void broadcast_stop_time_to_federates_locked() { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_STOP_GRN, fed->enclave.id, &rti_remote->base.max_stop_tag); } - write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, outgoing_buffer, &rti_mutex, - "RTI failed to send MSG_TYPE_STOP_GRANTED message header to federate %d.", - fed->enclave.id); - write_to_netdrv_fail_on_error(fed->fed_netdrv, MSG_TYPE_STOP_GRANTED_LENGTH - 1, outgoing_buffer + 1, &rti_mutex, + write_to_netdrv_fail_on_error(fed->fed_netdrv, MSG_TYPE_STOP_GRANTED_LENGTH, outgoing_buffer, &rti_mutex, "RTI failed to send MSG_TYPE_STOP_GRANTED message to federate %d.", fed->enclave.id); } @@ -586,11 +578,8 @@ void handle_stop_request_message(federate_info_t* fed) { if (rti_remote->base.tracing_enabled) { tracepoint_rti_to_federate(send_STOP_REQ, f->enclave.id, &rti_remote->base.max_stop_tag); } - write_to_netdrv_fail_on_error(f->fed_netdrv, 1, stop_request_buffer, &rti_mutex, - "RTI failed to forward MSG_TYPE_STOP_REQUEST message header to federate %d.", - f->enclave.id); - write_to_netdrv_fail_on_error(f->fed_netdrv, MSG_TYPE_STOP_REQUEST_LENGTH - 1, stop_request_buffer + 1, - &rti_mutex, "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.", + write_to_netdrv_fail_on_error(f->fed_netdrv, MSG_TYPE_STOP_REQUEST_LENGTH, stop_request_buffer, &rti_mutex, + "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.", f->enclave.id); } } diff --git a/core/federated/federate.c b/core/federated/federate.c index 83d57a9d7..1f660db67 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -111,9 +111,7 @@ static void send_time(unsigned char type, instant_t time) { tracepoint_federate_to_rti(send_TIMESTAMP, _lf_my_fed_id, &tag); LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, - "Failed to send MSG_TYPE_TIMESTAMP header."); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write - 1, buffer + 1, &lf_outbound_netdrv_mutex, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write, buffer, &lf_outbound_netdrv_mutex, "Failed to send time " PRINTF_TIME " to the RTI.", time - start_time); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); } @@ -140,8 +138,7 @@ static void send_tag(unsigned char type, tag_t tag) { trace_event_t event_type = (type == MSG_TYPE_NEXT_EVENT_TAG) ? send_NET : send_LTC; // Trace the event when tracing is enabled tracepoint_federate_to_rti(event_type, _lf_my_fed_id, &tag); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, "Failed to send tag header."); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write - 1, buffer + 1, &lf_outbound_netdrv_mutex, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write, buffer, &lf_outbound_netdrv_mutex, "Failed to send tag " PRINTF_TAG " to the RTI.", tag.time - start_time, tag.microstep); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); } @@ -1420,9 +1417,7 @@ static void handle_stop_request_message() { // Send the current logical time to the RTI. LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, outgoing_buffer, &lf_outbound_netdrv_mutex, - "Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI."); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH - 1, outgoing_buffer + 1, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH, outgoing_buffer, &lf_outbound_netdrv_mutex, "Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI."); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); @@ -1715,9 +1710,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id) { tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL); LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, - "Failed to send address query header for federate %d to RTI.", remote_federate_id); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(uint16_t), buffer + 1, &lf_outbound_netdrv_mutex, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_netdrv_mutex, "Failed to send address query for federate %d to RTI.", remote_federate_id); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); @@ -1977,9 +1970,7 @@ void lf_create_server(int specified_port) { tracepoint_federate_to_rti(send_ADR_AD, _lf_my_fed_id, NULL); // No need for a mutex because we have the only handle on this network driver. - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, (unsigned char*)buffer, NULL, - "Failed to send address advertisement header."); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(int32_t), (unsigned char*)buffer + 1, NULL, + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(int32_t) + 1, (unsigned char*)buffer, NULL, "Failed to send address advertisement."); LF_PRINT_DEBUG("Sent port %d to the RTI.", server_port); @@ -2211,8 +2202,7 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa // Trace the event when tracing is enabled tracepoint_federate_to_federate(send_P2P_MSG, _lf_my_fed_id, federate, NULL); - int result = write_to_netdrv_close_on_error(netdrv, 1, header_buffer); - result = write_to_netdrv_close_on_error(netdrv, header_length - 1, header_buffer + 1); + int result = write_to_netdrv_close_on_error(netdrv, header_length, header_buffer); if (result == 0) { // Header sent successfully. Send the body. result = write_to_netdrv_close_on_error(netdrv, length, message); @@ -2420,8 +2410,7 @@ void lf_send_port_absent_to_federate(environment_t* env, interval_t additional_d } LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex); - int result = write_to_netdrv_close_on_error(netdrv, 1, buffer); - result = write_to_netdrv_close_on_error(netdrv, message_length - 1, buffer + 1); + int result = write_to_netdrv_close_on_error(netdrv, message_length, buffer); LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex); if (result != 0) { @@ -2458,11 +2447,9 @@ int lf_send_stop_request_to_rti(tag_t stop_tag) { } // Trace the event when tracing is enabled tracepoint_federate_to_rti(send_STOP_REQ, _lf_my_fed_id, &stop_tag); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, - "Failed to send stop request header."); - write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_LENGTH - 1, buffer + 1, - &lf_outbound_netdrv_mutex, "Failed to send stop time " PRINTF_TIME " to the RTI.", - stop_tag.time - start_time); + + write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_LENGTH, buffer, &lf_outbound_netdrv_mutex, + "Failed to send stop time " PRINTF_TIME " to the RTI.", stop_tag.time - start_time); // Treat this sending as equivalent to having received a stop request from the RTI. _fed.received_stop_request_from_rti = true; @@ -2536,8 +2523,8 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int if (lf_tag_compare(_fed.last_DNET, current_message_intended_tag) > 0) { _fed.last_DNET = current_message_intended_tag; } - int result = write_to_netdrv_close_on_error(netdrv, 1, header_buffer); - result = write_to_netdrv_close_on_error(netdrv, header_length - 1, header_buffer + 1); + + int result = write_to_netdrv_close_on_error(netdrv, header_length, header_buffer); if (result == 0) { // Header sent successfully. Send the body. result = write_to_netdrv_close_on_error(netdrv, length, message); From c8dbb993c9ef5586f5af070b438e7b0e854af628 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 27 Feb 2025 18:29:21 -0700 Subject: [PATCH 31/38] Add shutdown mutex. --- core/federated/RTI/rti_remote.c | 1 + core/federated/network/socket_common.c | 6 ++++++ include/core/federated/network/socket_common.h | 5 +++++ 3 files changed, 12 insertions(+) diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 91e2fe7dc..758a40c5b 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1563,6 +1563,7 @@ void initialize_RTI(rti_remote_t* rti) { // Initialize thread synchronization primitives LF_MUTEX_INIT(&rti_mutex); + LF_MUTEX_INIT(&shutdown_mutex); LF_COND_INIT(&received_start_times, &rti_mutex); LF_COND_INIT(&sent_start_time, &rti_mutex); diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index e2b20660f..ee1410272 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -23,6 +23,7 @@ // Mutex lock held while performing socket close operations. // A deadlock can occur if two threads simulataneously attempt to close the same socket. lf_mutex_t socket_mutex; +lf_mutex_t shutdown_mutex; int create_real_time_tcp_socket_errexit() { int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); @@ -398,8 +399,10 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char* } int shutdown_socket(int* socket, bool read_before_closing) { + LF_MUTEX_LOCK(&shutdown_mutex); if (*socket == -1) { lf_print_log("Socket is already closed."); + LF_MUTEX_UNLOCK(&shutdown_mutex); return 0; } if (!read_before_closing) { @@ -425,6 +428,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { while (read(*socket, buffer, 10) > 0) ; } + LF_MUTEX_UNLOCK(&shutdown_mutex); close_socket: // Label to jump to the closing part of the function // NOTE: In all common TCP/IP stacks, there is a time period, @@ -434,8 +438,10 @@ int shutdown_socket(int* socket, bool read_before_closing) { // duplicated packets intended for this program. if (close(*socket)) { lf_print_log("Error while closing socket: %s\n", strerror(errno)); + LF_MUTEX_UNLOCK(&shutdown_mutex); return -1; } *socket = -1; + LF_MUTEX_UNLOCK(&shutdown_mutex); return 0; } diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h index 87bac7511..ba216a057 100644 --- a/include/core/federated/network/socket_common.h +++ b/include/core/federated/network/socket_common.h @@ -73,6 +73,11 @@ typedef enum socket_type_t { TCP, UDP } socket_type_t; */ extern lf_mutex_t socket_mutex; +/** + * Mutex protecting socket shutdown operations. + */ +extern lf_mutex_t shutdown_mutex; + /** * @brief Create an IPv4 TCP socket with Nagle's algorithm disabled * (TCP_NODELAY) and Delayed ACKs disabled (TCP_QUICKACK). Exits application From 4bf70ac08bb1976c89cd9f4e2883edd6f3294e21 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Thu, 27 Feb 2025 18:30:05 -0700 Subject: [PATCH 32/38] Add ref. --- lingua-franca-ref.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 8b25206ff..316fcb7f9 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master \ No newline at end of file +shutdown \ No newline at end of file From f6e86745d64f4bf78674efa0b3b6bd23e30e861a Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 28 Feb 2025 12:43:09 -0700 Subject: [PATCH 33/38] Minor fix. --- network/impl/src/lf_sst_support.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 982404fc0..6d469ff43 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -116,7 +116,7 @@ int connect_to_netdrv(netdrv_t drv) { return 0; } -// TODO: +// TODO: Still need to fix... int read_from_netdrv(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { sst_priv_t* priv = get_sst_priv_t(drv); return read_from_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); From b2846c253620c028d7ad1dbaebdbf9bddc53797b Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 28 Feb 2025 12:44:21 -0700 Subject: [PATCH 34/38] Minor fix on formatting. --- network/impl/src/lf_sst_support.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 6d469ff43..0180e000a 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -1,5 +1,5 @@ -#include // malloc() -#include // strncpy() +#include // malloc() +#include // strncpy() #include "net_driver.h" #include "lf_sst_support.h" From 01a4b55b8316a1527b7330ce6debeb7b32bd2b6d Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Fri, 28 Feb 2025 14:45:21 -0700 Subject: [PATCH 35/38] Fix names to chan. --- network/impl/src/lf_sst_support.c | 114 +++++++++++++++--------------- 1 file changed, 57 insertions(+), 57 deletions(-) diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c index 0180e000a..5c49493f2 100644 --- a/network/impl/src/lf_sst_support.c +++ b/network/impl/src/lf_sst_support.c @@ -7,15 +7,15 @@ const char* sst_config_path; // The SST's configuration file path. -static sst_priv_t* get_sst_priv_t(netdrv_t drv) { - if (drv == NULL) { +static sst_priv_t* get_sst_priv_t(netchan_t chan) { + if (chan == NULL) { lf_print_error("Network driver is already closed."); return NULL; } - return (sst_priv_t*)drv; + return (sst_priv_t*)chan; } -netdrv_t initialize_netdrv() { +netchan_t initialize_netchan() { // Initialize sst_priv. sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t)); if (sst_priv == NULL) { @@ -43,40 +43,40 @@ netdrv_t initialize_netdrv() { sst_priv->sst_ctx = NULL; sst_priv->session_ctx = NULL; - return (netdrv_t)sst_priv; + return (netchan_t)sst_priv; } -void free_netdrv(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +void free_netchan(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); free(priv->socket_priv); free(priv); } -int create_server(netdrv_t drv, bool increment_port_on_retry) { - sst_priv_t* priv = get_sst_priv_t(drv); +int create_server(netchan_t chan, bool increment_port_on_retry) { + sst_priv_t* priv = get_sst_priv_t(chan); SST_ctx_t* ctx = init_SST(sst_config_path); priv->sst_ctx = ctx; return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, &priv->socket_priv->port, TCP, increment_port_on_retry); } -netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { - sst_priv_t* serv_priv = get_sst_priv_t(server_drv); +netchan_t accept_netchan(netchan_t server_chan, netchan_t rti_chan) { + sst_priv_t* serv_priv = get_sst_priv_t(server_chan); int rti_socket; - if (rti_drv == NULL) { - // Set to -1, to indicate that this accept_netdrv() call is not trying to check if the rti_drv is available, inside + if (rti_chan == NULL) { + // Set to -1, to indicate that this accept_netchan() call is not trying to check if the rti_chan is available, inside // the accept_socket() function. rti_socket = -1; } else { - sst_priv_t* rti_priv = get_sst_priv_t(rti_drv); + sst_priv_t* rti_priv = get_sst_priv_t(rti_chan); rti_socket = rti_priv->socket_priv->socket_descriptor; } - netdrv_t fed_netdrv = initialize_netdrv(); - sst_priv_t* fed_priv = get_sst_priv_t(fed_netdrv); + netchan_t fed_netchan = initialize_netchan(); + sst_priv_t* fed_priv = get_sst_priv_t(fed_netchan); int sock = accept_socket(serv_priv->socket_priv->socket_descriptor, rti_socket); if (sock == -1) { - free_netdrv(fed_netdrv); + free_netchan(fed_netchan); return NULL; } fed_priv->socket_priv->socket_descriptor = sock; @@ -85,25 +85,25 @@ netdrv_t accept_netdrv(netdrv_t server_drv, netdrv_t rti_drv) { lf_print_error("RTI failed to get peer address."); }; - // TODO: Do we need to copy sst_ctx form server_drv to fed_drv? + // TODO: Do we need to copy sst_ctx form server_chan to fed_chan? session_key_list_t* s_key_list = init_empty_session_key_list(); SST_session_ctx_t* session_ctx = server_secure_comm_setup(serv_priv->sst_ctx, fed_priv->socket_priv->socket_descriptor, s_key_list); // Session key used is copied to the session_ctx. free_session_key_list_t(s_key_list); fed_priv->session_ctx = session_ctx; - return fed_netdrv; + return fed_netchan; } -void create_client(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +void create_client(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit(); SST_ctx_t* ctx = init_SST(sst_config_path); priv->sst_ctx = ctx; } -int connect_to_netdrv(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +int connect_to_netchan(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); int ret = connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname, priv->socket_priv->server_port); if (ret != 0) { @@ -117,14 +117,14 @@ int connect_to_netdrv(netdrv_t drv) { } // TODO: Still need to fix... -int read_from_netdrv(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { - sst_priv_t* priv = get_sst_priv_t(drv); +int read_from_netchan(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); return read_from_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); } -int read_from_netdrv_close_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { - sst_priv_t* priv = get_sst_priv_t(drv); - int read_failed = read_from_netdrv(drv, num_bytes, buffer); +int read_from_netchan_close_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); + int read_failed = read_from_netchan(chan, num_bytes, buffer); if (read_failed) { // Read failed. // Socket has probably been closed from the other side. @@ -135,10 +135,10 @@ int read_from_netdrv_close_on_error(netdrv_t drv, size_t num_bytes, unsigned cha return 0; } -void read_from_netdrv_fail_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, +void read_from_netchan_fail_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, char* format, ...) { va_list args; - int read_failed = read_from_netdrv_close_on_error(drv, num_bytes, buffer); + int read_failed = read_from_netchan_close_on_error(chan, num_bytes, buffer); if (read_failed) { // Read failed. if (mutex != NULL) { @@ -154,14 +154,14 @@ void read_from_netdrv_fail_on_error(netdrv_t drv, size_t num_bytes, unsigned cha } } -int write_to_netdrv(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { - sst_priv_t* priv = get_sst_priv_t(drv); +int write_to_netchan(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); return write_to_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); } -int write_to_netdrv_close_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer) { - sst_priv_t* priv = get_sst_priv_t(drv); - int result = write_to_netdrv(drv, num_bytes, buffer); +int write_to_netchan_close_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); + int result = write_to_netchan(chan, num_bytes, buffer); if (result) { // Write failed. // Socket has probably been closed from the other side. @@ -171,10 +171,10 @@ int write_to_netdrv_close_on_error(netdrv_t drv, size_t num_bytes, unsigned char return result; } -void write_to_netdrv_fail_on_error(netdrv_t drv, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, +void write_to_netchan_fail_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, char* format, ...) { va_list args; - int result = write_to_netdrv_close_on_error(drv, num_bytes, buffer); + int result = write_to_netchan_close_on_error(chan, num_bytes, buffer); if (result) { // Write failed. if (mutex != NULL) { @@ -190,59 +190,59 @@ void write_to_netdrv_fail_on_error(netdrv_t drv, size_t num_bytes, unsigned char } } -bool check_netdrv_closed(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +bool check_netchan_closed(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); return check_socket_closed(priv->socket_priv->socket_descriptor); } -int shutdown_netdrv(netdrv_t drv, bool read_before_closing) { - if (drv == NULL) { +int shutdown_netchan(netchan_t chan, bool read_before_closing) { + if (chan == NULL) { lf_print("Socket already closed."); return 0; } - sst_priv_t* priv = get_sst_priv_t(drv); + sst_priv_t* priv = get_sst_priv_t(chan); int ret = shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing); if (ret != 0) { lf_print_error("Failed to shutdown socket."); } - free_netdrv(drv); + free_netchan(chan); return ret; } // END of TODO: // Get/set functions. -int32_t get_my_port(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +int32_t get_my_port(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); return priv->socket_priv->port; } -int32_t get_server_port(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +int32_t get_server_port(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); return priv->socket_priv->server_port; } -struct in_addr* get_ip_addr(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +struct in_addr* get_ip_addr(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); return &priv->socket_priv->server_ip_addr; } -char* get_server_hostname(netdrv_t drv) { - sst_priv_t* priv = get_sst_priv_t(drv); +char* get_server_hostname(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); return priv->socket_priv->server_hostname; } -void set_my_port(netdrv_t drv, int32_t port) { - sst_priv_t* priv = get_sst_priv_t(drv); +void set_my_port(netchan_t chan, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(chan); priv->socket_priv->port = port; } -void set_server_port(netdrv_t drv, int32_t port) { - sst_priv_t* priv = get_sst_priv_t(drv); +void set_server_port(netchan_t chan, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(chan); priv->socket_priv->server_port = port; } -void set_server_hostname(netdrv_t drv, const char* hostname) { - sst_priv_t* priv = get_sst_priv_t(drv); +void set_server_hostname(netchan_t chan, const char* hostname) { + sst_priv_t* priv = get_sst_priv_t(chan); memcpy(priv->socket_priv->server_hostname, hostname, INET_ADDRSTRLEN); } From dbbc22cf520fb62ae67a3403b07a5e866cf527b5 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Mon, 3 Mar 2025 13:46:40 -0700 Subject: [PATCH 36/38] Fix socket_mutex to inbound socket_mutex. --- core/federated/federate.c | 9 +++++---- core/federated/network/socket_common.c | 1 - include/core/federated/federate.h | 7 ++++++- include/core/federated/network/socket_common.h | 5 ----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 7eafbcd01..8a09a9e8a 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -50,6 +50,7 @@ extern bool _lf_termination_executed; // Global variables references in federate.h lf_mutex_t lf_outbound_socket_mutex; +lf_mutex_t lf_inbound_socket_mutex; lf_cond_t lf_port_status_changed; /** @@ -414,11 +415,11 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen * federate. */ static void close_inbound_socket(int fed_id) { - LF_MUTEX_LOCK(&socket_mutex); + LF_MUTEX_LOCK(&lf_inbound_socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) { shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false); } - LF_MUTEX_UNLOCK(&socket_mutex); + LF_MUTEX_UNLOCK(&lf_inbound_socket_mutex); } /** @@ -2090,12 +2091,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { int result = lf_thread_create(&_fed.inbound_socket_listeners[received_federates], listen_to_federates, fed_id_arg); if (result != 0) { // Failed to create a listening thread. - LF_MUTEX_LOCK(&socket_mutex); + LF_MUTEX_LOCK(&lf_inbound_socket_mutex); if (_fed.sockets_for_inbound_p2p_connections[remote_fed_id] != -1) { shutdown_socket(&socket_id, false); _fed.sockets_for_inbound_p2p_connections[remote_fed_id] = -1; } - LF_MUTEX_UNLOCK(&socket_mutex); + LF_MUTEX_UNLOCK(&lf_inbound_socket_mutex); lf_print_error_and_exit("Failed to create a thread to listen for incoming physical connection. Error code: %d.", result); } diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index ee1410272..ddce742c6 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -22,7 +22,6 @@ // Mutex lock held while performing socket close operations. // A deadlock can occur if two threads simulataneously attempt to close the same socket. -lf_mutex_t socket_mutex; lf_mutex_t shutdown_mutex; int create_real_time_tcp_socket_errexit() { diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index e7697f259..7ee0dcc8d 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -221,10 +221,15 @@ typedef enum parse_rti_code_t { SUCCESS, INVALID_PORT, INVALID_HOST, INVALID_USE // Global variables /** - * Mutex lock held while performing socket write and close operations. + * Mutex lock held while performing outbound socket write and close operations. */ extern lf_mutex_t lf_outbound_socket_mutex; +/** + * Mutex lock held while performing inbound socket write and close operations. + */ +extern lf_mutex_t lf_inbound_socket_mutex; + /** * Condition variable for blocking on unkonwn federate input ports. */ diff --git a/include/core/federated/network/socket_common.h b/include/core/federated/network/socket_common.h index ba216a057..c443d56a0 100644 --- a/include/core/federated/network/socket_common.h +++ b/include/core/federated/network/socket_common.h @@ -68,11 +68,6 @@ typedef enum socket_type_t { TCP, UDP } socket_type_t; -/** - * Mutex protecting socket close operations. - */ -extern lf_mutex_t socket_mutex; - /** * Mutex protecting socket shutdown operations. */ From bf79fe9be1b44839c14c2574ffddc3d727114255 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Mon, 3 Mar 2025 13:56:23 -0700 Subject: [PATCH 37/38] Add return before goto. --- core/federated/network/socket_common.c | 1 + 1 file changed, 1 insertion(+) diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index ddce742c6..06a58ec2c 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -428,6 +428,7 @@ int shutdown_socket(int* socket, bool read_before_closing) { ; } LF_MUTEX_UNLOCK(&shutdown_mutex); + return 0; close_socket: // Label to jump to the closing part of the function // NOTE: In all common TCP/IP stacks, there is a time period, From 618989d6f92e67916237a79b2ade5eff23023ad1 Mon Sep 17 00:00:00 2001 From: Dongha Kim Date: Mon, 3 Mar 2025 20:07:27 -0700 Subject: [PATCH 38/38] Minor fix on descriptions. --- core/federated/federate.c | 2 ++ core/federated/network/socket_common.c | 3 +-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/federated/federate.c b/core/federated/federate.c index 8a09a9e8a..6a225a670 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -50,7 +50,9 @@ extern bool _lf_termination_executed; // Global variables references in federate.h lf_mutex_t lf_outbound_socket_mutex; + lf_mutex_t lf_inbound_socket_mutex; + lf_cond_t lf_port_status_changed; /** diff --git a/core/federated/network/socket_common.c b/core/federated/network/socket_common.c index 06a58ec2c..b1395e7b3 100644 --- a/core/federated/network/socket_common.c +++ b/core/federated/network/socket_common.c @@ -20,8 +20,7 @@ /** Number of nanoseconds to sleep before retrying a socket read. */ #define SOCKET_READ_RETRY_INTERVAL 1000000 -// Mutex lock held while performing socket close operations. -// A deadlock can occur if two threads simulataneously attempt to close the same socket. +// Mutex lock held while performing socket shutdown and close operations. lf_mutex_t shutdown_mutex; int create_real_time_tcp_socket_errexit() {