Skip to content

Commit

Permalink
Allow federate to handle that an upstream has connected or disconnect…
Browse files Browse the repository at this point in the history
…ed even before receiving the start time
  • Loading branch information
ChadliaJerad committed Dec 11, 2024
1 parent 02730df commit bb67e63
Showing 1 changed file with 45 additions and 49 deletions.
94 changes: 45 additions & 49 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,44 @@ static void rti_address(const char* hostname, uint16_t port, struct addrinfo** r
}
}

/**
* @brief Handle message from the RTI that an upstream federate has connected.
*
*/
static void handle_upstream_connected_message(void) {
size_t bytes_to_read = sizeof(uint16_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read upstream connected message from RTI.");
uint16_t connected = extract_uint16(buffer);
LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected);
// Mark the upstream as connected.
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_ids[i] == connected) {
_lf_zero_delay_cycle_upstream_disconnected[i] = false;
}
}
}

/**
* @brief Handle message from the RTI that an upstream federate has disconnected.
*
*/
static void handle_upstream_disconnected_message(void) {
size_t bytes_to_read = sizeof(uint16_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read upstream disconnected message from RTI.");
uint16_t disconnected = extract_uint16(buffer);
LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected);
// Mark the upstream as disconnected.
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) {
_lf_zero_delay_cycle_upstream_disconnected[i] = true;
}
}
}

/**
* Send the specified timestamp to the RTI and wait for a response.
* The specified timestamp should be current physical time of the
Expand All @@ -1030,16 +1068,12 @@ static instant_t get_start_time_from_rti(instant_t my_physical_time) {
if (buffer[0] == MSG_TYPE_FAILED) {
lf_print_error_and_exit("RTI has failed.");
} else if (buffer[0] == MSG_TYPE_UPSTREAM_CONNECTED) {
// We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive
// FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set?
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_CONNECTED_LENGTH - 1, buffer + 1, NULL,
"Failed to complete reading MSG_TYPE_UPSTREAM_CONNECTED.");
// We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP_START to arrive
handle_upstream_connected_message();
continue;
} else if (buffer[0] == MSG_TYPE_UPSTREAM_DISCONNECTED) {
// We need to swallow this message so that we continue waiting for MSG_TYPE_TIMESTAMP_START to arrive
// FIXME: Shouldn't we keep the ids, so that these messages are handled right after the startime is set?
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, MSG_TYPE_UPSTREAM_DISCONNECTED_LENGTH - 1, buffer + 1,
NULL, "Failed to complete reading MSG_TYPE_UPSTREAM_DISCONNECTED.");
// We need to handle this message and continue waiting for MSG_TYPE_TIMESTAMP_START to arrive
handle_upstream_disconnected_message();
continue;
} else {
lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from the RTI. Got %u (see net_common.h).",
Expand Down Expand Up @@ -1596,44 +1630,6 @@ static void send_failed_signal() {
*/
static void handle_rti_failed_message(void) { exit(1); }

/**
* @brief Handle message from the RTI that an upstream federate has connected.
*
*/
static void handle_upstream_connected_message(void) {
size_t bytes_to_read = sizeof(uint16_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read upstream connected message from RTI.");
uint16_t connected = extract_uint16(buffer);
LF_PRINT_DEBUG("Received notification that upstream federate %d has connected", connected);
// Mark the upstream as connected.
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_ids[i] == connected) {
_lf_zero_delay_cycle_upstream_disconnected[i] = false;
}
}
}

/**
* @brief Handle message from the RTI that an upstream federate has disconnected.
*
*/
static void handle_upstream_disconnected_message(void) {
size_t bytes_to_read = sizeof(uint16_t);
unsigned char buffer[bytes_to_read];
read_from_socket_fail_on_error(&_fed.socket_TCP_RTI, bytes_to_read, buffer, NULL,
"Failed to read upstream disconnected message from RTI.");
uint16_t disconnected = extract_uint16(buffer);
LF_PRINT_DEBUG("Received notification that upstream federate %d has disconnected", disconnected);
// Mark the upstream as disconnected.
for (size_t i = 0; i < _lf_zero_delay_cycle_action_table_size; i++) {
if (_lf_zero_delay_cycle_upstream_ids[i] == disconnected) {
_lf_zero_delay_cycle_upstream_disconnected[i] = true;
}
}
}

/**
* Thread that listens for TCP inputs from the RTI.
* When messages arrive, this calls the appropriate handler.
Expand Down Expand Up @@ -2070,9 +2066,9 @@ void lf_connect_to_rti(const char* hostname, int port) {
if (result < 0)
continue; // Connect failed.

// Have connected to an RTI, but not sure it's the right RTI.
// Send a MSG_TYPE_FED_IDS message and wait for a reply.
// Notify the RTI of the ID of this federate and its federation.
// Have connected to an RTI, but not sure it's the right RTI.
// Send a MSG_TYPE_FED_IDS message and wait for a reply.
// Notify the RTI of the ID of this federate and its federation.

#ifdef FEDERATED_AUTHENTICATED
LF_PRINT_LOG("Connected to an RTI. Performing HMAC-based authentication using federation ID.");
Expand Down

0 comments on commit bb67e63

Please sign in to comment.