Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shutdown_socket() function for follow up of #505 #506

Open
wants to merge 38 commits into
base: main
Choose a base branch
from

Conversation

Jakio815
Copy link
Collaborator

@Jakio815 Jakio815 commented Dec 20, 2024

This PR follow ups #505.
Please merge with lf-lang/lingua-franca#2474.

New Integrated Function: shutdown_socket()

int shutdown_socket(int* socket, bool read_before_closing);

Arguments

  • socket: A pointer to the socket descriptor that needs to be shut down and closed.
  • read_before_closing: A boolean indicating whether the socket should read any remaining incoming data before closing:
    • true: Initiates a graceful shutdown by sending a FIN packet (SHUT_WR) and waits for EOF (0-length message) from the peer.
    • false: Immediately shuts down both reading and writing directions (SHUT_RDWR).

Return Values

  • 0: Indicates successful shutdown and closure of the socket.
  • -1: Indicates a failure during shutdown or closure, with errno set to describe the specific error.

Function Description
This function gracefully shuts down and closes a socket.

  • When read_before_closing is false:
    • The function calls shutdown(SHUT_RDWR) to immediately stop both sending and receiving data. Then calls close().
  • When read_before_closing is true:
    • The function calls shutdown(SHUT_WR) to signal the end of writing but allows reading to continue.
    • It waits for the peer to send an EOF (indicated by read() returning 0), and discards all received bytes.

This function holds a new mutex named shutdown_mutex, which ensures preventing deadlocks.

Refactoring on close_inbound_socket() and close_outbound_socket() from federate.c

close_inbound_socket()

The original code looked not updated. There were no use case when the argument flag was not 1. I removed the flag argument, and all unused code.

close_outbound_socket()

The original flags were only set by this one line code.
int flag = _lf_normal_termination ? 1 : -1;
So it depends on _lf_normal_termination. However, this function can directly use _lf_normal_termination, so I removed the flag, and directly used _lf_normal_termination.

Minor implementation details for macOS.

When the federate sends the MSG_TYPE_RESIGN signal, the RTI calls shutdown(SHUT_WR). Then, in the federate side, the thread from listen_to_rti_TCP()'s read_from_socket()'s read() returns with a EOF. Then, it calls shutdown_socket().

