Skip to content

Commit d76c80a

Browse files
authored
Localised per-thread conn data for a clean disconnect (#429)
- updated thread notify for fast message sending - fixed the issue in which a message send could timeout due to a wait in the processing thread - clearer message log - localized per-thread connection data for a clean state Signed-off-by: AssemblyJohn <ioan.bogdann@gmail.com>
1 parent aae98f5 commit d76c80a

File tree

2 files changed

+74
-29
lines changed

2 files changed

+74
-29
lines changed

lib/ocpp/common/websocket/websocket_tls_tpm.cpp

+72-28
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,19 @@ enum class EConnectionState {
4646
FINALIZED, ///< We finalized the connection and we're never going to connect again
4747
};
4848

49+
/// \brief Message to return in the callback to close the socket connection
50+
static constexpr int LWS_CLOSE_SOCKET_RESPONSE_MESSAGE = -1;
51+
4952
/// \brief Per thread connection data
5053
struct ConnectionData {
51-
ConnectionData() : is_running(true), state(EConnectionState::INITIALIZE), wsi(nullptr) {
54+
ConnectionData() : is_running(true), state(EConnectionState::INITIALIZE), wsi(nullptr), owner(nullptr) {
5255
}
5356

5457
~ConnectionData() {
5558
state = EConnectionState::FINALIZED;
5659
is_running = false;
5760
wsi = nullptr;
61+
owner = nullptr;
5862
}
5963

6064
void bind_thread(std::thread::id id) {
@@ -94,14 +98,16 @@ struct ConnectionData {
9498
}
9599

96100
public:
97-
// openssl context
101+
// Openssl context, must be destroyed in this order
98102
std::unique_ptr<SSL_CTX> sec_context;
99103
std::unique_ptr<OSSL_LIB_CTX> sec_lib_context;
100104

101105
// libwebsockets state
102106
std::unique_ptr<lws_context> lws_ctx;
103107
lws* wsi;
104108

109+
WebsocketTlsTPM* owner;
110+
105111
private:
106112
std::thread::id lws_thread_id;
107113
bool is_running;
@@ -170,10 +176,10 @@ void WebsocketTlsTPM::set_connection_options(const WebsocketConnectionOptions& c
170176
}
171177

172178
static int callback_minimal(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) {
173-
// Get user safely, since on some callbacks (void *user) can be different
179+
// Get user safely, since on some callbacks (void *user) can be different than what we set
174180
if (wsi != nullptr) {
175-
if (WebsocketTlsTPM* websocket = reinterpret_cast<WebsocketTlsTPM*>(lws_wsi_user(wsi))) {
176-
return websocket->process_callback(wsi, static_cast<int>(reason), user, in, len);
181+
if (ConnectionData* data = reinterpret_cast<ConnectionData*>(lws_wsi_user(wsi))) {
182+
return data->owner->process_callback(wsi, static_cast<int>(reason), user, in, len);
177183
}
178184
}
179185

@@ -314,7 +320,8 @@ void WebsocketTlsTPM::recv_loop() {
314320
EVLOG_debug << "Init recv loop with ID: " << std::this_thread::get_id();
315321

316322
while (false == data->is_interupted()) {
317-
if (false == recv_message_queue.empty()) {
323+
// Process all messages
324+
while (false == recv_message_queue.empty()) {
318325
std::string message{};
319326

320327
{
@@ -326,9 +333,12 @@ void WebsocketTlsTPM::recv_loop() {
326333
// Invoke our processing callback, that might trigger a send back that
327334
// can cause a deadlock if is not managed on a different thread
328335
this->message_callback(message);
329-
} else {
336+
}
337+
338+
// While we are empty, sleep
339+
{
330340
std::unique_lock<std::mutex> lock(this->recv_mutex);
331-
recv_message_cv.wait_for(lock, std::chrono::seconds(10),
341+
recv_message_cv.wait_for(lock, std::chrono::seconds(1),
332342
[&]() { return (false == recv_message_queue.empty()); });
333343
}
334344
}
@@ -358,7 +368,9 @@ void WebsocketTlsTPM::client_loop() {
358368
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
359369
info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */
360370
info.protocols = protocols;
361-
info.user = this;
371+
372+
// Set reference to ConnectionData since 'data' can go away in the websocket
373+
info.user = data;
362374

363375
info.fd_limit_per_thread = 1 + 1 + 1;
364376

@@ -413,7 +425,7 @@ void WebsocketTlsTPM::client_loop() {
413425
i.protocol = strdup(conversions::ocpp_protocol_version_to_string(this->connection_options.ocpp_version).c_str());
414426
i.local_protocol_name = local_protocol_name;
415427
i.pwsi = &conn_data->wsi;
416-
i.userdata = this;
428+
i.userdata = data; // See lws_context 'user'
417429

418430
// TODO (ioan): See if we need retry policy since we handle this manually
419431
// i.retry_and_idle_policy = &retry;
@@ -436,6 +448,7 @@ void WebsocketTlsTPM::client_loop() {
436448
int n = 0;
437449

438450
while (n >= 0 && (false == data->is_interupted())) {
451+
// Set to -1 for continuous servicing, of required, not recommended
439452
n = lws_service(data->get_ctx(), 0);
440453

441454
if (false == message_queue.empty()) {
@@ -462,6 +475,8 @@ bool WebsocketTlsTPM::connect() {
462475
}
463476

464477
auto conn_data = new ConnectionData();
478+
conn_data->owner = this;
479+
465480
this->conn_data.reset(conn_data);
466481

467482
// Wait old thread for a clean state
@@ -563,7 +578,6 @@ void WebsocketTlsTPM::close(websocketpp::close::status::value code, const std::s
563578

564579
if (conn_data) {
565580
if (auto* data = conn_data.get()) {
566-
// lws_close_reason(data->get_conn(), LWS_CLOSE_STATUS_NORMAL, NULL, 0);
567581
data->do_interrupt();
568582
}
569583

@@ -693,8 +707,8 @@ void WebsocketTlsTPM::on_writable() {
693707
return;
694708
}
695709

696-
if (data->get_state() == EConnectionState::FINALIZED) {
697-
EVLOG_error << "Trying to write message to finalized state!";
710+
if (data->is_interupted() || data->get_state() == EConnectionState::FINALIZED) {
711+
EVLOG_error << "Trying to write message to interrupted/finalized state!";
698712
return;
699713
}
700714

@@ -712,7 +726,7 @@ void WebsocketTlsTPM::on_writable() {
712726

713727
// Pop all sent messages
714728
if (message->sent_bytes >= message->payload.length()) {
715-
EVLOG_debug << "Message fully written, popping from queue!";
729+
EVLOG_info << "Websocket message fully written, popping processing thread from queue!";
716730

717731
// If we have written all bytes to libwebsockets it means that if we received
718732
// this writable callback everything is sent over the wire, mark the message
@@ -723,6 +737,7 @@ void WebsocketTlsTPM::on_writable() {
723737
message_queue.pop();
724738
}
725739

740+
EVLOG_debug << "Notifying waiting thread!";
726741
// Notify any waiting thread to check it's state
727742
msg_send_cv.notify_one();
728743
} else {
@@ -745,9 +760,13 @@ void WebsocketTlsTPM::on_writable() {
745760
void WebsocketTlsTPM::request_write() {
746761
if (this->m_is_connected) {
747762
if (auto* data = conn_data.get()) {
748-
if (data->get_conn())
749-
lws_callback_on_writable(data->get_conn());
763+
if (data->get_conn()) {
764+
// Notify waiting processing thread to wake up
765+
lws_cancel_service(data->get_ctx());
766+
}
750767
}
768+
} else {
769+
EVLOG_warning << "Requested write with offline TLS websocket!";
751770
}
752771
}
753772

@@ -772,7 +791,7 @@ void WebsocketTlsTPM::poll_message(const std::shared_ptr<WebsocketMessage>& msg,
772791
}
773792

774793
if (msg->message_sent)
775-
EVLOG_debug << "Successfully sent last message over TLS websocket!";
794+
EVLOG_info << "Successfully sent last message over TLS websocket!";
776795
else
777796
EVLOG_warning << "Could not send last message over TLS websocket!";
778797
}
@@ -805,14 +824,27 @@ void WebsocketTlsTPM::ping() {
805824
}
806825

807826
int WebsocketTlsTPM::process_callback(void* wsi_ptr, int callback_reason, void* user, void* in, size_t len) {
827+
enum lws_callback_reasons reason = static_cast<lws_callback_reasons>(callback_reason);
828+
808829
lws* wsi = reinterpret_cast<lws*>(wsi_ptr);
809830

810-
enum lws_callback_reasons reason = static_cast<lws_callback_reasons>(callback_reason);
811-
ConnectionData* data = this->conn_data.get();
831+
// The ConnectionData is thread bound, so that if we clear it in the 'WebsocketTlsTPM'
832+
// we still have a chance to close the connection here
833+
ConnectionData* data = reinterpret_cast<ConnectionData*>(lws_wsi_user(wsi));
812834

813835
// If we are in the process of deletion, just close socket and return
814-
if (nullptr == data)
815-
return -1;
836+
if (nullptr == data) {
837+
return LWS_CLOSE_SOCKET_RESPONSE_MESSAGE;
838+
}
839+
840+
// If we are interrupted, close the socket cleanly
841+
if (data->is_interupted()) {
842+
EVLOG_info << "Conn interrupted/closed, closing socket!";
843+
844+
// Set the normal reason if we are interrupted
845+
lws_close_reason(data->get_conn(), LWS_CLOSE_STATUS_NORMAL, NULL, 0);
846+
return LWS_CLOSE_SOCKET_RESPONSE_MESSAGE;
847+
}
816848

817849
switch (reason) {
818850
// TODO: If required in the future
@@ -903,26 +935,38 @@ int WebsocketTlsTPM::process_callback(void* wsi_ptr, int callback_reason, void*
903935
case LWS_CALLBACK_CLIENT_WRITEABLE:
904936
on_writable();
905937

906-
if (false == message_queue.empty())
938+
if (false == message_queue.empty()) {
907939
lws_callback_on_writable(wsi);
940+
}
941+
break;
942+
943+
case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
944+
if (false == message_queue.empty()) {
945+
lws_callback_on_writable(data->get_conn());
946+
}
908947
break;
909948

910949
case LWS_CALLBACK_CLIENT_RECEIVE:
911950
on_message(in, len);
951+
952+
if (false == message_queue.empty()) {
953+
lws_callback_on_writable(data->get_conn());
954+
}
955+
break;
956+
957+
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
958+
if (false == message_queue.empty()) {
959+
lws_callback_on_writable(data->get_conn());
960+
}
912961
break;
913962

914963
default:
915964
EVLOG_info << "Callback with unhandled reason: " << reason;
916965
break;
917966
}
918967

919-
if (data->is_interupted()) {
920-
EVLOG_info << "Conn interrupted, closing socket!";
921-
return -1;
922-
}
923-
924968
// Return -1 on fatal error (-1 is request to close the socket)
925969
return 0;
926970
}
927971

928-
} // namespace ocpp
972+
} // namespace ocpp

lib/ocpp/v16/charge_point_impl.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ WebsocketConnectionOptions ChargePointImpl::get_ws_connection_options() {
240240
this->configuration->getUseSslDefaultVerifyPaths(),
241241
this->configuration->getAdditionalRootCertificateCheck(),
242242
this->configuration->getHostName(),
243-
this->configuration->getVerifyCsmsCommonName()};
243+
this->configuration->getVerifyCsmsCommonName(),
244+
this->configuration->getUseTPM()};
244245
return connection_options;
245246
}
246247

0 commit comments

Comments
 (0)