Skip to content

Commit

Permalink
Verify and cleanup deleted contexts (netdata#19389)
Browse files Browse the repository at this point in the history
* Verify and cleanup deleted contexts

* Remove context from cleanup list immediately when processed

* Throttle cleanup if wal size is getting big

* Add context check creation time
  • Loading branch information
stelfrag authored Jan 14, 2025
1 parent 592ee98 commit 7553d19
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 9 deletions.
78 changes: 77 additions & 1 deletion src/database/sqlite/sqlite_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ const char *database_context_config[] = {
"last_time_t INT NOT NULL, deleted INT NOT NULL, "
"family TEXT, PRIMARY KEY (host_id, id))",

"CREATE TABLE IF NOT EXISTS context_metadata_cleanup (id INTEGER PRIMARY KEY, host_id BLOB, context TEXT NOT NULL, date_created INT, "
"UNIQUE (host_id, context))",

"CREATE TRIGGER IF NOT EXISTS del_context1 AFTER DELETE ON context "
"BEGIN INSERT INTO context_metadata_cleanup (host_id, context, date_created) "
"VALUES (old.host_id, old.id, UNIXEPOCH()) ON CONFLICT DO UPDATE SET date_created = excluded.date_created; END",

NULL
};

Expand Down Expand Up @@ -261,8 +268,77 @@ int ctx_store_context(nd_uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data
return (rc_stored != SQLITE_DONE);
}

// Remove a context entry from the metadata cleanup list
#define CTX_DELETE_CONTEXT_META_CLEANUP_ITEM "DELETE FROM context_metadata_cleanup WHERE host_id = @host_id AND context = @context"

/*
 * Delete one (host, context) row from the context_metadata_cleanup table.
 *
 * res        slot holding a reusable prepared statement; it is compiled here when
 *            the slot is still empty and is only reset (never finalized) by this
 *            function, so finalizing it is left to the caller
 * host_uuid  host id bound to @host_id
 * context    context name bound to @context
 */
void ctx_delete_metadata_cleanup_context(sqlite3_stmt **res, nd_uuid_t(*host_uuid), const char *context)
{
    // First call for this slot: compile the DELETE statement; give up if that fails
    if (NULL == *res && !PREPARE_STATEMENT(db_context_meta, CTX_DELETE_CONTEXT_META_CLEANUP_ITEM, res))
        return;

    // bind_idx tracks the parameter being bound so a failure can be reported by index
    int bind_idx = 0;
    SQLITE_BIND_FAIL(finish, sqlite3_bind_blob(*res, ++bind_idx, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
    SQLITE_BIND_FAIL(finish, sqlite3_bind_text(*res, ++bind_idx, context, -1, SQLITE_STATIC));

    // All binds succeeded: clear the index (presumably REPORT_BIND_FAIL stays quiet on 0)
    bind_idx = 0;

    int step_rc = sqlite3_step_monitored(*res);
    if (SQLITE_DONE != step_rc)
        error_report("Failed to delete context check entry, rc = %d", step_rc);

finish:
    REPORT_BIND_FAIL(*res, bind_idx);
    SQLITE_RESET(*res);
}

// Schedule context cleanup for host
#define CTX_GET_CONTEXT_META_CLEANUP_LIST "SELECT context FROM context_metadata_cleanup WHERE host_id = @host_id"

/*
 * Load the contexts queued for cleanup for a host and pass them to a callback.
 *
 * Every row of context_metadata_cleanup for host_uuid is turned into a STRING and
 * used as a key of a temporary JudyL array. If at least one key was collected,
 * cleanup_cb(array, data) is invoked once; afterwards every key is released and
 * the array is freed.
 *
 * host_uuid   host whose cleanup list is read; nothing happens when NULL
 * cleanup_cb  callback receiving the JudyL array; nothing happens when NULL
 * data        opaque pointer forwarded to cleanup_cb
 */
void ctx_get_context_list_to_cleanup(nd_uuid_t *host_uuid, void (*cleanup_cb)(Pvoid_t JudyL, void *data), void *data)
{
    if (unlikely(!host_uuid || !cleanup_cb))
        return;

    sqlite3_stmt *res = NULL;
    Pvoid_t CTX_JudyL = NULL;

    if (!PREPARE_STATEMENT(db_context_meta, CTX_GET_CONTEXT_META_CLEANUP_LIST, &res))
        return;

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, host_uuid, sizeof(*host_uuid), SQLITE_STATIC));
    param = 0;

    while (sqlite3_step_monitored(res) == SQLITE_ROW) {
        const char *context = (const char *) sqlite3_column_text(res, 0);
        STRING *ctx = string_strdupz(context);

        // No STRING was produced for this row, so there is nothing to key on
        if (unlikely(!ctx))
            continue;

        Pvoid_t *Pvalue = JudyLIns(&CTX_JudyL, (Word_t) ctx, PJE0);

        // JudyLIns can fail (out of memory); drop our reference instead of dereferencing
        if (unlikely(!Pvalue || Pvalue == PJERR)) {
            string_freez(ctx);
            continue;
        }

        if (*Pvalue)
            // Key already present: the array keeps the first reference, release this one
            string_freez(ctx);
        else
            // New key: mark the slot as used; the array now owns the STRING reference
            *Pvalue = (Pvoid_t) (Word_t) 1;
    }

    if (CTX_JudyL) {
        cleanup_cb(CTX_JudyL, data);

        // Release the STRING reference held by every key before freeing the array
        bool first = true;
        Word_t Index = 0;
        while (JudyLFirstThenNext(CTX_JudyL, &Index, &first)) {
            STRING *ctx = (STRING *) Index;
            string_freez(ctx);
        }
    }
    (void) JudyLFreeArray(&CTX_JudyL, PJE0);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
}

// Delete a context
#define CTX_DELETE_CONTEXT "DELETE FROM context WHERE host_id = @host_id AND id = @context"
int ctx_delete_context(nd_uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data)
{
Expand Down
3 changes: 3 additions & 0 deletions src/database/sqlite/sqlite_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ void ctx_get_dimension_list(nd_uuid_t *host_uuid, void (*dict_cb)(SQL_DIMENSION_

int ctx_store_context(nd_uuid_t *host_uuid, VERSIONED_CONTEXT_DATA *context_data);

void ctx_get_context_list_to_cleanup(nd_uuid_t *host_uuid, void (*cleanup_cb)(Pvoid_t context, void *data), void *data);
void ctx_delete_metadata_cleanup_context(sqlite3_stmt **context_res, nd_uuid_t(*host_uuid), const char *context);

#define ctx_update_context(host_uuid, context_data) ctx_store_context(host_uuid, context_data)

int ctx_delete_context(nd_uuid_t *host_id, VERSIONED_CONTEXT_DATA *context_data);
Expand Down
9 changes: 6 additions & 3 deletions src/database/sqlite/sqlite_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#define MAX_PREPARED_STATEMENTS (32)
pthread_key_t key_pool[MAX_PREPARED_STATEMENTS];

// Value used for "PRAGMA journal_size_limit" (bytes). configure_sqlite_database()
// overwrites it with the "journal size limit" setting when that option exists.
// File scope (not static) so other translation units can reference it via extern.
long long def_journal_size_limit = 16777216;

SQLITE_API int sqlite3_exec_monitored(
sqlite3 *db, /* An open database */
const char *sql, /* SQL to be evaluated */
Expand Down Expand Up @@ -84,7 +86,6 @@ int configure_sqlite_database(sqlite3 *database, int target_version, const char
const char *def_synchronous = "NORMAL";
const char *def_journal_mode = "WAL";
const char *def_temp_store = "MEMORY";
long long def_journal_size_limit = 16777216;
long long def_cache_size = -2000;

// https://www.sqlite.org/pragma.html#pragma_auto_vacuum
Expand Down Expand Up @@ -122,8 +123,10 @@ int configure_sqlite_database(sqlite3 *database, int target_version, const char
// https://www.sqlite.org/pragma.html#pragma_journal_size_limit
// PRAGMA schema.journal_size_limit = N ;
snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_size_limit=%lld", def_journal_size_limit);
if (config_exists(CONFIG_SECTION_SQLITE, "journal size limit"))
snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_size_limit=%lld", config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", def_journal_size_limit));
if (config_exists(CONFIG_SECTION_SQLITE, "journal size limit")) {
def_journal_size_limit = config_get_number(CONFIG_SECTION_SQLITE, "journal size limit", def_journal_size_limit);
snprintfz(buf, sizeof(buf) - 1, "PRAGMA journal_size_limit=%lld", def_journal_size_limit);
}
if (init_database_batch(database, list, description))
return 1;

Expand Down
172 changes: 167 additions & 5 deletions src/database/sqlite/sqlite_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#define DB_METADATA_VERSION 18

extern long long def_journal_size_limit;

const char *database_config[] = {
"CREATE TABLE IF NOT EXISTS host(host_id BLOB PRIMARY KEY, hostname TEXT NOT NULL, "
"registry_hostname TEXT NOT NULL default 'unknown', update_every INT NOT NULL default 1, "
Expand Down Expand Up @@ -184,7 +186,6 @@ sqlite3 *db_meta = NULL;

enum metadata_opcode {
METADATA_DATABASE_NOOP = 0,
METADATA_DATABASE_TIMER,
METADATA_DEL_DIMENSION,
METADATA_STORE_CLAIM_ID,
METADATA_ADD_HOST_INFO,
Expand Down Expand Up @@ -668,6 +669,31 @@ void sqlite_uuid_random(sqlite3_context *context, int argc, sqlite3_value **argv
sqlite3_result_blob(context, &uuid, sizeof(nd_uuid_t), SQLITE_TRANSIENT);
}

/*
 * Return the size in bytes of the write-ahead log that belongs to a database.
 *
 * database_file  database file name; the path checked is
 *                "<netdata_configured_cache_dir>/<database_file>-wal"
 *
 * Returns the file size, or -1 when the file cannot be stat'ed.
 */
static int64_t sql_get_wal_size(const char *database_file)
{
    char wal_path[FILENAME_MAX + 1];
    snprintfz(wal_path, sizeof(wal_path) - 1, "%s/%s-wal", netdata_configured_cache_dir, database_file);

    uv_fs_t stat_req;
    int64_t wal_bytes = -1;

    // No loop and no callback are passed, so the request is completed before returning
    if (uv_fs_stat(NULL, &stat_req, wal_path, NULL) >= 0)
        wal_bytes = (int64_t) stat_req.statbuf.st_size;

    uv_fs_req_cleanup(&stat_req);
    return wal_bytes;
}

// Multiplier applied to def_journal_size_limit: sql_metadata_wal_size_acceptable()
// reports the WAL as too large once it exceeds this many times the limit.
#define SQLITE_METADATA_WAL_LIMIT_X (10)

/*
 * Tell whether the metadata database WAL is small enough to keep doing
 * optional (cleanup) work.
 *
 * Returns false only when the WAL of "netdata-meta.db" is larger than
 * SQLITE_METADATA_WAL_LIMIT_X times def_journal_size_limit; true otherwise,
 * including when the WAL file cannot be stat'ed.
 */
bool sql_metadata_wal_size_acceptable(void)
{
    // SQLite treats a negative journal_size_limit as "no limit". Multiplying it
    // would give a negative threshold that every size exceeds, which would
    // disable cleanup permanently, so there is nothing to throttle against.
    if (def_journal_size_limit < 0)
        return true;

    int64_t wal_size = sql_get_wal_size("netdata-meta.db");

    // Size unknown (stat failed, e.g. no WAL file present): do not throttle
    if (wal_size < 0)
        return true;

    return wal_size <= SQLITE_METADATA_WAL_LIMIT_X * def_journal_size_limit;
}

// Init
/*
* Initialize the SQLite database
Expand Down Expand Up @@ -898,6 +924,29 @@ static int store_claim_id(nd_uuid_t *host_id, nd_uuid_t *claim_id)
return rc != SQLITE_DONE;
}

#define SQL_DELETE_DIMENSION_BY_ID "DELETE FROM dimension WHERE rowid = @dimension_row AND dim_id = @uuid"

/*
 * Delete a single dimension row, matched by both its rowid and its UUID.
 *
 * res           slot holding a reusable prepared statement; compiled here when
 *               empty, reset (not finalized) on exit
 * dimension_id  rowid of the dimension, bound to @dimension_row
 * dim_uuid      dimension UUID, bound to @uuid
 */
static void delete_dimension_by_rowid(sqlite3_stmt **res, int64_t dimension_id, nd_uuid_t *dim_uuid)
{
    // Compile the statement on first use of this slot; bail out if that fails
    if (NULL == *res && !PREPARE_STATEMENT(db_meta, SQL_DELETE_DIMENSION_BY_ID, res))
        return;

    // bind_idx is the index of the parameter being bound, kept for failure reporting
    int bind_idx = 0;
    SQLITE_BIND_FAIL(finish, sqlite3_bind_int64(*res, ++bind_idx, dimension_id));
    SQLITE_BIND_FAIL(finish, sqlite3_bind_blob(*res, ++bind_idx, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC));

    // Binds are done: clear the index (presumably REPORT_BIND_FAIL stays quiet on 0)
    bind_idx = 0;

    int step_rc = sqlite3_step_monitored(*res);
    if (unlikely(SQLITE_DONE != step_rc))
        error_report("Failed to delete dimension id, rc = %d", step_rc);

finish:
    REPORT_BIND_FAIL(*res, bind_idx);
    SQLITE_RESET(*res);
}

static void delete_dimension_uuid(nd_uuid_t *dimension_uuid, sqlite3_stmt **action_res __maybe_unused, bool flag __maybe_unused)
{
static __thread sqlite3_stmt *res = NULL;
Expand Down Expand Up @@ -1542,11 +1591,127 @@ void vacuum_database(sqlite3 *database, const char *db_alias, int threshold, int
}
}


#define SQL_SELECT_HOST_CTX_CHART_DIM_LIST \
"SELECT d.dim_id, d.rowid FROM chart c, dimension d WHERE c.chart_id = d.chart_id AND c.rowid = @rowid"

/*
 * Walk the dimensions of one chart and delete those that may be deleted.
 *
 * res           slot holding the reusable SELECT statement; compiled here when
 *               empty, reset (not finalized) on exit
 * chart_row_id  rowid of the chart whose dimensions are examined
 * checked       incremented for every dimension examined
 * deleted       incremented for every dimension for which a delete was issued
 *
 * Returns true when the caller may continue with further charts. Returns false
 * when the statement could not be prepared or bound, when shutdown was requested,
 * or when the metadata WAL grew beyond the acceptable size.
 */
static bool clean_host_chart_dimensions(sqlite3_stmt **res, int64_t chart_row_id, size_t *checked, size_t *deleted)
{
    struct metadata_wc *wc = &metasync_worker;

    if (!*res) {
        if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_HOST_CTX_CHART_DIM_LIST, res))
            return false;
    }

    // Both are initialised before the first jump to 'done' so that the value
    // returned on a bind failure is well defined; a bind failure stops the scan
    // just like a prepare failure does.
    bool can_continue = false;
    sqlite3_stmt *dim_del_stmt = NULL;

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_int64(*res, ++param, chart_row_id));
    param = 0;

    can_continue = true;

    while (can_continue && sqlite3_step_monitored(*res) == SQLITE_ROW) {
        // Ignore rows whose dim_id blob does not have the size of a UUID
        if (sqlite3_column_bytes(*res, 0) != sizeof(nd_uuid_t))
            continue;

        nd_uuid_t *dim_uuid = (nd_uuid_t *)sqlite3_column_blob(*res, 0);
        int64_t dimension_id = sqlite3_column_int64(*res, 1);

        if (dimension_can_be_deleted(dim_uuid, NULL, false)) {
            delete_dimension_by_rowid(&dim_del_stmt, dimension_id, dim_uuid);
            (*deleted)++;
        }
        (*checked)++;

        // Stop early on shutdown, or when the WAL is growing too large (throttling)
        can_continue = (!metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)) && sql_metadata_wal_size_acceptable();
    }
    SQLITE_FINALIZE(dim_del_stmt);

done:
    REPORT_BIND_FAIL(*res, param);
    SQLITE_RESET(*res);
    return can_continue;
}

#define SQL_SELECT_HOST_CTX_CHART_LIST "SELECT rowid, context FROM chart WHERE host_id = @host"

/*
 * Callback that verifies the charts of a host against a set of contexts queued
 * for cleanup.
 *
 * CTX_JudyL  JudyL array whose keys are STRING pointers of the queued contexts
 * data       the RRDHOST the contexts belong to
 *
 * For every chart of the host whose context is a key of CTX_JudyL, the chart's
 * dimensions are checked (and deleted where allowed) and the context is then
 * removed from the cleanup list. The scan stops early on shutdown or when the
 * metadata WAL becomes too large.
 */
static void cleanup_host_context_metadata(Pvoid_t CTX_JudyL, void *data)
{
    if (!CTX_JudyL || !data)
        return;

    struct metadata_wc *wc = &metasync_worker;

    RRDHOST *host = data;

    sqlite3_stmt *res = NULL;
    sqlite3_stmt *dimension_res = NULL;
    sqlite3_stmt *context_res = NULL;

    if (!PREPARE_STATEMENT(db_meta, SQL_SELECT_HOST_CTX_CHART_LIST, &res))
        return;

    Word_t num_of_contexts = JudyLCount(CTX_JudyL, 0, -1, PJE0);

    // Word_t is not size_t; cast so the argument matches the %zu conversion
    nd_log_daemon(NDLP_DEBUG, "Verifying the retention of %zu contexts for host %s", (size_t) num_of_contexts, rrdhost_hostname(host));

    int param = 0;
    SQLITE_BIND_FAIL(done, sqlite3_bind_blob(res, ++param, &host->host_id.uuid, sizeof(host->host_id.uuid), SQLITE_STATIC));

    param = 0;
    Pvoid_t *Pvalue;
    int64_t chart_row_id;

    size_t deleted = 0;
    size_t checked = 0;

    bool can_continue = true;
    while (can_continue && sqlite3_step_monitored(res) == SQLITE_ROW) {
        chart_row_id = sqlite3_column_int64(res, 0);
        const char *context = (char *)sqlite3_column_text(res, 1);

        // The array is keyed by STRING pointer, so look the context up through one
        STRING *ctx = string_strdupz(context);
        Pvalue = JudyLGet(CTX_JudyL, (Word_t)ctx, PJE0);
        if (Pvalue) {
            can_continue = clean_host_chart_dimensions(&dimension_res, chart_row_id, &checked, &deleted);

            // Only drop the context from the cleanup list when the scan of this
            // chart was not interrupted; otherwise keep it so the unfinished work
            // is picked up by a later cleanup run.
            if (can_continue)
                ctx_delete_metadata_cleanup_context(&context_res, &host->host_id.uuid, context);
        }
        string_freez(ctx);
        can_continue =
            can_continue && (!metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)) && sql_metadata_wal_size_acceptable();
    }
    SQLITE_FINALIZE(dimension_res);
    SQLITE_FINALIZE(context_res);

    nd_log_daemon(
        NDLP_DEBUG,
        "Verified the contexts of host %s (Checked %zu metrics and removed %zu)",
        rrdhost_hostname(host),
        checked,
        deleted);

done:
    REPORT_BIND_FAIL(res, param);
    SQLITE_FINALIZE(res);
}

void run_metadata_cleanup(struct metadata_wc *wc)
{
if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
return;

if (sql_metadata_wal_size_acceptable()) {
RRDHOST *host;
dfe_start_reentrant(rrdhost_root_index, host) {
ctx_get_context_list_to_cleanup(&host->host_id.uuid, cleanup_host_context_metadata, host);
if (metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN) || false == sql_metadata_wal_size_acceptable())
break;
}
dfe_done(host);
}

if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
return;

check_dimension_metadata(wc);
check_chart_metadata(wc);
check_label_metadata(wc);
Expand Down Expand Up @@ -1956,7 +2121,7 @@ static void do_chart_label_cleanup(struct judy_list_t *cl_cleanup_data)
}

// Worker thread to scan hosts for pending metadata to store
static void start_metadata_hosts(uv_work_t *req __maybe_unused)
static void start_metadata_hosts(uv_work_t *req)
{
register_libuv_worker_jobs();

Expand Down Expand Up @@ -2072,7 +2237,6 @@ static void metadata_event_loop(void *arg)
{
worker_register("METASYNC");
worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
Expand Down Expand Up @@ -2130,7 +2294,6 @@ static void metadata_event_loop(void *arg)
nd_uuid_t *uuid;
RRDHOST *host = NULL;
ALARM_ENTRY *ae = NULL;
// struct aclk_sync_cfg_t *host_aclk_sync;

worker_is_idle();
uv_run(loop, UV_RUN_DEFAULT);
Expand All @@ -2156,7 +2319,6 @@ static void metadata_event_loop(void *arg)

switch (opcode) {
case METADATA_DATABASE_NOOP:
case METADATA_DATABASE_TIMER:
break;
case METADATA_DEL_DIMENSION:
uuid = (nd_uuid_t *) cmd.param[0];
Expand Down

0 comments on commit 7553d19

Please sign in to comment.