From cfda01a8a76fb7e30237bf45c82cc2a5be69d4fe Mon Sep 17 00:00:00 2001 From: "Edward A. Lee" Date: Wed, 28 Feb 2024 17:51:31 -0800 Subject: [PATCH] Deal with threading issues with libwebsockets --- examples/C/src/browser-ui/Uptime.lf | 1 + examples/C/src/browser-ui/WebSocket.lf | 1 + examples/C/src/lib/WebSocketServer.lf | 174 ++++++++++++++++++++----- 3 files changed, 144 insertions(+), 32 deletions(-) diff --git a/examples/C/src/browser-ui/Uptime.lf b/examples/C/src/browser-ui/Uptime.lf index 559a7a84..b735e346 100644 --- a/examples/C/src/browser-ui/Uptime.lf +++ b/examples/C/src/browser-ui/Uptime.lf @@ -46,6 +46,7 @@ main reactor { to_send->wsi = NULL; to_send->length = length; to_send->message = message; + to_send->binary = false; lf_set(w.send, to_send); =} } diff --git a/examples/C/src/browser-ui/WebSocket.lf b/examples/C/src/browser-ui/WebSocket.lf index 7a7bd071..2c0fed93 100644 --- a/examples/C/src/browser-ui/WebSocket.lf +++ b/examples/C/src/browser-ui/WebSocket.lf @@ -47,6 +47,7 @@ main reactor { container->message = message; container->length = strlen(message) + 1; container->wsi = send_action->value->wsi; + container->binary = false; // Sending text. lf_set(s.send, container); // Schedule the next send. diff --git a/examples/C/src/lib/WebSocketServer.lf b/examples/C/src/lib/WebSocketServer.lf index 9d5ed3b2..7c2bb132 100644 --- a/examples/C/src/lib/WebSocketServer.lf +++ b/examples/C/src/lib/WebSocketServer.lf @@ -38,7 +38,8 @@ * The data conveyed can be any byte array. In case a received value is a string that is not null * terminated, this reactor appends a null character after the message payload. It does not include * this null character in the length field of the output struct, but rather just reports the length - * as reported by the incoming message. + * as reported by the incoming message. If the `binary` parameter is set to true, then JavaScript + * at the receiving end will get a Blob. Otherwise, it gets text. * * A key limitation is that this should use the secure sockets API in libwebsockets to get SSL. * @@ -61,13 +62,28 @@ preamble {= #include + /** + * A web socket string message together with its web socket instance. + * This needs a destructor and copy constructor because the message + * is assumed to be in allocated memory. + */ + typedef struct web_socket_message_t { + struct lws* wsi; // Web socket instance. + size_t length; + void* message; + bool binary; + struct web_socket_message_t* next; // Pointer to the next message in the list or NULL for end. + } web_socket_message_t; + typedef struct server_status_t { void* connected_action; // Action to notify of changes in connected status. - void* received_action; // Action to notify of messages received. + void* received_action; // Action to notify of messages received. struct lws_context* context; // The context. - int max_clients; // Maximum number of clients. - int* client_count; // Pointer to the client_count state variable. - bool running; // Indicator that the listening thread is running. + int max_clients; // Maximum number of clients. + int* client_count; // Pointer to the client_count state variable. + bool running; // Indicator that the listening thread is running. + web_socket_message_t* pending_messages; // Head of a list of pending messages to send. + lf_mutex_t* mutex; // Mutex for modifying this struct. } server_status_t; /** @@ -80,17 +96,6 @@ preamble {= bool connected; } web_socket_instance_t; - /** - * A web socket string message together with its web socket instance. - * This needs a destructor and copy constructor because the message - * is assumed to be in allocated memory. - */ - typedef struct web_socket_message_t { - struct lws* wsi; // Web socket instance. - size_t length; - void* message; - } web_socket_message_t; - /** Destructor for an instance of web_socket_message_t. */ void web_socket_message_destructor(void* message); @@ -100,7 +105,11 @@ preamble {= #endif // WEBSOCKET_H =} -reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { +reactor WebSocketServer( + hostport: int = 8000, + max_clients: int = 0, + binary: bool = false +) { output connected: web_socket_instance_t output received: web_socket_message_t* @@ -115,11 +124,15 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { preamble {= // Thread handling incoming messages. + // All lws calls except lws_cancel_service must occur in this thread. void* websocket_thread(void* args) { server_status_t* status = (server_status_t*)args; + while(status->running) { + // Invoke any pending callbacks. // According to the docs, the timeout argument is ignored. - lws_service(status->context, 50); + // Nevertheless, set to 500ms. + lws_service(status->context, 500); } lws_context_destroy(status->context); return NULL; @@ -143,6 +156,9 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { break; case LWS_CALLBACK_HTTP_CONFIRM_UPGRADE: + // NOTE: We do not need to lock status mutex to check and update client_count + // because it is only checked and updated in this websocket_thread. + // Check against maximum number of connections. if (status->max_clients > 0 && *status->client_count >= status->max_clients) { // Deny the connection. @@ -150,6 +166,7 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { // Increment the client count past the maximum because it will be // decremented when this closes and the browser will retry. *status->client_count = *status->client_count + 1; + return 1; } @@ -178,7 +195,7 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { case LWS_CALLBACK_CLOSED: case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: lf_print_error("*** connection error."); - + *status->client_count = *status->client_count - 1; ws_instance.wsi = wsi; @@ -197,6 +214,21 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { LF_PRINT_LOG("WS callback invoked with reason: %d", reason); server_status_t* status = (server_status_t*)lws_context_user(lws_get_context(wsi)); switch(reason) { + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + // lws_service() blocking call interrupted. Check for messages to send. + if (lf_mutex_lock(status->mutex)) { + lf_print_error("Failed to lock mutex in WebSocketServer."); + return 0; + } + + if (status->pending_messages) { + lws_callback_on_writable(status->pending_messages->wsi); + } + + if (lf_mutex_unlock(status->mutex)) { + lf_print_error("Failed to unlock mutex in WebSocketServer."); + } + break; case LWS_CALLBACK_RECEIVE: if (len > 0) { LF_PRINT_LOG("**** Server received WS message."); @@ -210,11 +242,56 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { ((char*)payload)[len] = '\0'; received->length = len; received->message = payload; + received->next = NULL; + received->binary = true; // Treat all received data as binary. // Carry the null terminator in the action payload, just in case. lf_schedule_value(status->received_action, 0, received, len + 1); } break; + case LWS_CALLBACK_SERVER_WRITEABLE: + // Websocket has become writable. See whether there are pending + // messages to send. This requires locking the status mutex. + if (lf_mutex_lock(status->mutex)) { + lf_print_error("Failed to lock mutex in WebSocketServer."); + return 0; + } + + web_socket_message_t* to_send = status->pending_messages; + if (to_send) { + // There is a message to send. Remove it from the list. + status->pending_messages = to_send->next; + + // Send it. + int length = to_send->length; + // The buffer needs LWS_PRE bytes _before_ the message. + // Do not include the null terminator, because this makes JSON unable to parse it. + unsigned char buffer[LWS_PRE + length]; + memcpy(&buffer[LWS_PRE], to_send->message, length); + int result; + if (to_send->binary) { + result = lws_write(to_send->wsi, &buffer[LWS_PRE], length, LWS_WRITE_BINARY); + } else { + result = lws_write(to_send->wsi, &buffer[LWS_PRE], length, LWS_WRITE_TEXT); + } + if (result < length) { + lf_print_warning("Send on web socket failed. Message send is incomplete."); + } + // Free the memory for the pending send. + web_socket_message_destructor(to_send); + + // If there is another message, request another callback. + if (status->pending_messages) { + lws_callback_on_writable(status->pending_messages->wsi); + } + } + + if (lf_mutex_unlock(status->mutex)) { + lf_print_error("Failed to unlock mutex in WebSocketServer."); + } + + break; + // Do we need to handle LWS_CALLBACK_CLOSED? // Seems to be handled in the HTTP callback. @@ -230,6 +307,7 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { // Do not free the wsi. } + // Argument and return type is web_socket_message_t*. void* web_socket_message_copy_constructor(void* message) { web_socket_message_t* cast = (web_socket_message_t*)message; web_socket_message_t* result = (web_socket_message_t*)malloc(sizeof(web_socket_message_t)); @@ -237,6 +315,9 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { void* copy = malloc(length * sizeof(char)); result->message = memcpy(copy, cast->message, length); result->wsi = cast->wsi; + result->length = length; + result->binary = cast->binary; + result->next = NULL; return result; } =} @@ -247,6 +328,7 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { lf_set_destructor(received_action, web_socket_message_destructor); lf_set_copy_constructor(received_action, web_socket_message_copy_constructor); + // Assume the input is dynamically allocated, including its message field. lf_set_destructor(send, web_socket_message_destructor); lf_set_copy_constructor(send, web_socket_message_copy_constructor); @@ -271,7 +353,13 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { }; // To get callbacks to be passed a pointer to the status struct: info.user = &self->status; - + + // Callbacks will need to acquire a mutex to modify the status struct pending_messages field. + self->status.mutex = (lf_mutex_t*)calloc(1, sizeof(lf_mutex_t)); + if (lf_mutex_init(self->status.mutex)) { + lf_print_error_and_exit("Failed to initialize mutex in WebSocketServer."); + } + self->status.context = lws_create_context(&info); if (!self->status.context) { lf_print_error_and_exit("Failed to create server for web sockets."); @@ -282,10 +370,10 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { self->status.max_clients = self->max_clients; self->status.client_count = &self->client_count; + self->status.running = true; lf_thread_t listener; lf_thread_create(&listener, &websocket_thread, &self->status); - self->status.running = true; =} reaction(received_action) -> received {= @@ -302,8 +390,8 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { =} reaction(send) {= - // NOTE: This send must be before the reaction to connected_action - // because the latter could cause a disconnection. + // NOTE: This send cannot be before the reaction to connected_action + // because we will get a causality loop. if(send->value->message == NULL) { lf_print_error("Cannot send NULL message."); } else if (send->value->wsi == NULL) { @@ -317,6 +405,8 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { to_send.wsi = instance; to_send.length = send->value->length; to_send.message = send->value->message; + to_send.binary = send->value->binary; + to_send.next = NULL; send_message(&to_send); } } else if (hashset_is_member(self->connected_instances, send->value->wsi)) { @@ -327,16 +417,36 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) { =} method send_message(to_send: web_socket_message_t*): int {= - int length = to_send->length; - // The buffer needs LWS_PRE bytes _before_ the message. - // Do not include the null terminator, because this make JSON unable to parse it. - char buffer[LWS_PRE + length]; - strncpy(buffer + LWS_PRE, to_send->message, length); - int result = lws_write(to_send->wsi, (unsigned char*)(buffer + LWS_PRE), length, LWS_WRITE_TEXT); - if (result < length) { - lf_print_warning("Send on web socket failed. Message send is incomplete."); + // Cannot actually send the message here for two reasons: + // 1. websocketlib is not thread safe, so the write needs to occur in the one thread making calls. + // 2. It is not safe to write until the socket is ready for a write. + // Hence, we append the message to list of pending messages and wait for a callback that + // the socket is ready for a write. + + // Copy the message and append to the context list. + web_socket_message_t* copy = (web_socket_message_t*)web_socket_message_copy_constructor(to_send); + + // Acquire the status mutex before putting on the pending messages list. + if (lf_mutex_lock(self->status.mutex)) { + lf_print_error("Failed to lock WebSocketServer mutex."); + return -1; + } + + // Scroll to the end of the list of pending messages. + web_socket_message_t** existing = &(self->status.pending_messages); + while (*existing != NULL) { + existing = &((*existing)->next); + } + *existing = copy; + + // Interrupt lws_service(). + lws_cancel_service(self->status.context); + + if (lf_mutex_unlock(self->status.mutex)) { + lf_print_error("Failed to unlock WebSocketServer mutex."); + return -1; } - return result; + return 0; =} reaction(shutdown) {=