Skip to content

Commit

Permalink
fix crash when cleaning up virtual nodes (netdata#19467)
Browse files Browse the repository at this point in the history
* when the receiver is null, there is no need to wait for anything

* virtual nodes may be activated and reactivated
  • Loading branch information
ktsaou authored Jan 23, 2025
1 parent 2dc54fa commit 064bb4c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
16 changes: 16 additions & 0 deletions src/libnetdata/object-state/object-state.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ void object_state_activate(OBJECT_STATE *os) {
&os->state_refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
}

void object_state_activate_if_not_activated(OBJECT_STATE *os) {
__atomic_add_fetch(&os->state_id, 1, __ATOMIC_RELAXED);

REFCOUNT expected = __atomic_load_n(&os->state_refcount, __ATOMIC_RELAXED);
REFCOUNT desired;

do {
if(expected != OBJECT_STATE_DEACTIVATED)
return;

desired = 0;

} while(!__atomic_compare_exchange_n(
&os->state_refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
}

void object_state_deactivate(OBJECT_STATE *os) {
__atomic_add_fetch(&os->state_id, 1, __ATOMIC_RELAXED);

Expand Down
1 change: 1 addition & 0 deletions src/libnetdata/object-state/object-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ OBJECT_STATE_ID object_state_id(OBJECT_STATE *os);
// increments the object's state id
// enables using the object - users may acquire and release the object
void object_state_activate(OBJECT_STATE *os);
void object_state_activate_if_not_activated(OBJECT_STATE *os);

// increments the object's state id
// prevents users from acquiring it, and waits until all of its holders have released it
Expand Down
2 changes: 1 addition & 1 deletion src/plugins.d/pluginsd_parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si

rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
rrdhost_flag_set(host, RRDHOST_FLAG_COLLECTOR_ONLINE);
object_state_activate(&host->state_id);
object_state_activate_if_not_activated(&host->state_id);
ml_host_start(host);
dyncfg_host_init(host);
pulse_host_status(host, 0, 0); // this will detect the receiver status
Expand Down
28 changes: 15 additions & 13 deletions src/streaming/stream-receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -1224,23 +1224,25 @@ bool stream_receiver_signal_to_stop_and_wait(RRDHOST *host, STREAM_HANDSHAKE rea
__atomic_store_n(&rpt->exit.shutdown, true, __ATOMIC_RELEASE);
shutdown(rpt->sock.fd, SHUT_RDWR);
}
}

int count = 2000;
while (host->receiver == rpt && count-- > 0) {
rrdhost_receiver_unlock(host);
int count = 2000;
while (host->receiver == rpt && count-- > 0) {
rrdhost_receiver_unlock(host);

// let the lock for the receiver thread to exit
sleep_usec(1 * USEC_PER_MS);
// let the lock for the receiver thread to exit
sleep_usec(1 * USEC_PER_MS);

rrdhost_receiver_lock(host);
}
rrdhost_receiver_lock(host);
}

if(host->receiver == rpt)
netdata_log_error("STREAM RCV[x] '%s' [from [%s]:%s]: "
"streaming thread takes too long to stop, giving up..."
, rrdhost_hostname(host)
, rpt->remote_ip, rpt->remote_port);
if(host->receiver == rpt)
netdata_log_error("STREAM RCV[x] '%s' [from [%s]:%s]: "
"streaming thread takes too long to stop, giving up..."
, rrdhost_hostname(host)
, rpt->remote_ip, rpt->remote_port);
else
ret = true;
}
else
ret = true;

Expand Down

0 comments on commit 064bb4c

Please sign in to comment.