Skip to content

Commit

Permalink
Code cleanup on ACLK messages (netdata#19566)
Browse files Browse the repository at this point in the history
* Store localhost information immediately

* Fix return code on failure
Use sqlite3_step_monitored

* Remove get_node_list

* Schedule node_state_update after 1 second
Code cleanup

* Code dedup
aclk_create_node_instance_job to create a new node
aclk_update_node_instance_job tp update node status
aclk_send_node_instances uses in-memory hosts
  • Loading branch information
stelfrag authored Feb 4, 2025
1 parent 5984418 commit dd7861a
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 253 deletions.
191 changes: 66 additions & 125 deletions src/aclk/aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -932,153 +932,94 @@ bool aclk_host_state_update_auto(RRDHOST *host) {
return true;
}

void aclk_host_state_update(RRDHOST *host, int cmd, int queryable)
void aclk_create_node_instance_job(RRDHOST *host)
{
ND_UUID node_id;
if (unlikely(!host))
return;

if (!aclk_online())
CLAIM_ID claim_id = claim_id_get();
if (!claim_id_is_set(claim_id))
return;

if (!UUIDiszero(host->node_id)) {
node_id = host->node_id;
}
else {
int ret = get_node_id(&host->host_id.uuid, &node_id.uuid);
if (ret > 0) {
// this means we were not able to check if node_id already present
netdata_log_error("ACLK: Unable to check for node_id. Ignoring the host state update.");
return;
}
if (ret < 0) {
// node_id not found
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
CLAIM_ID claim_id = claim_id_get();

node_instance_creation_t node_instance_creation = {
.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL,
.hops = rrdhost_ingestion_hops(host),
.hostname = rrdhost_hostname(host),
.machine_guid = host->machine_guid};

create_query->data.bin_payload.payload =
generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);

create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";
nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Registering host=%s, hops=%d", host->machine_guid,
rrdhost_ingestion_hops(host));
aclk_query_t query = aclk_query_new(REGISTER_NODE);
int32_t hops = rrdhost_ingestion_hops(host);
node_instance_creation_t node_instance_creation = {
.hops = hops,
.hostname = rrdhost_hostname(host),
.machine_guid = host->machine_guid,
.claim_id = claim_id.str
};

aclk_add_job(create_query);
return;
}
}
query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
query->data.bin_payload.msg_name = "CreateNodeInstance";
query->data.bin_payload.payload = generate_node_instance_creation(&query->data.bin_payload.size, &node_instance_creation);

nd_log_daemon(NDLP_DEBUG, "Queuing registration for host=%s, hops=%d", host->machine_guid, hops);

aclk_add_job(query);
}

void aclk_update_node_instance_job(RRDHOST *host, int live, int queryable)
{
if (unlikely(!host))
return;

CLAIM_ID claim_id = claim_id_get();
if (!claim_id_is_set(claim_id))
return;

aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);

int32_t hops = rrdhost_ingestion_hops(host);
node_instance_connection_t node_state_update = {
.hops = rrdhost_ingestion_hops(host),
.live = cmd,
.claim_id = claim_id.str,
.hops = hops,
.live = live,
.queryable = queryable,
.session_id = aclk_session_newarch
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(node_id.uuid, (char*)node_state_update.node_id);
.session_id = aclk_session_newarch};

node_state_update.capabilities = aclk_get_agent_capas();
char node_id[UUID_STR_LEN];
uuid_unparse_lower(host->node_id.uuid, node_id);

CLAIM_ID claim_id = claim_id_get();
node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
node_state_update.node_id = node_id;
node_state_update.capabilities = aclk_get_node_instance_capas(host);

query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);

nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing status update for node=%s, live=%d, hops=%d, queryable=%d",
(char*)node_state_update.node_id, cmd,
rrdhost_ingestion_hops(host), queryable);
nd_log_daemon(
NDLP_DEBUG,
"Queuing status update for node=%s, live=%d, hops=%d, queryable=%d",
(char *)node_state_update.node_id,
live,
hops,
queryable);

freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
freez((void *)node_state_update.capabilities);
aclk_add_job(query);
}

void aclk_send_node_instances(mqtt_wss_client client)
void aclk_host_state_update(RRDHOST *host, int live, int queryable)
{
struct node_instance_list *list_head = get_node_list();
struct node_instance_list *list = list_head;
if (unlikely(!list)) {
error_report("Failure to get_node_list from DB!");
sleep_usec(USEC_PER_SEC);
aclk_query_t query = aclk_query_new(SEND_NODE_INSTANCES);
aclk_add_job(query);
if (!aclk_online())
return;
}
while (!uuid_is_null(list->host_id)) {
if (!uuid_is_null(list->node_id)) {
aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
node_instance_connection_t node_state_update = {
.live = list->live,
.hops = list->hops,
.queryable = 1,
.session_id = aclk_session_newarch
};
node_state_update.node_id = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->node_id, (char*)node_state_update.node_id);

char host_id[UUID_STR_LEN];
uuid_unparse_lower(list->host_id, host_id);

RRDHOST *host = rrdhost_find_by_guid(host_id);
if (unlikely(!host)) {
freez((void*)node_state_update.node_id);
freez(query);
continue;
}
node_state_update.capabilities = aclk_get_node_instance_capas(host);

CLAIM_ID claim_id = claim_id_get();
node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);

nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing status update for node=%s, live=%d, hops=%d, queryable=1",
(char*)node_state_update.node_id, list->live, list->hops);

freez((void*)node_state_update.capabilities);
freez((void*)node_state_update.node_id);
query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
send_bin_msg(client, query);
aclk_query_free(query);
} else {
aclk_query_t create_query;
create_query = aclk_query_new(REGISTER_NODE);
node_instance_creation_t node_instance_creation = {
.hops = list->hops,
.hostname = list->hostname,
};
node_instance_creation.machine_guid = mallocz(UUID_STR_LEN);
uuid_unparse_lower(list->host_id, (char*)node_instance_creation.machine_guid);
create_query->data.bin_payload.topic = ACLK_TOPICID_CREATE_NODE;
create_query->data.bin_payload.msg_name = "CreateNodeInstance";

CLAIM_ID claim_id = claim_id_get();
node_instance_creation.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL,
create_query->data.bin_payload.payload = generate_node_instance_creation(&create_query->data.bin_payload.size, &node_instance_creation);

nd_log(NDLS_DAEMON, NDLP_DEBUG,
"Queuing registration for host=%s, hops=%d",
(char*)node_instance_creation.machine_guid, list->hops);

freez((void *)node_instance_creation.machine_guid);
send_bin_msg(client, create_query);
aclk_query_free(create_query);
}
freez(list->hostname);
if (uuid_is_null(host->node_id.uuid))
aclk_create_node_instance_job(host);
else
aclk_update_node_instance_job(host, live, queryable);
}

list++;
void aclk_send_node_instances()
{
RRDHOST *host;
dfe_start_reentrant(rrdhost_root_index, host)
{
int live = rrdhost_ingestion_status(host) == RRDHOST_INGEST_STATUS_ONLINE ? 1 : 0;
aclk_host_state_update(host, live, 1);
}
freez(list_head);
dfe_done(host);
}

void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
Expand Down
4 changes: 2 additions & 2 deletions src/aclk/aclk.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;

void aclk_host_state_update(RRDHOST *host, int cmd, int queryable);
void aclk_host_state_update(RRDHOST *host, int live, int queryable);
bool aclk_host_state_update_auto(RRDHOST *host);

void aclk_send_node_instances(mqtt_wss_client client);
void aclk_send_node_instances();

void aclk_send_bin_msg(char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname);

Expand Down
6 changes: 6 additions & 0 deletions src/database/rrdhost.c
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ RRDHOST *rrdhost_create(

if(!archived) {
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
if (is_localhost) {
BUFFER *buf = buffer_create(0, NULL);
size_t query_counter = 0;
store_host_info_and_metadata(host, buf, &query_counter);
buffer_free(buf);
}
rrdhost_load_rrdcontext_data(host);
ml_host_new(host);
} else
Expand Down
33 changes: 4 additions & 29 deletions src/database/sqlite/sqlite_aclk.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ void sanity_check(void) {
#include "../aclk_query.h"
#include "../aclk_capas.h"

static void create_node_instance_result_job(mqtt_wss_client client __maybe_unused, const char *machine_guid, const char *node_id)
static void create_node_instance_result_job(const char *machine_guid, const char *node_id)
{
nd_uuid_t host_uuid, node_uuid;

Expand All @@ -33,32 +33,7 @@ static void create_node_instance_result_job(mqtt_wss_client client __maybe_unuse
return;
}
sql_update_node_id(&host_uuid, &node_uuid);
schedule_node_state_update(host, 5000);
//
// aclk_query_t query = aclk_query_new(NODE_STATE_UPDATE);
// node_instance_connection_t node_state_update = {
// .hops = 1,
// .live = 0,
// .queryable = 1,
// .session_id = aclk_session_newarch,
// .node_id = node_id,
// .capabilities = NULL};
//
// node_state_update.live = rrdhost_is_local(host) ? 1 : 0;
// node_state_update.hops = rrdhost_ingestion_hops(host);
// node_state_update.capabilities = aclk_get_node_instance_capas(host);
// schedule_node_state_update(host, 5000);
//
// CLAIM_ID claim_id = claim_id_get();
// node_state_update.claim_id = claim_id_is_set(claim_id) ? claim_id.str : NULL;
// query->data.bin_payload.payload = generate_node_instance_connection(&query->data.bin_payload.size, &node_state_update);
//
// freez((void *)node_state_update.capabilities);
//
// query->data.bin_payload.msg_name = "UpdateNodeInstanceConnection";
// query->data.bin_payload.topic = ACLK_TOPICID_NODE_CONN;
//
// aclk_add_job(query);
schedule_node_state_update(host, 1000);
}

struct aclk_sync_config_s {
Expand Down Expand Up @@ -395,7 +370,7 @@ static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query
break;
case SEND_NODE_INSTANCES:
worker_is_busy(UV_EVENT_SEND_NODE_INSTANCES);
aclk_send_node_instances(config->client);
aclk_send_node_instances();
ok_to_send = false;
break;
case ALERT_START_STREAMING:
Expand All @@ -410,7 +385,7 @@ static void aclk_run_query(struct aclk_sync_config_s *config, aclk_query_t query
break;
case CREATE_NODE_INSTANCE:
worker_is_busy(UV_EVENT_CREATE_NODE_INSTANCE);
create_node_instance_result_job(config->client, query->machine_guid, query->data.node_id);
create_node_instance_result_job(query->machine_guid, query->data.node_id);
ok_to_send = false;
break;

Expand Down
Loading

0 comments on commit dd7861a

Please sign in to comment.