Skip to content

Commit

Permalink
Fix accept_socket. Now federate.c's lf_handle_p2p_connections_from_fe…
Browse files Browse the repository at this point in the history
…derates(), and rti_remote.c's lf_connect_to_federates() and respond_to_erroneous_connections() uses a single function to accept_socket.
  • Loading branch information
Jakio815 committed Dec 17, 2024
1 parent 8ead884 commit ef44cdd
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
6 changes: 2 additions & 4 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,7 @@ static bool authenticate_federate(int* socket) {

void lf_connect_to_federates(int socket_descriptor) {
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
int socket_id = accept_socket(rti_remote->socket_descriptor_TCP);
int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1);
// Wait for the first message from the federate when RTI -a option is on.
#ifdef __RTI_AUTH__
if (rti_remote->authentication_enabled) {
Expand Down Expand Up @@ -1476,11 +1476,9 @@ void* respond_to_erroneous_connections(void* nothing) {
initialize_lf_thread_id();
while (true) {
// Wait for an incoming connection request.
struct sockaddr client_fd;
uint32_t client_length = sizeof(client_fd);
// The following will block until either a federate attempts to connect
// or close(rti->socket_descriptor_TCP) is called.
int socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length);
int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1);
if (socket_id < 0)
return NULL;

Expand Down
17 changes: 3 additions & 14 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1994,21 +1994,10 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
_fed.inbound_socket_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t));
while (received_federates < _fed.number_of_inbound_p2p_connections && !_lf_termination_executed) {
// Wait for an incoming connection request.
struct sockaddr client_fd;
uint32_t client_length = sizeof(client_fd);
int socket_id = accept(_fed.server_socket, &client_fd, &client_length);

int socket_id = accept_socket(_fed.server_socket, _fed.socket_TCP_RTI);
if (socket_id < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
if (rti_failed())
break;
else
continue; // Try again.
} else if (errno == EPERM) {
lf_print_error_system_failure("Firewall permissions prohibit connection.");
} else {
lf_print_error_system_failure("A fatal error occurred while accepting a new socket.");
}
lf_print_warning("Federate failed to accept the socket.");
return NULL;
}
LF_PRINT_LOG("Accepted new connection from remote federate.");

Expand Down
34 changes: 29 additions & 5 deletions core/federated/network/socket_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,22 +170,46 @@ void create_UDP_server(uint16_t port, int* final_socket, uint16_t* final_port) {
*final_port = used_port;
}

int accept_socket(int socket) {
/**
* Return true if either the socket to the RTI is broken or the socket is
* alive and the first unread byte on the socket's queue is MSG_TYPE_FAILED.
*/
bool check_socket_closed(int socket) {
unsigned char first_byte;
ssize_t bytes = peek_from_socket(socket, &first_byte);
if (bytes < 0 || (bytes == 1 && first_byte == MSG_TYPE_FAILED)) {
return true;
} else {
return false;
}
}

int accept_socket(int socket, int rti_socket) {
struct sockaddr client_fd;
// Wait for an incoming connection request.
uint32_t client_length = sizeof(client_fd);
// The following blocks until a federate connects.
int socket_id = -1;
while (1) {
while (true) {
// When close(socket) is called, the accept() will return -1.
socket_id = accept(socket, &client_fd, &client_length);
if (socket_id >= 0) {
// Got a socket
break;
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK)) {
lf_print_error_system_failure("RTI failed to accept the socket.");
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK || errno != EINTR)) {
lf_print_warning("Failed to accept the socket. %s.", strerror(errno));
break;
} else if (errno == EPERM) {
lf_print_error_system_failure("Firewall permissions prohibit connection.");
} else {
// For the federates, it should check if the rti_socket is still open, before retrying accept().
if (rti_socket == -1) {
if (check_socket_closed(rti_socket)) {
break;
}
}
// Try again
lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno));
lf_print_warning("Failed to accept the socket. %s. Trying again.", strerror(errno));
continue;
}
}
Expand Down
9 changes: 7 additions & 2 deletions include/core/federated/network/socket_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@
*/
#define DEFAULT_PORT 15045u

/**
* Byte identifying that the federate or the RTI has failed.
*/
#define MSG_TYPE_FAILED 25

typedef enum socket_type_t { TCP, UDP } socket_type_t;

/**
Expand Down Expand Up @@ -100,11 +105,11 @@ void create_UDP_server(uint16_t port, int* final_socket, uint16_t* final_port);
* errors cause the function to retry accepting the connection.
*
* @param socket The server socket file descriptor that is listening for incoming connections.
* @param client_fd A pointer to a `struct sockaddr` that will hold the client's address information.
* @param rti_socket The rti socket for the federate to check if it is still open.
* @return int The file descriptor for the newly accepted socket on success, or -1 on failure
* (with an appropriate error message printed).
*/
int accept_socket(int socket, struct sockaddr* client_fd);
int accept_socket(int socket, int rti_socket);

/**
*
Expand Down

0 comments on commit ef44cdd

Please sign in to comment.