However, the shutdown() fails, with an errno Socket is not connected on only MacOS. even though the connection is half open. However, since the connection is half p[em, the federate should still send a FIN_ACK packet to the RTI, because the RTI is blocked on the read().

Thus, the federate does not return on failure of shutdown(), and calls the close(), to ensure the FIN_ACK packet is sent.

To summarize, even when shutdown() fails, the code will try close(), to ensure the socket is closed.

Minor addition of logic in federate.c's handle_tagged_message()

In federate.c's handle_tagged_message(), when the received message already missed the tag, it closes the inbound socket, however does not return -1 for failure of reading the message. This is logic is added.

Some mutex locks removed.

socket_mutex was only used as federate's inbound socket mutex. Especially, it was only used when closing inbound sockets. The new shutdown_socket() function holds its own mutex, so removed the duplication of mutex locks.

@Jakio815 Jakio815 marked this pull request as ready for review December 21, 2024 01:18
@Jakio815 Jakio815 changed the title Draft: Add shutdown_socket() function. Add shutdown_socket() function. Dec 21, 2024
@Jakio815 Jakio815 changed the title Add shutdown_socket() function. Add shutdown_socket() function for follow up of #505 Dec 21, 2024
Copy link
Member

@hokeun hokeun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice refactoring. Just left minor suggestions.

@@ -1030,9 +1009,7 @@ void send_reject(int* socket_id, unsigned char error_code) {
lf_print_warning("RTI failed to write MSG_TYPE_REJECT message on the socket.");
}
// Close the socket.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: how about adding more information for this call? For example, "Close the socket without reading until EOF."

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I fixed it as suggested.

@@ -1418,9 +1395,7 @@ void lf_connect_to_federates(int socket_descriptor) {
if (!authenticate_federate(&socket_id)) {
lf_print_warning("RTI failed to authenticate the incoming federate.");
// Close the socket.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I fixed it as suggested.

@@ -1488,8 +1463,7 @@ void* respond_to_erroneous_connections(void* nothing) {
lf_print_warning("RTI failed to write FEDERATION_ID_DOES_NOT_MATCH to erroneous incoming connection.");
}
// Close the socket.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, I fixed it as suggested.

@Jakio815
Copy link
Collaborator Author

Jakio815 commented Dec 22, 2024

The macOS test is not passing for some unknown reason.

The CI test log seems to not work starting from Running: src/federated/DistributedCountPhysical.lf (28%), however if you run the federated tests on the local machine, it actually doesn't work starting from Dataflow.lf.

The reason for not passing is that the program blocks on the final shutdown phase.

First, this is how the LF code is intended to work.

  1. The federate sends a RESIGN signal.
  2. The RTI calls shutdown(socket, SHUT_WR), and then immediately calls read()
    • The shutdown() sends a FIN packet to the federate.
  3. The federate was blocked on read() to listen messages from the RTI.
    • The federate receives the FIN packet, and finishes reading the data in the socket buffer, and then returns EOF.
  4. The federate will also send a FIN packet and the RTI's read() will return 0.

This normally works on Linux, thus passing all tests. However, this fails on Mac.
Where it fails is exactly in step 3. The federate received a FIN packet, and the read() should return EOF, however it does not, and gets blocked forever.

I also checked that the FIN packet was properly sent, and the ACK from the federate to the RTI was also sent.

// netstat -ant | grep 15045
tcp4       0      0  127.0.0.1.15045        127.0.0.1.64842        FIN_WAIT_2
tcp4       0      0  127.0.0.1.64842        127.0.0.1.15045        CLOSE_WAIT

The RTI(15045) is in FIN_WAIT_2 state, which means that it sent the FIN, and received the ACK.
The federate(64842) is in CLOSE_WAIT which means that the remote side's(RTI) connection is closed, and waiting to connect its own connection.

This error can be reproduced by the lingua-franca's repo on branch refactor-only-comm-type and reactor-c branch on shutdown, and it should be done on a MacOS machine.

@hokeun @edwardalee Could anyone help investigating this problem?

@Jakio815 Jakio815 changed the base branch from refactor-only-comm-type to main January 8, 2025 04:45
@Jakio815 Jakio815 requested a review from hokeun January 24, 2025 18:43
Copy link
Member

@hokeun hokeun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice and clean!

@Jakio815
Copy link
Collaborator Author

@edwardalee Would you please review this?

Copy link
Contributor

@edwardalee edwardalee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've requested what I think is just a documentation change, but also a sanity check to make sure the right mutex is held by all callers of this new function.

Comment on lines +244 to +250
* Shutdown and close the socket. If read_before_closing is false, it just immediately calls shutdown() with SHUT_RDWR
* and close(). If read_before_closing is true, it calls shutdown with SHUT_WR, only disallowing further writing. Then,
* it calls read() until EOF is received, and discards all received bytes.
* @param socket Pointer to the socket descriptor to shutdown and close.
* @param read_before_closing If true, read until EOF before closing the socket.
* @return int 0 for success and -1 for an error.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this function assumes the lf_outbound_socket_mutex mutex lock is held when the function is called. This needs to be stated in the documentation and checked for all calls to the function.

Copy link
Collaborator Author

@Jakio815 Jakio815 Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function doesn't really assume the mutex lock is held always.

In normal socket shutdown cases (normal termination), they do assume there is a mutex lock, for example, federate.c's close_inbound_socket(), close_outbound_socket(), and rti_remote.c's handle_federate_failed(), handle_federate_resign() does hold the mutexes.

However, some corner cases that handle abnormal termination, do not assume that there are mutex locks.
For example, in federate.c's listen_to_rti_TCP() function, when the 1byte header read fails, it does not lock any mutexes, and just shutdowns the socket.
Also, in rti_remote.c's respond_to_erroneous_connections() function, which is running in a separate thread, it does not lock any mutexes and just shutdowns the socket.

So, what I thought was that the user should handle the mutex if it is needed.

Of course, I could add one line in the documentation of this function, such as
Mutex locks should be considered before calling this function.
or would there be any other suggestions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the code correct without this specific mutex? Note that it can't be any mutex... All callers have to use the same mutex...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, professor, I think I didn't understand your question in the first place.

What I understood was, "Don't you need this lf_outbound_socket_mutex inside this function?" Am I correct? For that question, as I explained, some cases need mutexes, and some cases do not. So, I thought that the function caller should set the mutex on their own.

If I misunderstood your question, could you please clarify your question?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Among other things, the function does this:

int shutdown_socket(int* socket, bool read_before_closing) {
  if (*socket == -1) {
    lf_print_log("Socket is already closed.");
    return 0;
  }
  ...
  *socket = -1;
  return 0;
}

If this function can be called from multiple threads, then two threads can both get past the first check and then proceed to try to simultaneously close the socket. The only way this code is safe is if you can assure that no more than one thread can call this function.

If more than one thread can call the function, then the above code is only correct if all threads that can call the function acquire the same mutex before calling it. Otherwise, the code is not correct.

One way to ensure safety is to assure that anywhere this is called without acquiring a mutex, only one thread is running. Alternatively, you would need to assure that no other thread that might be running can call this function. This may be quite difficult to assure.

One solution would be to acquire a mutex within this function. But this may be inefficient because some callers may already hold the relevant mutex. In the absence of such a local mutex, you would need to assure that all callers hold the mutex, and it would have to be clearly documented that this is required to call this function.

Copy link
Collaborator Author

@Jakio815 Jakio815 Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@edwardalee

I just added another mutex in the shutdown_socket() function, named as socket_mutex, and updated the PR description.

So, now when the function shutdown_socket() is called, the shutdown_mutex is first locked, and unlocked in the end of the function, preventing other shutdown_socket() function calls creating a potential memory access issues in the critical section.

To remove some inefficiency, I removed all unnecessary mutex locks that were acquired only for shutdown.

For example, the mutex lock in the function below is removed.

static void close_inbound_socket(int fed_id) {
  // LF_MUTEX_LOCK(&socket_mutex); // Now removed.
  if (_fed.sockets_for_inbound_p2p_connections[fed_id] >= 0) {
    shutdown_socket(&_fed.sockets_for_inbound_p2p_connections[fed_id], false);
  }
  // LF_MUTEX_UNLOCK(&socket_mutex); // Now removed.
}

Another change is that I removed the socket_mutex itself, which acted as the inbound_socket_mutex for federates. It was only used before calling the shutdown_socket() function, so I removed it because there are mutex locks inside the shutdown_socket function now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants