Skip to content

Commit

Permalink
Pulse stream-parents (netdata#19445)
Browse files Browse the repository at this point in the history
* cleanup stream handshake reasons; introduce pulse-parents

* mark function params as not used

* more work towards pulse parents

* added 2 charts with status per node (inbound, outbound)

* log the reason the receiver left, when the sender is disconnected

* fix receiver exit reasons

* reject a duplicate streaming request with a different hostname

* log already connected on children

* do not retry too frequently

* fix log

* insist on connecting when the parent says already connected, but it is not in the stream path

* fix last commit

* log already connected on parent

* added streaming events charts

* streaming events are available in extended pulse

* archived to stale

* unify aclk connection status and connection failures/disconnection reasons
  • Loading branch information
ktsaou authored Jan 22, 2025
1 parent 7bc0413 commit 14dafed
Show file tree
Hide file tree
Showing 41 changed files with 1,478 additions and 685 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,8 @@ set(DAEMON_FILES
src/daemon/pulse/pulse-network.h
src/daemon/pulse/pulse-db-dbengine-retention.c
src/daemon/pulse/pulse-db-dbengine-retention.h
src/daemon/pulse/pulse-parents.c
src/daemon/pulse/pulse-parents.h
)

set(H2O_FILES
Expand Down
271 changes: 144 additions & 127 deletions src/aclk/aclk.c

Large diffs are not rendered by default.

34 changes: 23 additions & 11 deletions src/aclk/aclk.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "aclk_util.h"
//#include "aclk_rrdhost_state.h"

#include "https_client.h"

// How many MQTT PUBACKs we need to get to consider connection
// stable for the purposes of TBEB (truncated binary exponential backoff)
#define ACLK_PUBACKS_CONN_STABLE 3
Expand All @@ -18,26 +20,35 @@ typedef enum {
ACLK_PING_TIMEOUT = 3
} ACLK_DISCONNECT_ACTION;

typedef enum __attribute__((packed)) {
typedef enum {
ACLK_STATUS_CONNECTED = 0,
ACLK_STATUS_NONE,

// ND_SOCK_ERR_XXX is included here
// HTTPS_CLIENT_RESP_XXX is included here

ACLK_STATUS_OFFLINE = HTTPS_CLIENT_RESP_MAX,
ACLK_STATUS_DISABLED,
ACLK_STATUS_NO_CLOUD_URL,
ACLK_STATUS_INVALID_CLOUD_URL,
ACLK_STATUS_NOT_CLAIMED,
ACLK_STATUS_ENV_ENDPOINT_UNREACHABLE,
ACLK_STATUS_ENV_RESPONSE_NOT_200,
ACLK_STATUS_ENV_RESPONSE_EMPTY,
ACLK_STATUS_ENV_RESPONSE_NOT_JSON,
ACLK_STATUS_ENV_FAILED,
ACLK_STATUS_CANT_CONNECT_NO_CLOUD_URL,
ACLK_STATUS_CANT_CONNECT_INVALID_CLOUD_URL,
ACLK_STATUS_BLOCKED,
ACLK_STATUS_NO_OLD_PROTOCOL,
ACLK_STATUS_NO_PROTOCOL_CAPABILITY,
ACLK_STATUS_INVALID_ENV_AUTH_URL,
ACLK_STATUS_INVALID_ENV_TRANSPORT_IDX,
ACLK_STATUS_INVALID_ENV_TRANSPORT_URL,
ACLK_STATUS_INVALID_OTP,
ACLK_STATUS_NO_LWT_TOPIC,

// disconnection reasons
ACLK_STATUS_OFFLINE_CLOUD_REQUESTED_DISCONNECT,
ACLK_STATUS_OFFLINE_PING_TIMEOUT,
ACLK_STATUS_OFFLINE_RELOADING_CONFIG,
ACLK_STATUS_OFFLINE_POLL_ERROR,
ACLK_STATUS_OFFLINE_CLOSED_BY_REMOTE,
ACLK_STATUS_OFFLINE_SOCKET_ERROR,
ACLK_STATUS_OFFLINE_MQTT_PROTOCOL_ERROR,
ACLK_STATUS_OFFLINE_WS_PROTOCOL_ERROR,
ACLK_STATUS_OFFLINE_MESSAGE_TOO_BIG,

} ACLK_STATUS;

extern ACLK_STATUS aclk_status;
Expand Down Expand Up @@ -95,5 +106,6 @@ void add_aclk_host_labels(void);
void aclk_queue_node_info(RRDHOST *host, bool immediate);

struct mqtt_wss_stats aclk_statistics(void);
void aclk_status_set(ACLK_STATUS status);

#endif /* ACLK_H */
207 changes: 110 additions & 97 deletions src/aclk/aclk_otp.c

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/aclk/aclk_otp.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
#include "aclk_util.h"

#if OPENSSL_VERSION_NUMBER >= OPENSSL_VERSION_300
int aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target, bool *fallback_ipv4);
https_client_resp_t aclk_get_mqtt_otp(EVP_PKEY *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target, bool *fallback_ipv4);
#else
int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target, bool *fallback_ipv4);
https_client_resp_t aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target, bool *fallback_ipv4);
#endif
int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port, bool *fallback_ipv4);
https_client_resp_t aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port, bool *fallback_ipv4);

#endif /* ACLK_OTP_H */
Loading

0 comments on commit 14dafed

Please sign in to comment.