diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index 8c2869dc4..b57b887f6 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -137,6 +137,7 @@ void usage(int argc, const char* argv[]) { lf_print(" -a, --auth Turn on HMAC authentication options.\n"); lf_print(" -t, --tracing Turn on tracing.\n"); lf_print(" -d, --disable_dnet Turn off the use of DNET signals.\n"); + lf_print(" -sst, --sst SST config path for RTI.\n"); lf_print("Command given:"); for (int i = 0; i < argc; i++) { @@ -234,7 +235,7 @@ int process_args(int argc, const char* argv[]) { rti.base.number_of_scheduling_nodes = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print("RTI: Number of federates: %d", rti.base.number_of_scheduling_nodes); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { -#ifdef COMM_TYPE_TCP +#if defined(COMM_TYPE_TCP) || defined(COMM_TYPE_SST) if (argc < i + 2) { lf_print_error("--port needs a short unsigned integer argument ( > 0 and < %d).", UINT16_MAX); usage(argc, argv); @@ -266,6 +267,15 @@ int process_args(int argc, const char* argv[]) { return 0; #endif rti.authentication_enabled = true; + } else if (strcmp(argv[i], "-sst") == 0 || strcmp(argv[i], "--sst") == 0) { +#ifndef COMM_TYPE_SST + lf_print_error("--sst requires the RTI to be built with the --DCOMM_TYPE=SST option."); + usage(argc, argv); + return 0; +#else + i++; + lf_set_sst_config_path(argv[i]); +#endif } else if (strcmp(argv[i], "-t") == 0 || strcmp(argv[i], "--tracing") == 0) { rti.base.tracing_enabled = true; } else if (strcmp(argv[i], "-d") == 0 || strcmp(argv[i], "--dnet_disabled") == 0) { diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index bbf5ec5e1..fe44a20da 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -1572,6 +1572,7 @@ void initialize_RTI(rti_remote_t* rti) { // Initialize thread synchronization primitives LF_MUTEX_INIT(&rti_mutex); + LF_MUTEX_INIT(&shutdown_mutex); LF_COND_INIT(&received_start_times, &rti_mutex); LF_COND_INIT(&sent_start_time, &rti_mutex); diff --git a/core/federated/federate.c b/core/federated/federate.c index a9bb6d94e..d19301906 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -46,6 +46,9 @@ extern bool _lf_termination_executed; // Global variables references in federate.h lf_mutex_t lf_outbound_netchan_mutex; + +lf_mutex_t lf_inbound_netchan_mutex; + lf_cond_t lf_port_status_changed; /** @@ -400,12 +403,11 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen * federate. */ static void close_inbound_netchan(int fed_id) { - LF_MUTEX_LOCK(&netchan_mutex); - if (_fed.netchans_for_inbound_p2p_connections[fed_id] != NULL) { - shutdown_netchan(_fed.netchans_for_inbound_p2p_connections[fed_id], false); - _fed.netchans_for_inbound_p2p_connections[fed_id] = NULL; + LF_MUTEX_LOCK(&lf_inbound_netchan_mutex); + if (_fed.netchans_for_inbound_p2p_connections[fed_id] >= 0) { + shutdown_netchan(&_fed.netchans_for_inbound_p2p_connections[fed_id], false); } - LF_MUTEX_UNLOCK(&netchan_mutex); + LF_MUTEX_UNLOCK(&lf_inbound_netchan_mutex); } /** @@ -722,7 +724,6 @@ static int handle_port_absent_message(netchan_t netchan, int fed_id) { * network channel in _fed.netchans_for_inbound_p2p_connections * to -1 and returns, terminating the thread. * @param _args The remote federate ID (cast to void*). - * @param fed_id_ptr A pointer to a uint16_t containing federate ID being listened to. * This procedure frees the memory pointed to before returning. */ static void* listen_to_federates(void* _args) { @@ -1948,18 +1949,10 @@ void lf_connect_to_rti(const char* hostname, int port) { void lf_create_server(int specified_port) { assert(specified_port <= UINT16_MAX && specified_port >= 0); - - netchan_t server_netchan = initialize_netchan(); - set_my_port(server_netchan, specified_port); - - if (create_server(server_netchan, false)) { - lf_print_error_system_failure("RTI failed to create server: %s.", strerror(errno)); + if (create_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, TCP, false)) { + lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno)); }; - _fed.server_netchan = server_netchan; - // Get the final server port to send to the RTI on an MSG_TYPE_ADDRESS_ADVERTISEMENT message. - int32_t server_port = get_my_port(server_netchan); - - LF_PRINT_LOG("Server for communicating with other federates started using port %d.", server_port); + LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port); // Send the server port number to the RTI // on an MSG_TYPE_ADDRESS_ADVERTISEMENT message (@see net_common.h). @@ -2086,12 +2079,12 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) { int result = lf_thread_create(&_fed.inbound_netchan_listeners[received_federates], listen_to_federates, fed_id_arg); if (result != 0) { // Failed to create a listening thread. - LF_MUTEX_LOCK(&netchan_mutex); + LF_MUTEX_LOCK(&lf_inbound_netchan_mutex); if (_fed.netchans_for_inbound_p2p_connections[remote_fed_id] != NULL) { shutdown_netchan(_fed.netchans_for_inbound_p2p_connections[remote_fed_id], false); _fed.netchans_for_inbound_p2p_connections[remote_fed_id] = NULL; } - LF_MUTEX_UNLOCK(&netchan_mutex); + LF_MUTEX_UNLOCK(&lf_inbound_netchan_mutex); lf_print_error_and_exit("Failed to create a thread to listen for incoming physical connection. Error code: %d.", result); } diff --git a/core/reactor_common.c b/core/reactor_common.c index 7257e5132..90b972029 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -919,6 +919,9 @@ void usage(int argc, const char* argv[]) { printf(" The address of the RTI, which can be in the form of user@host:port or ip:port.\n\n"); printf(" -l\n"); printf(" Send stdout to individual log files for each federate.\n\n"); +#ifdef COMM_TYPE_SST + printf(" -sst, --sst \n"); +#endif #endif printf("Command given:\n"); @@ -1068,6 +1071,17 @@ int process_args(int argc, const char* argv[]) { return 0; } } +#endif +#ifdef COMM_TYPE_SST + else if (strcmp(arg, "-sst") == 0 || strcmp(arg, "--sst") == 0) { + if (argc < i + 1) { + lf_print_error("--sst needs a string argument."); + usage(argc, argv); + return 0; + } + const char* fid = argv[i++]; + lf_set_sst_config_path(fid); + } #endif else if (strcmp(arg, "--ros-args") == 0) { // FIXME: Ignore ROS arguments for now diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 493bd5a3e..7872bf9db 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -199,9 +199,8 @@ bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); - if (wait_duration < MIN_SLEEP_DURATION) { - LF_PRINT_DEBUG("Wait time " PRINTF_TIME " is less than MIN_SLEEP_DURATION " PRINTF_TIME ". Skipping wait.", - wait_duration, MIN_SLEEP_DURATION); + if (wait_duration < 0) { + LF_PRINT_DEBUG("We have already passed " PRINTF_TIME ". Skipping wait.", wait_until_time); return true; } diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 90cda32a8..5db34387f 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -215,10 +215,15 @@ typedef enum parse_rti_code_t { SUCCESS, INVALID_PORT, INVALID_HOST, INVALID_USE // Global variables /** - * Mutex lock held while performing network channel write and close operations. + * Mutex lock held while performing outbound network channel write and close operations. */ extern lf_mutex_t lf_outbound_netchan_mutex; +/** + * Mutex lock held while performing inbound network channel write and close operations. + */ +extern lf_mutex_t lf_inbound_netchan_mutex; + /** * Condition variable for blocking on unkonwn federate input ports. */ diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h index 2f5463165..727b3839e 100644 --- a/include/core/threaded/reactor_threaded.h +++ b/include/core/threaded/reactor_threaded.h @@ -93,8 +93,8 @@ void lf_synchronize_with_other_federates(void); * if that event time matches or exceeds the specified time. * * The mutex lock associated with the condition argument is assumed to be held by - * the calling thread. This mutex is released while waiting. If the wait time is - * too small to actually wait (less than MIN_SLEEP_DURATION), then this function + * the calling thread. This mutex is released while waiting. If the current physical + * time has already passed the specified time, then this function * immediately returns true and the mutex is not released. * * @param env Environment within which we are executing. diff --git a/logging/api/logging_macros.h b/logging/api/logging_macros.h index 3e22950b5..0fbabd157 100644 --- a/logging/api/logging_macros.h +++ b/logging/api/logging_macros.h @@ -107,4 +107,4 @@ static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG; } \ } while (0) #endif // NDEBUG -#endif // LOGGING_MACROS_H \ No newline at end of file +#endif // LOGGING_MACROS_H diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index afffd2a9e..3fe4d42d3 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -113,12 +113,12 @@ int lf_mutex_lock(lf_mutex_t* mutex); /** * @brief Get the number of cores on the host machine. */ -int lf_available_cores(); +int lf_available_cores(void); /** * @brief Return the lf_thread_t of the calling thread. */ -lf_thread_t lf_thread_self(); +lf_thread_t lf_thread_self(void); /** * Create a new thread, starting with execution of lf_thread @@ -273,12 +273,12 @@ int _lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time); * @brief The ID of the current thread. The only guarantee is that these IDs will be a contiguous range of numbers * starting at 0. */ -int lf_thread_id(); +int lf_thread_id(void); /** * @brief Initialize the thread ID for the current thread. */ -void initialize_lf_thread_id(); +void initialize_lf_thread_id(void); #endif // !defined(LF_SINGLE_THREADED) /** diff --git a/network/api/lf_sst_support.h b/network/api/lf_sst_support.h new file mode 100644 index 000000000..7fdb941ab --- /dev/null +++ b/network/api/lf_sst_support.h @@ -0,0 +1,15 @@ +#ifndef LF_SST_SUPPORT_H +#define LF_SST_SUPPORT_H + +#include "socket_common.h" +#include + +typedef struct sst_priv_t { + socket_priv_t* socket_priv; + SST_ctx_t* sst_ctx; + SST_session_ctx_t* session_ctx; +} sst_priv_t; + +void lf_set_sst_config_path(const char* config_path); + +#endif /* LF_SST_SUPPORT_H */ diff --git a/network/api/net_driver.h b/network/api/net_driver.h index 07f37755d..d21683e59 100644 --- a/network/api/net_driver.h +++ b/network/api/net_driver.h @@ -3,13 +3,17 @@ #include "socket_common.h" +#if defined(COMM_TYPE_SST) +#include "lf_sst_support.h" +#endif + typedef void* netchan_t; /** * Allocate memory for the network channel. * @return netchan_t Initialized network channel. */ -netchan_t initialize_netchan(); +netchan_t initialize_netchan(void); /** * Create a netchannel server. This is such as a server socket which accepts connections. diff --git a/network/api/socket_common.h b/network/api/socket_common.h index 0a8f18160..c14cc010e 100644 --- a/network/api/socket_common.h +++ b/network/api/socket_common.h @@ -77,9 +77,9 @@ typedef enum socket_type_t { TCP, UDP } socket_type_t; /** - * Mutex protecting socket close operations. + * Mutex protecting socket shutdown operations. */ -extern lf_mutex_t netchan_mutex; +extern lf_mutex_t shutdown_mutex; typedef struct socket_priv_t { int socket_descriptor; diff --git a/network/impl/CMakeLists.txt b/network/impl/CMakeLists.txt index 225edf3d5..e078e9e86 100644 --- a/network/impl/CMakeLists.txt +++ b/network/impl/CMakeLists.txt @@ -10,6 +10,10 @@ target_sources(lf-network-impl PUBLIC if(COMM_TYPE MATCHES TCP) target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_socket_support.c) +elseif(COMM_TYPE MATCHES SST) + find_package(sst-lib REQUIRED) + target_sources(lf-network-impl PUBLIC ${CMAKE_CURRENT_LIST_DIR}/src/lf_sst_support.c) + target_link_libraries(lf-network-impl PUBLIC sst-lib::sst-c-api) else() message(FATAL_ERROR "Your communication type is not supported! The C target supports TCP.") endif() diff --git a/network/impl/src/lf_sst_support.c b/network/impl/src/lf_sst_support.c new file mode 100644 index 000000000..5c49493f2 --- /dev/null +++ b/network/impl/src/lf_sst_support.c @@ -0,0 +1,250 @@ +#include // malloc() +#include // strncpy() + +#include "net_driver.h" +#include "lf_sst_support.h" +#include "util.h" + +const char* sst_config_path; // The SST's configuration file path. + +static sst_priv_t* get_sst_priv_t(netchan_t chan) { + if (chan == NULL) { + lf_print_error("Network driver is already closed."); + return NULL; + } + return (sst_priv_t*)chan; +} + +netchan_t initialize_netchan() { + // Initialize sst_priv. + sst_priv_t* sst_priv = malloc(sizeof(sst_priv_t)); + if (sst_priv == NULL) { + lf_print_error_and_exit("Falied to malloc sst_priv_t."); + } + // Initialize socket_priv. + socket_priv_t* socket_priv = malloc(sizeof(socket_priv_t)); + if (socket_priv == NULL) { + lf_print_error_and_exit("Falied to malloc socket_priv_t."); + } + + // Server initialization. + socket_priv->port = 0; + socket_priv->user_specified_port = 0; + socket_priv->socket_descriptor = -1; + + // Federate initialization + strncpy(socket_priv->server_hostname, "localhost", INET_ADDRSTRLEN); + socket_priv->server_ip_addr.s_addr = 0; + socket_priv->server_port = -1; + + sst_priv->socket_priv = socket_priv; + + // SST initialization. Only set pointers to NULL. + sst_priv->sst_ctx = NULL; + sst_priv->session_ctx = NULL; + + return (netchan_t)sst_priv; +} + +void free_netchan(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + free(priv->socket_priv); + free(priv); +} + +int create_server(netchan_t chan, bool increment_port_on_retry) { + sst_priv_t* priv = get_sst_priv_t(chan); + SST_ctx_t* ctx = init_SST(sst_config_path); + priv->sst_ctx = ctx; + return create_socket_server(priv->socket_priv->user_specified_port, &priv->socket_priv->socket_descriptor, + &priv->socket_priv->port, TCP, increment_port_on_retry); +} + +netchan_t accept_netchan(netchan_t server_chan, netchan_t rti_chan) { + sst_priv_t* serv_priv = get_sst_priv_t(server_chan); + int rti_socket; + if (rti_chan == NULL) { + // Set to -1, to indicate that this accept_netchan() call is not trying to check if the rti_chan is available, inside + // the accept_socket() function. + rti_socket = -1; + } else { + sst_priv_t* rti_priv = get_sst_priv_t(rti_chan); + rti_socket = rti_priv->socket_priv->socket_descriptor; + } + netchan_t fed_netchan = initialize_netchan(); + sst_priv_t* fed_priv = get_sst_priv_t(fed_netchan); + + int sock = accept_socket(serv_priv->socket_priv->socket_descriptor, rti_socket); + if (sock == -1) { + free_netchan(fed_netchan); + return NULL; + } + fed_priv->socket_priv->socket_descriptor = sock; + // Get the peer address from the connected socket_id. Saving this for the address query. + if (get_peer_address(fed_priv->socket_priv) != 0) { + lf_print_error("RTI failed to get peer address."); + }; + + // TODO: Do we need to copy sst_ctx form server_chan to fed_chan? + session_key_list_t* s_key_list = init_empty_session_key_list(); + SST_session_ctx_t* session_ctx = + server_secure_comm_setup(serv_priv->sst_ctx, fed_priv->socket_priv->socket_descriptor, s_key_list); + // Session key used is copied to the session_ctx. + free_session_key_list_t(s_key_list); + fed_priv->session_ctx = session_ctx; + return fed_netchan; +} + +void create_client(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + priv->socket_priv->socket_descriptor = create_real_time_tcp_socket_errexit(); + SST_ctx_t* ctx = init_SST(sst_config_path); + priv->sst_ctx = ctx; +} + +int connect_to_netchan(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + int ret = connect_to_socket(priv->socket_priv->socket_descriptor, priv->socket_priv->server_hostname, + priv->socket_priv->server_port); + if (ret != 0) { + return ret; + } + session_key_list_t* s_key_list = get_session_key(priv->sst_ctx, NULL); + SST_session_ctx_t* session_ctx = + secure_connect_to_server_with_socket(&s_key_list->s_key[0], priv->socket_priv->socket_descriptor); + priv->session_ctx = session_ctx; + return 0; +} + +// TODO: Still need to fix... +int read_from_netchan(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); + return read_from_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); +} + +int read_from_netchan_close_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); + int read_failed = read_from_netchan(chan, num_bytes, buffer); + if (read_failed) { + // Read failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + return -1; + } + return 0; +} + +void read_from_netchan_fail_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...) { + va_list args; + int read_failed = read_from_netchan_close_on_error(chan, num_bytes, buffer); + if (read_failed) { + // Read failed. + if (mutex != NULL) { + LF_MUTEX_UNLOCK(mutex); + } + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error_system_failure("Failed to read from socket."); + } + } +} + +int write_to_netchan(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); + return write_to_socket(priv->socket_priv->socket_descriptor, num_bytes, buffer); +} + +int write_to_netchan_close_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer) { + sst_priv_t* priv = get_sst_priv_t(chan); + int result = write_to_netchan(chan, num_bytes, buffer); + if (result) { + // Write failed. + // Socket has probably been closed from the other side. + // Shut down and close the socket from this side. + shutdown_socket(&priv->socket_priv->socket_descriptor, false); + } + return result; +} + +void write_to_netchan_fail_on_error(netchan_t chan, size_t num_bytes, unsigned char* buffer, lf_mutex_t* mutex, + char* format, ...) { + va_list args; + int result = write_to_netchan_close_on_error(chan, num_bytes, buffer); + if (result) { + // Write failed. + if (mutex != NULL) { + LF_MUTEX_UNLOCK(mutex); + } + if (format != NULL) { + va_start(args, format); + lf_print_error_system_failure(format, args); + va_end(args); + } else { + lf_print_error("Failed to write to socket. Closing it."); + } + } +} + +bool check_netchan_closed(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + return check_socket_closed(priv->socket_priv->socket_descriptor); +} + +int shutdown_netchan(netchan_t chan, bool read_before_closing) { + if (chan == NULL) { + lf_print("Socket already closed."); + return 0; + } + sst_priv_t* priv = get_sst_priv_t(chan); + int ret = shutdown_socket(&priv->socket_priv->socket_descriptor, read_before_closing); + if (ret != 0) { + lf_print_error("Failed to shutdown socket."); + } + free_netchan(chan); + return ret; +} +// END of TODO: + +// Get/set functions. +int32_t get_my_port(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + return priv->socket_priv->port; +} + +int32_t get_server_port(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + return priv->socket_priv->server_port; +} + +struct in_addr* get_ip_addr(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + return &priv->socket_priv->server_ip_addr; +} + +char* get_server_hostname(netchan_t chan) { + sst_priv_t* priv = get_sst_priv_t(chan); + return priv->socket_priv->server_hostname; +} + +void set_my_port(netchan_t chan, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(chan); + priv->socket_priv->port = port; +} + +void set_server_port(netchan_t chan, int32_t port) { + sst_priv_t* priv = get_sst_priv_t(chan); + priv->socket_priv->server_port = port; +} + +void set_server_hostname(netchan_t chan, const char* hostname) { + sst_priv_t* priv = get_sst_priv_t(chan); + memcpy(priv->socket_priv->server_hostname, hostname, INET_ADDRSTRLEN); +} + +// Helper function. +void lf_set_sst_config_path(const char* config_path) { sst_config_path = config_path; } diff --git a/network/impl/src/socket_common.c b/network/impl/src/socket_common.c index f5e1089ef..6dd0fd7a5 100644 --- a/network/impl/src/socket_common.c +++ b/network/impl/src/socket_common.c @@ -21,9 +21,8 @@ /** Number of nanoseconds to sleep before retrying a socket read. */ #define SOCKET_READ_RETRY_INTERVAL 1000000 -// Mutex lock held while performing network channel close operations. -// A deadlock can occur if two threads simulataneously attempt to close the same network channel. -lf_mutex_t netchan_mutex; +// Mutex lock held while performing socket shutdown and close operations. +lf_mutex_t shutdown_mutex; int create_real_time_tcp_socket_errexit(void) { int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); @@ -355,8 +354,10 @@ int write_to_socket(int socket, size_t num_bytes, unsigned char* buffer) { } int shutdown_socket(int* socket, bool read_before_closing) { + LF_MUTEX_LOCK(&shutdown_mutex); if (*socket == -1) { lf_print_log("Socket is already closed."); + LF_MUTEX_UNLOCK(&shutdown_mutex); return 0; } if (!read_before_closing) { @@ -382,6 +383,8 @@ int shutdown_socket(int* socket, bool read_before_closing) { while (read(*socket, buffer, 10) > 0) ; } + LF_MUTEX_UNLOCK(&shutdown_mutex); + return 0; close_socket: // Label to jump to the closing part of the function // NOTE: In all common TCP/IP stacks, there is a time period, @@ -391,8 +394,10 @@ int shutdown_socket(int* socket, bool read_before_closing) { // duplicated packets intended for this program. if (close(*socket)) { lf_print_log("Error while closing socket: %s\n", strerror(errno)); + LF_MUTEX_UNLOCK(&shutdown_mutex); return -1; } *socket = -1; + LF_MUTEX_UNLOCK(&shutdown_mutex); return 0; } diff --git a/util/sensor_simulator.c b/util/sensor_simulator.c index 2ad8b8ef9..4d620d8bb 100644 --- a/util/sensor_simulator.c +++ b/util/sensor_simulator.c @@ -347,13 +347,16 @@ void end_sensor_simulator() { lf_register_print_function(NULL, -1); _lf_sensor_post_message(_lf_sensor_close_windows, NULL); - void* thread_return; - lf_thread_join(_lf_sensor.output_thread_id, &thread_return); + // Join thread, if it was created and it was not already joined. + if (_lf_sensor.thread_created > 0) { + void* thread_return; + lf_thread_join(_lf_sensor.output_thread_id, &thread_return); + _lf_sensor.thread_created = 0; + } // Timeout mode should result in the input thread exiting on its own. // pthread_kill(_lf_sensor.input_thread_id, SIGINT); - _lf_sensor.thread_created = 0; if (_lf_sensor.log_file != NULL) { fclose(_lf_sensor.log_file); }