Skip to content

Commit

Permalink
Deal with threading issues with libwebsockets
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardalee committed Feb 29, 2024
1 parent 2df0674 commit cfda01a
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 32 deletions.
1 change: 1 addition & 0 deletions examples/C/src/browser-ui/Uptime.lf
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ main reactor {
to_send->wsi = NULL;
to_send->length = length;
to_send->message = message;
to_send->binary = false;
lf_set(w.send, to_send);
=}
}
1 change: 1 addition & 0 deletions examples/C/src/browser-ui/WebSocket.lf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ main reactor {
container->message = message;
container->length = strlen(message) + 1;
container->wsi = send_action->value->wsi;
container->binary = false; // Sending text.

lf_set(s.send, container);
// Schedule the next send.
Expand Down
174 changes: 142 additions & 32 deletions examples/C/src/lib/WebSocketServer.lf
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
* The data conveyed can be any byte array. In case a received value is a string that is not null
* terminated, this reactor appends a null character after the message payload. It does not include
* this null character in the length field of the output struct, but rather just reports the length
* as reported by the incoming message.
* as reported by the incoming message. If the `binary` parameter is set to true, then JavaScript
* at the receiving end will get a Blob. Otherwise, it gets text.
*
* A key limitation is that this should use the secure sockets API in libwebsockets to get SSL.
*
Expand All @@ -61,13 +62,28 @@ preamble {=

#include <libwebsockets.h>

/**
* A web socket string message together with its web socket instance.
* This needs a destructor and copy constructor because the message
* is assumed to be in allocated memory.
*/
typedef struct web_socket_message_t {
struct lws* wsi; // Web socket instance.
size_t length;
void* message;
bool binary;
struct web_socket_message_t* next; // Pointer to the next message in the list or NULL for end.
} web_socket_message_t;

typedef struct server_status_t {
void* connected_action; // Action to notify of changes in connected status.
void* received_action; // Action to notify of messages received.
void* received_action; // Action to notify of messages received.
struct lws_context* context; // The context.
int max_clients; // Maximum number of clients.
int* client_count; // Pointer to the client_count state variable.
bool running; // Indicator that the listening thread is running.
int max_clients; // Maximum number of clients.
int* client_count; // Pointer to the client_count state variable.
bool running; // Indicator that the listening thread is running.
web_socket_message_t* pending_messages; // Head of a list of pending messages to send.
lf_mutex_t* mutex; // Mutex for modifying this struct.
} server_status_t;

/**
Expand All @@ -80,17 +96,6 @@ preamble {=
bool connected;
} web_socket_instance_t;

/**
* A web socket string message together with its web socket instance.
* This needs a destructor and copy constructor because the message
* is assumed to be in allocated memory.
*/
typedef struct web_socket_message_t {
struct lws* wsi; // Web socket instance.
size_t length;
void* message;
} web_socket_message_t;

/** Destructor for an instance of web_socket_message_t. */
void web_socket_message_destructor(void* message);

Expand All @@ -100,7 +105,11 @@ preamble {=
#endif // WEBSOCKET_H
=}

reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
reactor WebSocketServer(
hostport: int = 8000,
max_clients: int = 0,
binary: bool = false
) {
output connected: web_socket_instance_t
output received: web_socket_message_t*

Expand All @@ -115,11 +124,15 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {

preamble {=
// Thread handling incoming messages.
// All lws calls except lws_cancel_service must occur in this thread.
void* websocket_thread(void* args) {
server_status_t* status = (server_status_t*)args;

while(status->running) {
// Invoke any pending callbacks.
// According to the docs, the timeout argument is ignored.
lws_service(status->context, 50);
// Nevertheless, set to 500ms.
lws_service(status->context, 500);
}
lws_context_destroy(status->context);
return NULL;
Expand All @@ -143,13 +156,17 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
break;
case LWS_CALLBACK_HTTP_CONFIRM_UPGRADE:

// NOTE: We do not need to lock status mutex to check and update client_count
// because it is only checked and updated in this websocket_thread.

// Check against maximum number of connections.
if (status->max_clients > 0 && *status->client_count >= status->max_clients) {
// Deny the connection.
lf_print_warning("**** Maximum number of clients reached. Denying connection.");
// Increment the client count past the maximum because it will be
// decremented when this closes and the browser will retry.
*status->client_count = *status->client_count + 1;

return 1;
}

Expand Down Expand Up @@ -178,7 +195,7 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
case LWS_CALLBACK_CLOSED:
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
lf_print_error("*** connection error.");

*status->client_count = *status->client_count - 1;

ws_instance.wsi = wsi;
Expand All @@ -197,6 +214,21 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
LF_PRINT_LOG("WS callback invoked with reason: %d", reason);
server_status_t* status = (server_status_t*)lws_context_user(lws_get_context(wsi));
switch(reason) {
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
// lws_service() blocking call interrupted. Check for messages to send.
if (lf_mutex_lock(status->mutex)) {
lf_print_error("Failed to lock mutex in WebSocketServer.");
return 0;
}

if (status->pending_messages) {
lws_callback_on_writable(status->pending_messages->wsi);
}

if (lf_mutex_unlock(status->mutex)) {
lf_print_error("Failed to unlock mutex in WebSocketServer.");
}
break;
case LWS_CALLBACK_RECEIVE:
if (len > 0) {
LF_PRINT_LOG("**** Server received WS message.");
Expand All @@ -210,11 +242,56 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
((char*)payload)[len] = '\0';
received->length = len;
received->message = payload;
received->next = NULL;
received->binary = true; // Treat all received data as binary.
// Carry the null terminator in the action payload, just in case.
lf_schedule_value(status->received_action, 0, received, len + 1);
}
break;

case LWS_CALLBACK_SERVER_WRITEABLE:
// Websocket has become writable. See whether there are pending
// messages to send. This requires locking the status mutex.
if (lf_mutex_lock(status->mutex)) {
lf_print_error("Failed to lock mutex in WebSocketServer.");
return 0;
}

web_socket_message_t* to_send = status->pending_messages;
if (to_send) {
// There is a message to send. Remove it from the list.
status->pending_messages = to_send->next;

// Send it.
int length = to_send->length;
// The buffer needs LWS_PRE bytes _before_ the message.
// Do not include the null terminator, because this makes JSON unable to parse it.
unsigned char buffer[LWS_PRE + length];
memcpy(&buffer[LWS_PRE], to_send->message, length);
int result;
if (to_send->binary) {
result = lws_write(to_send->wsi, &buffer[LWS_PRE], length, LWS_WRITE_BINARY);
} else {
result = lws_write(to_send->wsi, &buffer[LWS_PRE], length, LWS_WRITE_TEXT);
}
if (result < length) {
lf_print_warning("Send on web socket failed. Message send is incomplete.");
}
// Free the memory for the pending send.
web_socket_message_destructor(to_send);

// If there is another message, request another callback.
if (status->pending_messages) {
lws_callback_on_writable(status->pending_messages->wsi);
}
}

if (lf_mutex_unlock(status->mutex)) {
lf_print_error("Failed to unlock mutex in WebSocketServer.");
}

break;

// Do we need to handle LWS_CALLBACK_CLOSED?
// Seems to be handled in the HTTP callback.

Expand All @@ -230,13 +307,17 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
// Do not free the wsi.
}

// Argument and return type is web_socket_message_t*.
void* web_socket_message_copy_constructor(void* message) {
web_socket_message_t* cast = (web_socket_message_t*)message;
web_socket_message_t* result = (web_socket_message_t*)malloc(sizeof(web_socket_message_t));
size_t length = cast->length;
void* copy = malloc(length * sizeof(char));
result->message = memcpy(copy, cast->message, length);
result->wsi = cast->wsi;
result->length = length;
result->binary = cast->binary;
result->next = NULL;
return result;
}
=}
Expand All @@ -247,6 +328,7 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
lf_set_destructor(received_action, web_socket_message_destructor);
lf_set_copy_constructor(received_action, web_socket_message_copy_constructor);

// Assume the input is dynamically allocated, including its message field.
lf_set_destructor(send, web_socket_message_destructor);
lf_set_copy_constructor(send, web_socket_message_copy_constructor);

Expand All @@ -271,7 +353,13 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
};
// To get callbacks to be passed a pointer to the status struct:
info.user = &self->status;


// Callbacks will need to acquire a mutex to modify the status struct pending_messages field.
self->status.mutex = (lf_mutex_t*)calloc(1, sizeof(lf_mutex_t));
if (lf_mutex_init(self->status.mutex)) {
lf_print_error_and_exit("Failed to initialize mutex in WebSocketServer.");
}

self->status.context = lws_create_context(&info);
if (!self->status.context) {
lf_print_error_and_exit("Failed to create server for web sockets.");
Expand All @@ -282,10 +370,10 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {

self->status.max_clients = self->max_clients;
self->status.client_count = &self->client_count;
self->status.running = true;

lf_thread_t listener;
lf_thread_create(&listener, &websocket_thread, &self->status);
self->status.running = true;
=}

reaction(received_action) -> received {=
Expand All @@ -302,8 +390,8 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
=}

reaction(send) {=
// NOTE: This send must be before the reaction to connected_action
// because the latter could cause a disconnection.
// NOTE: This send cannot be before the reaction to connected_action
// because we will get a causality loop.
if(send->value->message == NULL) {
lf_print_error("Cannot send NULL message.");
} else if (send->value->wsi == NULL) {
Expand All @@ -317,6 +405,8 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
to_send.wsi = instance;
to_send.length = send->value->length;
to_send.message = send->value->message;
to_send.binary = send->value->binary;
to_send.next = NULL;
send_message(&to_send);
}
} else if (hashset_is_member(self->connected_instances, send->value->wsi)) {
Expand All @@ -327,16 +417,36 @@ reactor WebSocketServer(hostport: int = 8000, max_clients: int = 0) {
=}

method send_message(to_send: web_socket_message_t*): int {=
int length = to_send->length;
// The buffer needs LWS_PRE bytes _before_ the message.
// Do not include the null terminator, because this make JSON unable to parse it.
char buffer[LWS_PRE + length];
strncpy(buffer + LWS_PRE, to_send->message, length);
int result = lws_write(to_send->wsi, (unsigned char*)(buffer + LWS_PRE), length, LWS_WRITE_TEXT);
if (result < length) {
lf_print_warning("Send on web socket failed. Message send is incomplete.");
// Cannot actually send the message here for two reasons:
// 1. websocketlib is not thread safe, so the write needs to occur in the one thread making calls.
// 2. It is not safe to write until the socket is ready for a write.
// Hence, we append the message to list of pending messages and wait for a callback that
// the socket is ready for a write.

// Copy the message and append to the context list.
web_socket_message_t* copy = (web_socket_message_t*)web_socket_message_copy_constructor(to_send);

// Acquire the status mutex before putting on the pending messages list.
if (lf_mutex_lock(self->status.mutex)) {
lf_print_error("Failed to lock WebSocketServer mutex.");
return -1;
}

// Scroll to the end of the list of pending messages.
web_socket_message_t** existing = &(self->status.pending_messages);
while (*existing != NULL) {
existing = &((*existing)->next);
}
*existing = copy;

// Interrupt lws_service().
lws_cancel_service(self->status.context);

if (lf_mutex_unlock(self->status.mutex)) {
lf_print_error("Failed to unlock WebSocketServer mutex.");
return -1;
}
return result;
return 0;
=}

reaction(shutdown) {=
Expand Down

0 comments on commit cfda01a

Please sign in to comment.