Skip to content

Commit

Permalink
Fix port type && change to print_log on shutdown failures && Refactor…
Browse files Browse the repository at this point in the history
… close_inbound_socket()
  • Loading branch information
Jakio815 committed Dec 21, 2024
1 parent 1b00ed3 commit de2fbde
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 21 deletions.
25 changes: 8 additions & 17 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -403,26 +403,15 @@ static trigger_handle_t schedule_message_received_from_network_locked(environmen

/**
* Close the socket that receives incoming messages from the
* specified federate ID. This function should be called when a read
* of incoming socket fails or when an EOF is received.
* It can also be called when the receiving end wants to stop communication,
* in which case, flag should be 1.
* specified federate ID.
*
* @param fed_id The ID of the peer federate sending messages to this
* federate.
* @param flag 0 if an EOF was received, -1 if a socket error occurred, 1 otherwise.
*/
static void close_inbound_socket(int fed_id, int flag) {
static void close_inbound_socket(int fed_id) {
LF_MUTEX_LOCK(&socket_mutex);
if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) {
if (flag >= 0) {
if (flag > 0) {
shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false);
} else {
// Have received EOF from the other end. Send EOF to the other end.
shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], true);
}
}
shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false);
}
LF_MUTEX_UNLOCK(&socket_mutex);
}
Expand Down Expand Up @@ -663,7 +652,7 @@ static int handle_tagged_message(int* socket, int fed_id) {
env->current_tag.time - start_time, env->current_tag.microstep, intended_tag.time - start_time,
intended_tag.microstep);
// Close socket, reading any incoming data and discarding it.
close_inbound_socket(fed_id, 1);
close_inbound_socket(fed_id);
} else {
// Need to use intended_tag here, not actual_tag, so that STP violations are detected.
// It will become actual_tag (that is when the reactions will be invoked).
Expand Down Expand Up @@ -1640,7 +1629,7 @@ void lf_terminate_execution(environment_t* env) {
LF_PRINT_DEBUG("Closing incoming P2P sockets.");
// Close any incoming P2P sockets that are still open.
for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {
close_inbound_socket(i, 1);
close_inbound_socket(i);
// Ignore errors. Mark the socket closed.
_fed.sockets_for_inbound_p2p_connections[i] = -1;
}
Expand Down Expand Up @@ -1930,9 +1919,11 @@ void lf_connect_to_rti(const char* hostname, int port) {

void lf_create_server(int specified_port) {
assert(specified_port <= UINT16_MAX && specified_port >= 0);
if (create_TCP_server(specified_port, &_fed.server_socket, (uint16_t*)&_fed.server_port, false)) {
uint16_t port;
if (create_TCP_server(specified_port, &_fed.server_socket, &port, false)) {
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
_fed.server_port = (int)port;
LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port);

// Send the server port number to the RTI
Expand Down
8 changes: 4 additions & 4 deletions core/federated/network/socket_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ static int create_server(uint16_t port, int* final_socket, uint16_t* final_port,
return -1;
}
set_socket_timeout_option(socket_descriptor, &timeout_time);
int used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry);
uint16_t used_port = set_socket_bind_option(socket_descriptor, port, increment_port_on_retry);
if (sock_type == 0) {
// Enable listening for socket connections.
// The second argument is the maximum number of queued socket requests,
Expand Down Expand Up @@ -408,15 +408,15 @@ void write_to_socket_fail_on_error(int* socket, size_t num_bytes, unsigned char*
int shutdown_socket(int* socket, bool read_before_closing) {
if (!read_before_closing) {
if (shutdown(*socket, SHUT_RDWR)) {
lf_print_warning("On shut down TCP socket, received reply: %s", strerror(errno));
lf_print_log("On shutdown socket, received reply: %s", strerror(errno));
return -1;
}
} else {
// Signal the other side that no further writes are expected by sending a FIN packet.
// This indicates the write direction is closed. For more details, refer to:
// https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket
if (shutdown(*socket, SHUT_WR)) {
lf_print_warning("Failed to shut down socket: %s", strerror(errno));
lf_print_log("Failed to shutdown socket: %s", strerror(errno));
return -1;
}

Expand All @@ -435,7 +435,7 @@ int shutdown_socket(int* socket, bool read_before_closing) {
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
if (close(*socket)) {
lf_print_warning("Error while closing socket: %s\n", strerror(errno));
lf_print_log("Error while closing socket: %s\n", strerror(errno));
return -1;
}
*socket = -1;
Expand Down

0 comments on commit de2fbde

Please sign in to comment.