From 2f1ebee0d09f03cdf5c2221f5c57aa89b79c78cd Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 31 May 2026 10:20:38 -0600
Subject: [PATCH 1/5] config_format: accept nested maps in service and input sections

Signed-off-by: Eduardo Silva
---
 src/config_format/flb_cf_yaml.c | 51 +++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/src/config_format/flb_cf_yaml.c b/src/config_format/flb_cf_yaml.c
index 675ee6dd9e3..6b514ee93cd 100644
--- a/src/config_format/flb_cf_yaml.c
+++ b/src/config_format/flb_cf_yaml.c
@@ -1966,6 +1966,20 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
                 return YAML_FAILURE;
             }
             break;
+        case YAML_MAPPING_START_EVENT:
+            /* a map as a section value is collected as a variant (never for env) */
+            if (state->section == SECTION_ENV) {
+                flb_error("nested maps are not allowed in env section");
+                yaml_error_event(ctx, state, event);
+                return YAML_FAILURE;
+            }
+
+            state = state_push_variant(ctx, state, 1);
+            if (state == NULL) {
+                flb_error("unable to allocate state");
+                return YAML_FAILURE;
+            }
+            break;
         default:
             yaml_error_event(ctx, state, event);
             return YAML_FAILURE;
@@ -2238,6 +2252,21 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
                 break;
             }
 
+            if (state->section == SECTION_INPUT &&
+                strcasecmp(state->key, "telemetry") == 0) {
+                /*
+                 * Input telemetry is consumed structurally at load time (the key is
+                 * matched case-insensitively, like the loader). Other maps stay groups.
+                 */
+                state = state_push_variant(ctx, state, 1);
+
+                if (state == NULL) {
+                    flb_error("unable to allocate state");
+                    return YAML_FAILURE;
+                }
+                break;
+            }
+
             state = state_push(ctx, STATE_GROUP_KEY);
 
             if (state == NULL) {
@@ -2404,6 +2433,28 @@ static int consume_event(struct flb_cf *conf, struct local_ctx *ctx,
                 break;
             }
 
+            /* a nested map closed directly under a section key: store it as a property */
+            if (state->state == STATE_SECTION_VAL) {
+                if (flb_cf_section_property_add_variant(conf,
+                                                        state->cf_section->properties,
+                                                        state->key,
+                                                        flb_sds_len(state->key),
+                                                        variant) == NULL) {
+                    flb_error("unable to insert section variant");
+                    /* not inserted: the variant is still ours, release it */
+                    cfl_variant_destroy(variant);
+                    return YAML_FAILURE;
+                }
+
+                state = state_pop(ctx);
+                if (state == NULL) {
+                    flb_error("no state left");
+                    return YAML_FAILURE;
+                }
+
+                break;
+            }
+
             if (state->variant->type == CFL_VARIANT_KVLIST && state->variant_kvlist_key == NULL) {
                 flb_error("invalid state, should have a variant key");
                 cfl_variant_destroy(variant);
From 1e5ded36a5be3c5bab7034f1b0944b1e75fc2140 Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 31 May 2026 10:20:38 -0600
Subject: [PATCH 2/5] config: add logs tag records telemetry metrics

Signed-off-by: Eduardo Silva
---
 include/fluent-bit/flb_config.h |  12 +
 src/flb_config.c                | 404 +++++++++++++++++++++++++++++++-
 2 files changed, 415 insertions(+), 1 deletion(-)

diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h
index 3ebcb5edbae..49ea0c327cc 100644
--- a/include/fluent-bit/flb_config.h
+++ b/include/fluent-bit/flb_config.h
@@ -32,6 +32,7 @@
 #include
 
 struct flb_router;
+struct flb_hash_table;
 
 #define FLB_CONFIG_FLUSH_SECS   1
 #define FLB_CONFIG_HTTP_LISTEN  "0.0.0.0"
@@ -199,6 +200,17 @@ struct flb_config {
      */
     struct mk_list cmetrics;
 
+    /*
+     * Optional telemetry metrics with user-controlled cardinality.
+     */
+    int telemetry_metrics_logs_tag_records;                /* service-level on/off toggle */
+    int telemetry_metrics_logs_tag_records_max_series;     /* series cap; <= 0 disables it */
+    int telemetry_metrics_logs_tag_records_max_tag_length; /* tag length cap; <= 0 disables it */
+    size_t telemetry_metrics_logs_tag_records_series_count; /* keys stored in the table below */
+    struct flb_hash_table *telemetry_metrics_logs_tag_records_ht; /* seen (input, tag) keys */
+    pthread_mutex_t telemetry_metrics_logs_tag_records_lock; /* guards table + series count */
+    int telemetry_metrics_logs_tag_records_lock_inited;    /* FLB_TRUE once the lock is set up */
+
     /* HTTP Server */
 #ifdef FLB_HAVE_HTTP_SERVER
     int http_server;                  /* HTTP Server running    */
diff --git a/src/flb_config.c b/src/flb_config.c
index f6993114fcd..aaab83fbc6a 100644
--- a/src/flb_config.c
+++ b/src/flb_config.c
@@ -21,6 +21,8 @@
 #include
 #include
 #include
+#include
+#include
 
 #include
 #include
@@ -45,6 +47,7 @@
 #include
 #include
 #include
+#include
 
 const char *FLB_CONF_ENV_LOGLEVEL = "FLB_LOG_LEVEL";
 
@@ -175,6 +178,12 @@ struct flb_service_config service_configs[] = {
      FLB_CONF_TYPE_STR,
      offsetof(struct flb_config, storage_rejected_limit)},
 
+    /*
+     * Telemetry tag-records metrics are configured exclusively through the
+     * nested 'telemetry' YAML block (see config_apply_telemetry_block); there
+     * is intentionally no flat dotted-key service property for them.
+     */
+
     /* Coroutines */
     {FLB_CONF_STR_CORO_STACK_SIZE,
      FLB_CONF_TYPE_INT,
@@ -363,6 +372,32 @@ struct flb_config *flb_config_init()
     config->storage_bl_flush_on_shutdown = FLB_FALSE;
     config->storage_rejected_path = NULL;
     config->storage_rejected_limit = NULL;
+    config->telemetry_metrics_logs_tag_records = FLB_FALSE;          /* off by default */
+    config->telemetry_metrics_logs_tag_records_max_series = 500;     /* default series cap */
+    config->telemetry_metrics_logs_tag_records_max_tag_length = 128; /* default tag length cap */
+    config->telemetry_metrics_logs_tag_records_series_count = 0;
+    config->telemetry_metrics_logs_tag_records_ht =
+        flb_hash_table_create(FLB_HASH_TABLE_EVICT_NONE, 512, 0);
+    if (!config->telemetry_metrics_logs_tag_records_ht) {
+        flb_router_destroy(config->router);
+        if (config->kernel) {
+            flb_kernel_destroy(config->kernel);
+        }
+#ifdef FLB_HAVE_HTTP_SERVER
+        if (config->http_listen) {
+            flb_free(config->http_listen);
+        }
+
+        if (config->http_port) {
+            flb_free(config->http_port);
+        }
+#endif
+        flb_cf_destroy(cf);
+        flb_free(config);
+        return NULL;
+    }
+    pthread_mutex_init(&config->telemetry_metrics_logs_tag_records_lock, NULL);
+    config->telemetry_metrics_logs_tag_records_lock_inited = FLB_TRUE; /* lets exit destroy it */
     config->sched_cap  = FLB_SCHED_CAP;
     config->sched_base = FLB_SCHED_BASE;
     config->json_escape_unicode = FLB_TRUE;
@@ -551,6 +586,13 @@ void flb_config_exit(struct flb_config *config)
         flb_env_destroy(config->env);
     }
 
+    if (config->telemetry_metrics_logs_tag_records_ht) {
+        flb_hash_table_destroy(config->telemetry_metrics_logs_tag_records_ht);
+    }
+    if (config->telemetry_metrics_logs_tag_records_lock_inited == FLB_TRUE) {
+        pthread_mutex_destroy(&config->telemetry_metrics_logs_tag_records_lock);
+    }
+
     /* Program name */
     if (config->program_name) {
         flb_sds_destroy(config->program_name);
@@ -838,6 +880,321 @@ int flb_config_set_program_name(struct flb_config *config, char *name)
     return 0;
 }
 
+/* Parse a scalar variant as a boolean (bool, 0/1 int or uint, or string): 0 on success, -1 if invalid. */
+static int variant_as_bool(struct flb_config *config,
+                           struct cfl_variant *v, int *out)
+{
+    int ret;
+    flb_sds_t tmp;
+
+    switch (v->type) {
+    case CFL_VARIANT_BOOL:
+        *out = v->data.as_bool ? FLB_TRUE : FLB_FALSE;
+        return 0;
+    case CFL_VARIANT_INT:
+        /* only 0/1 are valid numeric booleans (reject e.g. 2 or -1) */
+        if (v->data.as_int64 != 0 && v->data.as_int64 != 1) {
+            return -1;
+        }
+        *out = v->data.as_int64 ? FLB_TRUE : FLB_FALSE;
+        return 0;
+    case CFL_VARIANT_UINT:
+        if (v->data.as_uint64 != 0 && v->data.as_uint64 != 1) {
+            return -1;
+        }
+        *out = v->data.as_uint64 ? FLB_TRUE : FLB_FALSE;
+        return 0;
+    case CFL_VARIANT_STRING:
+        tmp = flb_env_var_translate(config->env, v->data.as_string);
+        if (!tmp) {
+            return -1;
+        }
+        ret = flb_utils_bool(tmp);
+        flb_sds_destroy(tmp);
+        if (ret == -1) {
+            return -1;
+        }
+        *out = ret;
+        return 0;
+    default:
+        return -1;
+    }
+}
+
+/* Parse a scalar variant as an int (int, uint, or string): 0 on success, -1 if invalid or out of range. */
+static int variant_as_int(struct flb_config *config,
+                          struct cfl_variant *v, int *out)
+{
+    flb_sds_t tmp;
+    char *end;
+    long val;
+
+    switch (v->type) {
+    case CFL_VARIANT_INT:
+        if (v->data.as_int64 < INT_MIN || v->data.as_int64 > INT_MAX) {
+            return -1;
+        }
+        *out = (int) v->data.as_int64;
+        return 0;
+    case CFL_VARIANT_UINT:
+        if (v->data.as_uint64 > (uint64_t) INT_MAX) {
+            return -1;
+        }
+        *out = (int) v->data.as_uint64;
+        return 0;
+    case CFL_VARIANT_STRING:
+        tmp = flb_env_var_translate(config->env, v->data.as_string);
+        if (!tmp) {
+            return -1;
+        }
+        errno = 0;
+        end = NULL;
+        val = strtol(tmp, &end, 10);
+        /*
+         * Require a complete, in-range integer string: reject empty input,
+         * trailing characters (e.g. "10abc"), and overflow. A typo must not
+         * silently become 0 and disable a cardinality guard.
+         */
+        if (end == NULL || end == tmp || *end != '\0' ||
+            errno == ERANGE || val < INT_MIN || val > INT_MAX) {
+            flb_sds_destroy(tmp);
+            return -1;
+        }
+        flb_sds_destroy(tmp);
+        *out = (int) val;
+        return 0;
+    default:
+        return -1;
+    }
+}
+
+/*
+ * Return -1 (after logging) if 'kvlist' holds a key that is not in the
+ * NULL-terminated 'allowed' list (case-insensitive match), 0 otherwise. This
+ * catches typos in parent maps (e.g. 'metricz') that would silently do nothing.
+ */
+static int reject_unknown_keys(struct cfl_kvlist *kvlist, const char *path,
+                               const char **allowed)
+{
+    int i;
+    int ok;
+    struct cfl_list *head;
+    struct cfl_kvpair *kv;
+
+    cfl_list_foreach(head, &kvlist->list) {
+        kv = cfl_list_entry(head, struct cfl_kvpair, _head);
+
+        ok = FLB_FALSE;
+        for (i = 0; allowed[i] != NULL; i++) {
+            if (strcasecmp(kv->key, allowed[i]) == 0) {
+                ok = FLB_TRUE;
+                break;
+            }
+        }
+
+        if (!ok) {
+            flb_error("[config] unknown '%s' option '%s'", path, kv->key);
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Navigate telemetry > metrics > logs > tag_records.
+ *
+ * returns  1 -> '*out' is set to the tag_records node
+ *          0 -> the block stops short of tag_records (nothing to configure)
+ *         -1 -> a known intermediate node has the wrong shape, or any map
+ *               along the path contains an unknown key (malformed)
+ *
+ * This is intentionally a nested-block-only API: there is no flat dotted-key
+ * variant (e.g. "telemetry.metrics.logs.tag_records: true" is not honored).
+ * 'telemetry' must already be validated as a map by the caller.
+ */
+static int telemetry_logs_tag_records(struct cfl_variant *telemetry,
+                                      struct cfl_variant **out)
+{
+    static const char *telemetry_keys[] = {"metrics", NULL};
+    static const char *metrics_keys[] = {"logs", NULL};
+    static const char *logs_keys[] = {"tag_records", NULL};
+    struct cfl_variant *metrics;
+    struct cfl_variant *logs;
+    struct cfl_variant *tag_records;
+
+    *out = NULL;
+
+    if (reject_unknown_keys(telemetry->data.as_kvlist,
+                            "telemetry", telemetry_keys) != 0) {
+        return -1;
+    }
+
+    metrics = cfl_kvlist_fetch(telemetry->data.as_kvlist, "metrics");
+    if (!metrics) {
+        return 0;
+    }
+    if (metrics->type != CFL_VARIANT_KVLIST) {
+        flb_error("[config] 'telemetry.metrics' must be a map");
+        return -1;
+    }
+
+    if (reject_unknown_keys(metrics->data.as_kvlist,
+                            "telemetry.metrics", metrics_keys) != 0) {
+        return -1;
+    }
+
+    logs = cfl_kvlist_fetch(metrics->data.as_kvlist, "logs");
+    if (!logs) {
+        return 0;
+    }
+    if (logs->type != CFL_VARIANT_KVLIST) {
+        flb_error("[config] 'telemetry.metrics.logs' must be a map");
+        return -1;
+    }
+
+    if (reject_unknown_keys(logs->data.as_kvlist,
+                            "telemetry.metrics.logs", logs_keys) != 0) {
+        return -1;
+    }
+
+    tag_records = cfl_kvlist_fetch(logs->data.as_kvlist, "tag_records");
+    if (!tag_records) {
+        return 0;
+    }
+
+    *out = tag_records; /* type is not checked here; callers require a map */
+    return 1;
+}
+
+/* Apply a service-level 'telemetry' block (master switch + global limits). */
+static int config_apply_telemetry_block(struct flb_config *config,
+                                        struct cfl_variant *telemetry)
+{
+    int ret;
+    int b;
+    int n;
+    struct cfl_variant *tag_records;
+    struct cfl_list *head;
+    struct cfl_kvpair *kv;
+
+    if (telemetry->type != CFL_VARIANT_KVLIST) {
+        flb_error("[config] 'telemetry' must be a map");
+        return -1;
+    }
+
+    ret = telemetry_logs_tag_records(telemetry, &tag_records);
+    if (ret <= 0) {
+        /* 0 = nothing to configure, -1 = malformed (already logged) */
+        return ret;
+    }
+
+    /* tag_records is always a map; the 'enabled' key is the on/off toggle */
+    if (tag_records->type != CFL_VARIANT_KVLIST) {
+        flb_error("[config] 'telemetry.metrics.logs.tag_records' must be a map "
+                  "with an 'enabled' key");
+        return -1;
+    }
+
+    /* apply known keys, reject unknown ones (catch typos) */
+    cfl_list_foreach(head, &tag_records->data.as_kvlist->list) {
+        kv = cfl_list_entry(head, struct cfl_kvpair, _head);
+
+        if (strcasecmp(kv->key, "enabled") == 0) {
+            if (variant_as_bool(config, kv->val, &b) != 0) {
+                flb_error("[config] invalid "
+                          "'telemetry.metrics.logs.tag_records.enabled'");
+                return -1;
+            }
+            config->telemetry_metrics_logs_tag_records = b;
+        }
+        else if (strcasecmp(kv->key, "max_series") == 0) {
+            if (variant_as_int(config, kv->val, &n) != 0) {
+                flb_error("[config] invalid "
+                          "'telemetry.metrics.logs.tag_records.max_series'");
+                return -1;
+            }
+            config->telemetry_metrics_logs_tag_records_max_series = n; /* no sign check here */
+        }
+        else if (strcasecmp(kv->key, "max_tag_length") == 0) {
+            if (variant_as_int(config, kv->val, &n) != 0) {
+                flb_error("[config] invalid "
+                          "'telemetry.metrics.logs.tag_records.max_tag_length'");
+                return -1;
+            }
+            config->telemetry_metrics_logs_tag_records_max_tag_length = n; /* no sign check here */
+        }
+        else {
+            flb_error("[config] unknown "
+                      "'telemetry.metrics.logs.tag_records' option '%s'",
+                      kv->key);
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Apply an input-level 'telemetry' block. Inputs only support the 'enabled'
+ * toggle of telemetry.metrics.logs.tag_records (the global limits are
+ * service-only). tag_records is always a map, like the service section.
+ */
+static int input_apply_telemetry_block(struct flb_input_instance *ins,
+                                       struct cfl_variant *telemetry)
+{
+    int ret;
+    int b;
+    struct cfl_variant *tag_records;
+    struct cfl_variant *enabled;
+    struct cfl_list *head;
+    struct cfl_kvpair *kv;
+
+    if (telemetry->type != CFL_VARIANT_KVLIST) {
+        flb_error("[config] 'telemetry' must be a map");
+        return -1;
+    }
+
+    ret = telemetry_logs_tag_records(telemetry, &tag_records);
+    if (ret <= 0) {
+        return ret;
+    }
+
+    if (tag_records->type != CFL_VARIANT_KVLIST) {
+        flb_error("[config] 'telemetry.metrics.logs.tag_records' must be a map "
+                  "with an 'enabled' key on input '%s'", flb_input_name(ins));
+        return -1;
+    }
+
+    /* only 'enabled' is valid per input (limits are service-only) */
+    enabled = NULL;
+    cfl_list_foreach(head, &tag_records->data.as_kvlist->list) {
+        kv = cfl_list_entry(head, struct cfl_kvpair, _head);
+        if (strcasecmp(kv->key, "enabled") == 0) {
+            enabled = kv->val;
+        }
+        else {
+            flb_error("[config] 'telemetry.metrics.logs.tag_records.%s' is "
+                      "service-level only, not supported on input '%s'",
+                      kv->key, flb_input_name(ins));
+            return -1;
+        }
+    }
+
+    if (!enabled) {
+        return 0; /* empty tag_records map: keep the inherited (-1) setting */
+    }
+
+    if (variant_as_bool(ins->config, enabled, &b) != 0) {
+        flb_error("[config] invalid 'telemetry.metrics.logs.tag_records.enabled' "
+                  "on input '%s'", flb_input_name(ins));
+        return -1;
+    }
+
+    ins->telemetry_metrics_logs_tag_records = b;
+    return 0;
+}
+
 static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, enum section_type type)
 {
     int ret;
@@ -945,6 +1302,11 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
                         val = kv->val->data.as_array->entries[i];
                         ret = flb_input_set_property(ins, kv->key, val->data.as_string);
                     }
+                } else if (kv->val->type == CFL_VARIANT_KVLIST) {
+                    /* nested map blocks: only 'telemetry' is recognized; others leave 'ret' as is */
+                    if (strcasecmp(kv->key, "telemetry") == 0) {
+                        ret = input_apply_telemetry_block(ins, kv->val);
+                    }
                 }
             }
             else if (type == FLB_CF_FILTER) {
@@ -1021,6 +1383,42 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf,
     }
     return -1;
 }
+
+static int set_service_property(struct flb_config *config,
+                                struct cfl_kvpair *kv)
+{
+    if (kv->val->type == CFL_VARIANT_STRING) {
+        /*
+         * The 'telemetry' settings are configured exclusively as a nested
+         * block. Reject a bare scalar ("telemetry: x") or a flat dotted key
+         * ("telemetry.metrics.logs.tag_records: true") explicitly, instead of
+         * letting it fall through and be silently ignored as an unknown key.
+         */
+        if (strcasecmp(kv->key, "telemetry") == 0 ||
+            strncasecmp(kv->key, "telemetry.", 10) == 0) {
+            flb_error("[config] '%s' must be configured as a nested 'telemetry' "
+                      "block", kv->key);
+            return -1;
+        }
+        return flb_config_set_property(config, kv->key,
+                                       kv->val->data.as_string);
+    }
+
+    if (kv->val->type == CFL_VARIANT_KVLIST) {
+        /* nested map blocks: only 'telemetry' is recognized */
+        if (strcasecmp(kv->key, "telemetry") == 0) {
+            return config_apply_telemetry_block(config, kv->val);
+        }
+
+        flb_error("[config] unsupported nested service property '%s'", kv->key);
+        return -1;
+    }
+
+    flb_error("[config] unsupported value type for service property '%s'",
+              kv->key);
+    return -1;
+}
+
 /* Load a struct flb_config_format context into a flb_config instance */
 int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
 {
@@ -1094,7 +1492,11 @@ int flb_config_load_config_format(struct flb_config *config, struct flb_cf *cf)
             /* Iterate properties */
             cfl_list_foreach(chead, &s->properties->list) {
                 ckv = cfl_list_entry(chead, struct cfl_kvpair, _head);
-                flb_config_set_property(config, ckv->key, ckv->val->data.as_string);
+                if (set_service_property(config, ckv) == -1) {
+                    flb_error("[config] could not configure service property '%s'",
+                              ckv->key);
+                    return -1;
+                }
             }
         }
 
From f522ce0f756eca946653b22e3d9aa36f79976167 Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 31 May 2026 10:20:38 -0600
Subject: [PATCH 3/5] input: add per-input logs tag records metrics override

Signed-off-by: Eduardo Silva
---
 include/fluent-bit/flb_input.h |  3 +++
 src/flb_input.c                | 30 ++++++++++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/include/fluent-bit/flb_input.h b/include/fluent-bit/flb_input.h
index c2ca41bd75e..78feb945f10 100644
--- a/include/fluent-bit/flb_input.h
+++ b/include/fluent-bit/flb_input.h
@@ -263,6 +263,7 @@ struct flb_input_instance {
     char *tag;                           /* Input tag for routing        */
     int tag_len;
     int tag_default;                     /* is it using the default tag? */
+    int telemetry_metrics_logs_tag_records; /* -1: inherit service setting, else per-input override */
 
     /* By default all input instances are 'routable' */
     int routable;
@@ -403,6 +404,8 @@ struct flb_input_instance {
     struct cmt *cmt;                         /* parent context              */
     struct cmt_counter *cmt_bytes;           /* metric: input_bytes_total   */
     struct cmt_counter *cmt_records;         /* metric: input_records_total */
+    struct cmt_counter *cmt_logs_tag_records;           /* metric: input_logs_tag_records_total */
+    struct cmt_counter *cmt_logs_tag_records_untracked; /* metric: input_logs_tag_records_untracked_total */
 
     /* is the input instance overlimit ?: 1 or 0 */
     struct cmt_gauge *cmt_storage_overlimit;
diff --git a/src/flb_input.c b/src/flb_input.c
index e990eee238c..8dee1b75cda 100644
--- a/src/flb_input.c
+++ b/src/flb_input.c
@@ -405,6 +405,7 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
     instance->tag = NULL;
     instance->tag_len = 0;
     instance->tag_default = FLB_FALSE;
+    instance->telemetry_metrics_logs_tag_records = -1; /* -1 = no per-input override */
     instance->routable = FLB_TRUE;
     instance->data = data;
     instance->storage = NULL;
@@ -1396,10 +1397,17 @@ int flb_input_instance_init(struct flb_input_instance *ins,
 #ifdef FLB_HAVE_METRICS
     uint64_t ts;
     char *name;
+    int logs_tag_records_enabled;
 
     name = (char *) flb_input_name(ins);
     ts = cfl_time_now();
 
+    /* resolve effective tag-records tracking: input override wins over service */
+    logs_tag_records_enabled =
+        (ins->telemetry_metrics_logs_tag_records != -1)
+        ? ins->telemetry_metrics_logs_tag_records
+        : ctx->telemetry_metrics_logs_tag_records;
+
     /* CMetrics */
     ins->cmt = cmt_create();
     if (!ins->cmt) {
@@ -1429,6 +1437,28 @@ int flb_input_instance_init(struct flb_input_instance *ins,
                            1, (char *[]) {"name"});
     cmt_counter_set(ins->cmt_records, ts, 0, 1, (char *[]) {name});
 
+    if (logs_tag_records_enabled == FLB_TRUE) {
+        /* fluentbit_input_logs_tag_records_total */
+        ins->cmt_logs_tag_records = \
+            cmt_counter_create(ins->cmt,
+                               "fluentbit", "input", "logs_tag_records_total",
+                               "Number of input log records by tag.",
+                               2, (char *[]) {"name", "tag"});
+        if (!ins->cmt_logs_tag_records) {
+            return -1;
+        }
+
+        /* fluentbit_input_logs_tag_records_untracked_total */
+        ins->cmt_logs_tag_records_untracked = \
+            cmt_counter_create(ins->cmt,
+                               "fluentbit", "input", "logs_tag_records_untracked_total",
+                               "Number of input log records not tracked by tag.",
+                               2, (char *[]) {"name", "reason"});
+        if (!ins->cmt_logs_tag_records_untracked) {
+            return -1;
+        }
+    }
+
     /* fluentbit_input_ingestion_paused */
     ins->cmt_ingestion_paused = \
         cmt_gauge_create(ins->cmt,
From ce28316cdf221f24327fa964938aab172ad7503a Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 31 May 2026 10:20:38 -0600
Subject: [PATCH 4/5] input_chunk: count input log records per tag

Signed-off-by: Eduardo Silva
---
 src/flb_input_chunk.c | 161 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 161 insertions(+)

diff --git a/src/flb_input_chunk.c b/src/flb_input_chunk.c
index 0a5f83ea7dc..ced35fa23dd 100644
--- a/src/flb_input_chunk.c
+++ b/src/flb_input_chunk.c
@@ -42,6 +42,7 @@
 #include
 #include
 #include
+#include
 
 #ifdef FLB_HAVE_CHUNK_TRACE
 
@@ -56,6 +57,162 @@
 
 #define FLB_INPUT_CHUNK_RAW_LOG_ROUTING   (1 << 0)
 
+static int logs_tag_records_metrics_enabled(struct flb_input_instance *in)
+{
+    if (in->telemetry_metrics_logs_tag_records != -1) {
+        return in->telemetry_metrics_logs_tag_records; /* per-input override wins */
+    }
+
+    return in->config->telemetry_metrics_logs_tag_records;
+}
+
+static void update_logs_tag_records_untracked(struct flb_input_instance *in,
+                                              uint64_t ts,
+                                              size_t records,
+                                              const char *reason)
+{
+    if (!in->cmt_logs_tag_records_untracked || records == 0) {
+        return;
+    }
+
+    cmt_counter_add(in->cmt_logs_tag_records_untracked, ts, records,
+                    2, (char *[]) {(char *) flb_input_name(in),
+                                   (char *) reason});
+}
+
+static void update_logs_tag_records_metrics(struct flb_input_instance *in,
+                                            uint64_t ts,
+                                            size_t records,
+                                            const char *tag,
+                                            size_t tag_len)
+{
+    int ret;
+    int prefix_len;
+    char value = 1;
+    char prefix[32];
+    char stack_key[256];
+    char *key;
+    size_t input_name_len;
+    size_t key_len;
+    size_t out_size;
+    void *out_buf;
+    const char *input_name;
+    flb_sds_t tag_sds;
+    struct flb_config *config;
+
+    if (records == 0 || logs_tag_records_metrics_enabled(in) != FLB_TRUE) {
+        return;
+    }
+
+    if (!in->cmt_logs_tag_records || !in->cmt_logs_tag_records_untracked) {
+        return;
+    }
+
+    config = in->config;
+
+    /* Enforce the tag length limit before allocating anything */
+    if (config->telemetry_metrics_logs_tag_records_max_tag_length > 0 &&
+        tag_len > (size_t) config->telemetry_metrics_logs_tag_records_max_tag_length) {
+        update_logs_tag_records_untracked(in, ts, records, "tag_length_limit");
+        return;
+    }
+
+    input_name = flb_input_name(in);
+    input_name_len = strlen(input_name);
+
+    /*
+     * Build a NUL-free, unambiguous cardinality key using a numeric length
+     * prefix: "<name_len>:<name><tag>". The hash table stores and compares
+     * keys with NUL-terminated semantics (flb_strndup/strncmp), so the key
+     * must not contain an embedded NUL separator. The length prefix keeps the
+     * (name, tag) -> key mapping injective without relying on a delimiter byte
+     * that could legitimately appear in a name or tag.
+     */
+    prefix_len = snprintf(prefix, sizeof(prefix), "%zu:", input_name_len);
+    if (prefix_len < 0 || (size_t) prefix_len >= sizeof(prefix)) {
+        update_logs_tag_records_untracked(in, ts, records, "error");
+        return;
+    }
+
+    key_len = (size_t) prefix_len + input_name_len + tag_len;
+
+    /*
+     * The hash table API takes the key length as an int. With max_tag_length
+     * disabled (<= 0) an extremely large tag could otherwise overflow the
+     * cast, so guard the key length explicitly.
+     */
+    if (key_len > INT_MAX) {
+        update_logs_tag_records_untracked(in, ts, records, "error");
+        return;
+    }
+
+    if (key_len <= sizeof(stack_key)) {
+        key = stack_key; /* common case: no heap allocation */
+    }
+    else {
+        key = flb_malloc(key_len);
+        if (!key) {
+            flb_errno();
+            update_logs_tag_records_untracked(in, ts, records, "error");
+            return;
+        }
+    }
+
+    memcpy(key, prefix, prefix_len);
+    memcpy(key + prefix_len, input_name, input_name_len);
+    memcpy(key + prefix_len + input_name_len, tag, tag_len); /* key is not NUL-terminated */
+
+    pthread_mutex_lock(&config->telemetry_metrics_logs_tag_records_lock);
+
+    ret = flb_hash_table_get(config->telemetry_metrics_logs_tag_records_ht,
+                             key, (int) key_len, &out_buf, &out_size);
+    if (ret == -1) { /* key not tracked yet */
+        if (config->telemetry_metrics_logs_tag_records_max_series > 0 &&
+            config->telemetry_metrics_logs_tag_records_series_count >=
+            (size_t) config->telemetry_metrics_logs_tag_records_max_series) {
+            pthread_mutex_unlock(&config->telemetry_metrics_logs_tag_records_lock);
+            if (key != stack_key) {
+                flb_free(key);
+            }
+            update_logs_tag_records_untracked(in, ts, records, "max_series");
+            return;
+        }
+
+        ret = flb_hash_table_add(config->telemetry_metrics_logs_tag_records_ht,
+                                 key, (int) key_len, &value, sizeof(value));
+        if (ret == -1) {
+            pthread_mutex_unlock(&config->telemetry_metrics_logs_tag_records_lock);
+            if (key != stack_key) {
+                flb_free(key);
+            }
+            update_logs_tag_records_untracked(in, ts, records, "error");
+            return;
+        }
+
+        config->telemetry_metrics_logs_tag_records_series_count++;
+    }
+
+    pthread_mutex_unlock(&config->telemetry_metrics_logs_tag_records_lock);
+
+    if (key != stack_key) {
+        flb_free(key);
+    }
+
+    /* cmetrics requires a NUL-terminated label value for the tag */
+    tag_sds = flb_sds_create_len(tag, tag_len);
+    if (!tag_sds) {
+        flb_errno();
+        update_logs_tag_records_untracked(in, ts, records, "error");
+        return;
+    }
+
+    cmt_counter_add(in->cmt_logs_tag_records, ts, records,
+                    2, (char *[]) {(char *) input_name,
+                                   (char *) tag_sds});
+
+    flb_sds_destroy(tag_sds);
+}
+
 struct input_chunk_raw {
     struct flb_input_instance *ins;
     int event_type;
@@ -2801,6 +2958,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
         cmt_counter_add(in->cmt_records, ts, ic->added_records,
                         1, (char *[]) {(char *) flb_input_name(in)});
 
+        if (event_type == FLB_INPUT_LOGS) { /* per-tag counting applies to log events only */
+            update_logs_tag_records_metrics(in, ts, ic->added_records, tag, tag_len);
+        }
+
         /* fluentbit_input_bytes_total */
         cmt_counter_add(in->cmt_bytes, ts, buf_size,
                         1, (char *[]) {(char *) flb_input_name(in)});
From fedc0993e4c5c1a76a402d3a2c310c9a8d60981d Mon Sep 17 00:00:00 2001
From: Eduardo Silva
Date: Sun, 31 May 2026 10:20:38 -0600
Subject: [PATCH 5/5] tests: integration: add logs tag records metrics scenarios

Signed-off-by: Eduardo Silva
---
 .../config/bad_tag_records_bool_form.yaml     |  21 ++
 .../config/bad_tag_records_dotted_key.yaml    |  16 +
 .../bad_tag_records_dotted_service.yaml       |  18 +
 .../config/bad_tag_records_enabled_value.yaml |  20 ++
 .../bad_tag_records_input_service_only.yaml   |  23 ++
 .../config/bad_tag_records_input_value.yaml   |  19 ++
 .../bad_tag_records_malformed_metrics.yaml    |  18 +
 .../bad_tag_records_max_series_value.yaml     |  22 ++
 .../bad_tag_records_nested_unknown.yaml       |  18 +
 .../config/bad_tag_records_numeric_bool.yaml  |  21 ++
 .../bad_tag_records_telemetry_scalar.yaml     |  17 +
 .../config/bad_tag_records_type.yaml          |  22 ++
 .../bad_tag_records_unknown_option.yaml       |  23 ++
 .../bad_tag_records_unknown_parent_key.yaml   |  21 ++
 .../lenient_tag_records_empty_block.yaml      |  22 ++
.../config/tag_records_accumulate.yaml | 24 ++ .../config/tag_records_disabled.yaml | 18 + .../config/tag_records_enabled.yaml | 23 ++ .../config/tag_records_input_enable.yaml | 28 ++ .../tag_records_input_nested_disable.yaml | 33 ++ .../tag_records_input_nested_enable.yaml | 28 ++ .../config/tag_records_input_override.yaml | 28 ++ .../config/tag_records_max_series.yaml | 29 ++ .../config/tag_records_max_tag_length.yaml | 24 ++ .../config/tag_records_multi_tag.yaml | 28 ++ .../test_internal_metrics_tag_records.py | 307 ++++++++++++++++++ ..._internal_metrics_tag_records_badconfig.py | 149 +++++++++ 27 files changed, 1020 insertions(+) create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_bool_form.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_key.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_service.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_enabled_value.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_service_only.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_value.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_malformed_metrics.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_max_series_value.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_nested_unknown.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_numeric_bool.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_telemetry_scalar.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_type.yaml create mode 100644 
tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_option.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_parent_key.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/lenient_tag_records_empty_block.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_accumulate.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_disabled.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_enabled.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_input_enable.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_disable.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_enable.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_input_override.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_max_series.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_max_tag_length.yaml create mode 100644 tests/integration/scenarios/internal_metrics/config/tag_records_multi_tag.yaml create mode 100644 tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records.py create mode 100644 tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records_badconfig.py diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_bool_form.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_bool_form.yaml new file mode 100644 index 00000000000..48a1fe34cc5 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_bool_form.yaml @@ -0,0 +1,21 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + # tag_records 
must be a map with an 'enabled' key; the bare boolean + # short form is intentionally not accepted. + tag_records: true + +pipeline: + inputs: + - name: dummy + tag: bad.boolform + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_key.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_key.yaml new file mode 100644 index 00000000000..3c38d38f36e --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_key.yaml @@ -0,0 +1,16 @@ +service: + flush: 1 + grace: 1 + log_level: error + +pipeline: + inputs: + - name: dummy + tag: bad.dotted + dummy: '{"message":"x"}' + samples: 1 + telemetry.metrics.logs.tag_records: true + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_service.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_service.yaml new file mode 100644 index 00000000000..bdbee21fd6a --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_dotted_service.yaml @@ -0,0 +1,18 @@ +service: + flush: 1 + grace: 1 + log_level: error + # Flat dotted keys are not supported; only nested 'telemetry' blocks are. + # This must be rejected explicitly, not silently ignored. 
+ telemetry.metrics.logs.tag_records: true + +pipeline: + inputs: + - name: dummy + tag: bad.dotted.service + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_enabled_value.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_enabled_value.yaml new file mode 100644 index 00000000000..c3f717d3c89 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_enabled_value.yaml @@ -0,0 +1,20 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + tag_records: + enabled: notabool + +pipeline: + inputs: + - name: dummy + tag: bad.enabled + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_service_only.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_service_only.yaml new file mode 100644 index 00000000000..67b98781628 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_service_only.yaml @@ -0,0 +1,23 @@ +service: + flush: 1 + grace: 1 + log_level: error + +pipeline: + inputs: + - name: dummy + tag: bad.input.serviceonly + dummy: '{"message":"x"}' + samples: 1 + # max_series is service-level only; setting it on an input must be + # rejected rather than silently accepted. 
+ telemetry: + metrics: + logs: + tag_records: + enabled: true + max_series: 5 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_value.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_value.yaml new file mode 100644 index 00000000000..a16768eb357 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_input_value.yaml @@ -0,0 +1,19 @@ +service: + flush: 1 + grace: 1 + log_level: error + +pipeline: + inputs: + - name: dummy + tag: bad.input + dummy: '{"message":"x"}' + samples: 1 + telemetry: + metrics: + logs: + tag_records: maybe + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_malformed_metrics.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_malformed_metrics.yaml new file mode 100644 index 00000000000..fe39470929d --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_malformed_metrics.yaml @@ -0,0 +1,18 @@ +service: + flush: 1 + grace: 1 + log_level: error + # 'metrics' must be a map; a scalar here is a malformed telemetry block + telemetry: + metrics: true + +pipeline: + inputs: + - name: dummy + tag: bad.metrics + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_max_series_value.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_max_series_value.yaml new file mode 100644 index 00000000000..3b6cc9eddf8 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_max_series_value.yaml @@ -0,0 +1,22 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + tag_records: + enabled: true + max_series: notanumber + +pipeline: + inputs: + - name: dummy + tag: bad.maxseries + dummy: '{"message":"x"}' + 
interval_sec: 0 + interval_nsec: 200000000 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_nested_unknown.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_nested_unknown.yaml new file mode 100644 index 00000000000..28ac4742743 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_nested_unknown.yaml @@ -0,0 +1,18 @@ +service: + flush: 1 + grace: 1 + log_level: error + bogus: + nested: + key: value + +pipeline: + inputs: + - name: dummy + tag: bad.nested + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_numeric_bool.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_numeric_bool.yaml new file mode 100644 index 00000000000..cbd86f3de62 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_numeric_bool.yaml @@ -0,0 +1,21 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + tag_records: + # numeric booleans are restricted to 0/1; 2 must be rejected + enabled: 2 + +pipeline: + inputs: + - name: dummy + tag: bad.numbool + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_telemetry_scalar.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_telemetry_scalar.yaml new file mode 100644 index 00000000000..dfa6be78d41 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_telemetry_scalar.yaml @@ -0,0 +1,17 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: oops + +pipeline: + inputs: + - name: dummy + tag: bad.scalar + dummy: '{"message":"x"}' + interval_sec: 0 + interval_nsec: 200000000 + + outputs: + - name: 'null' + match: '*' diff --git 
a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_type.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_type.yaml new file mode 100644 index 00000000000..2b4a2501f4f --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_type.yaml @@ -0,0 +1,22 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + tag_records: + - 1 + - 2 + - 3 + +pipeline: + inputs: + - name: dummy + tag: bad.type + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_option.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_option.yaml new file mode 100644 index 00000000000..f8c5c841bd8 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_option.yaml @@ -0,0 +1,23 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + tag_records: + enabled: true + # typo: 'max_serie' is not a known option and must be rejected so a + # misconfigured cardinality guard is not silently left at default + max_serie: 5 + +pipeline: + inputs: + - name: dummy + tag: bad.unknown + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_parent_key.yaml b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_parent_key.yaml new file mode 100644 index 00000000000..166cfd7e82a --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/bad_tag_records_unknown_parent_key.yaml @@ -0,0 +1,21 @@ +service: + flush: 1 + grace: 1 + log_level: error + telemetry: + metrics: + logs: + # typo in the leaf key: 'tag_record' (missing 's') must be rejected so + # it is not silently treated as "nothing configured" + tag_record: true + +pipeline: + inputs: + - name: 
dummy + tag: bad.parentkey + dummy: '{"message":"x"}' + samples: 1 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/lenient_tag_records_empty_block.yaml b/tests/integration/scenarios/internal_metrics/config/lenient_tag_records_empty_block.yaml new file mode 100644 index 00000000000..4c191a444c1 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/lenient_tag_records_empty_block.yaml @@ -0,0 +1,22 @@ +service: + flush: 1 + grace: 1 + log_level: error + # An empty tag_records block is valid: nothing is configured, the feature + # stays disabled, and Fluent Bit must start normally (no error, no crash). + telemetry: + metrics: + logs: + tag_records: {} + +pipeline: + inputs: + - name: dummy + tag: lenient.empty + dummy: '{"message":"x"}' + interval_sec: 0 + interval_nsec: 200000000 + + outputs: + - name: 'null' + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_accumulate.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_accumulate.yaml new file mode 100644 index 00000000000..ba217a47df2 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_accumulate.yaml @@ -0,0 +1,24 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + +pipeline: + inputs: + - name: dummy + alias: tag_records_accumulate + tag: tag.records.acc + dummy: '{"message":"acc"}' + interval_sec: 0 + interval_nsec: 100000000 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_disabled.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_disabled.yaml new file mode 100644 index 00000000000..62e244a8471 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_disabled.yaml @@ -0,0 +1,18 @@ +service: + 
flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: tag_records_disabled_dummy + tag: tag.records.disabled + dummy: '{"message":"disabled"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_enabled.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_enabled.yaml new file mode 100644 index 00000000000..4b97c71b4b2 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_enabled.yaml @@ -0,0 +1,23 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + +pipeline: + inputs: + - name: dummy + alias: tag_records_dummy + tag: tag.records.one + dummy: '{"message":"one"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_input_enable.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_input_enable.yaml new file mode 100644 index 00000000000..6df6782879f --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_input_enable.yaml @@ -0,0 +1,28 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: tag_records_enabled_input + tag: tag.records.on + telemetry: + metrics: + logs: + tag_records: + enabled: true + dummy: '{"message":"on"}' + samples: 1 + - name: dummy + alias: tag_records_default_input + tag: tag.records.off + dummy: '{"message":"off"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_disable.yaml 
b/tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_disable.yaml new file mode 100644 index 00000000000..ceeff0fc6e1 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_disable.yaml @@ -0,0 +1,33 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + +pipeline: + inputs: + - name: dummy + alias: tag_records_nested_off + tag: tag.records.nested.off + telemetry: + metrics: + logs: + tag_records: + enabled: false + dummy: '{"message":"nested off"}' + samples: 1 + - name: dummy + alias: tag_records_nested_control + tag: tag.records.nested.on + dummy: '{"message":"control"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_enable.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_enable.yaml new file mode 100644 index 00000000000..0a6b632bd86 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_input_nested_enable.yaml @@ -0,0 +1,28 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + +pipeline: + inputs: + - name: dummy + alias: tag_records_nested_enabled + tag: tag.records.nested.enable + telemetry: + metrics: + logs: + tag_records: + enabled: true + dummy: '{"message":"nested on"}' + samples: 1 + - name: dummy + alias: tag_records_nested_default + tag: tag.records.nested.default + dummy: '{"message":"default"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_input_override.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_input_override.yaml new file mode 100644 index 00000000000..2414d0359aa --- /dev/null +++ 
b/tests/integration/scenarios/internal_metrics/config/tag_records_input_override.yaml @@ -0,0 +1,28 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + +pipeline: + inputs: + - name: dummy + alias: tag_records_input_disabled + tag: tag.records.input.disabled + telemetry: + metrics: + logs: + tag_records: + enabled: false + dummy: '{"message":"input disabled"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_max_series.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_max_series.yaml new file mode 100644 index 00000000000..50340a4e0bc --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_max_series.yaml @@ -0,0 +1,29 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + max_series: 1 + +pipeline: + inputs: + - name: dummy + alias: tag_records_first + tag: tag.records.first + dummy: '{"message":"first"}' + samples: 1 + - name: dummy + alias: tag_records_second + tag: tag.records.second + dummy: '{"message":"second"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_max_tag_length.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_max_tag_length.yaml new file mode 100644 index 00000000000..862cd6b8a7b --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_max_tag_length.yaml @@ -0,0 +1,24 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + max_tag_length: 4 + +pipeline: + inputs: + - name: dummy + alias: 
tag_records_long_tag + tag: tag.records.too.long + dummy: '{"message":"long tag"}' + samples: 1 + + outputs: + - name: stdout + match: '*' diff --git a/tests/integration/scenarios/internal_metrics/config/tag_records_multi_tag.yaml b/tests/integration/scenarios/internal_metrics/config/tag_records_multi_tag.yaml new file mode 100644 index 00000000000..6db09eaaec0 --- /dev/null +++ b/tests/integration/scenarios/internal_metrics/config/tag_records_multi_tag.yaml @@ -0,0 +1,28 @@ +service: + flush: 1 + grace: 1 + log_level: info + http_server: on + http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT} + telemetry: + metrics: + logs: + tag_records: + enabled: true + +pipeline: + inputs: + - name: dummy + alias: tag_records_multi_a + tag: tag.records.alpha + dummy: '{"message":"alpha"}' + samples: 1 + - name: dummy + alias: tag_records_multi_b + tag: tag.records.beta + dummy: '{"message":"beta"}' + samples: 1 + + outputs: + - name: stdout + match: '*'
diff --git a/tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records.py b/tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records.py
new file mode 100644
index 00000000000..4339be6e4ae
--- /dev/null
+++ b/tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records.py
@@ -0,0 +1,319 @@
+import os
+import re
+
+import requests
+
+from utils.test_service import FluentBitTestService
+
+
+# Matches one labelled Prometheus sample line: `name{labels} value`.
+# Lines without a `{...}` label set do not match and are skipped by the helpers.
+METRIC_RE = re.compile(
+    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P<labels>[^}]*)\}\s+(?P<value>[-+0-9.eE]+)$'
+)
+
+
+class Service:
+    """Wrap FluentBitTestService for a config file under ../config."""
+
+    def __init__(self, config_file):
+        self.config_file = os.path.abspath(
+            os.path.join(os.path.dirname(__file__), "../config", config_file)
+        )
+        self.service = FluentBitTestService(self.config_file)
+
+    def start(self):
+        self.service.start()
+        self.flb = self.service.flb
+
+    def stop(self):
+        self.service.stop()
+
+    def metrics(self, expected=None):
+        """Poll the Prometheus endpoint until it answers 200 and, when
+        `expected` is given, the response body contains that substring."""
+        url = f"http://127.0.0.1:{self.flb.http_monitoring_port}/api/v2/metrics/prometheus"
+        return self.service.wait_for_condition(
+            # The walrus binds `response` so the lambda can hand back its text.
+            lambda: response.text
+            if (
+                (response := requests.get(url, timeout=2)).status_code == 200
+                and (expected is None or expected in response.text)
+            )
+            else None,
+            timeout=10,
+            interval=0.5,
+            description="prometheus metrics",
+        )
+
+
+def _labels_to_dict(labels):
+    """Parse `key="value",...` label text into a dict; empty text gives {}."""
+    result = {}
+    if not labels:
+        return result
+    for item in labels.split(","):
+        key, value = item.split("=", 1)
+        result[key] = value.strip('"')
+    return result
+
+
+def _metric_value(metrics, metric_name, **labels):
+    """Return the value of the series whose label set equals `labels`, or None."""
+    for line in metrics.splitlines():
+        match = METRIC_RE.match(line)
+        if not match or match.group("name") != metric_name:
+            continue
+        if _labels_to_dict(match.group("labels")) == labels:
+            return float(match.group("value"))
+    return None
+
+
+def _metric_series(metrics, metric_name):
+    """Return a list of (labels_dict, value) for every series of metric_name."""
+    series = []
+    for line in metrics.splitlines():
+        match = METRIC_RE.match(line)
+        if not match or match.group("name") != metric_name:
+            continue
+        series.append(
+            (_labels_to_dict(match.group("labels")), float(match.group("value")))
+        )
+    return series
+
+
+def _run_until_value(config_file, metric_name, predicate, **labels):
+    """Start a service and wait until the metric value satisfies `predicate`."""
+    service = Service(config_file)
+    try:
+        service.start()
+        url = (
+            f"http://127.0.0.1:{service.flb.http_monitoring_port}"
+            "/api/v2/metrics/prometheus"
+        )
+
+        def check():
+            response = requests.get(url, timeout=2)
+            if response.status_code != 200:
+                return None
+            value = _metric_value(response.text, metric_name, **labels)
+            if value is not None and predicate(value):
+                return response.text
+            return None
+
+        return service.service.wait_for_condition(
+            check,
+            timeout=15,
+            interval=0.5,
+            description=f"{metric_name} value condition",
+        )
+    finally:
+        service.stop()
+
+
+def _run_with_metrics(config_file, expected=None):
+    """Start a service, fetch its metrics text, and always stop the service."""
+    service = Service(config_file)
+    try:
+        service.start()
+        return service.metrics(expected=expected)
+    finally:
+        service.stop()
+
+
+def test_input_tag_records_enabled_by_service_yaml():
+    metrics = _run_with_metrics(
+        "tag_records_enabled.yaml",
+        expected='fluentbit_input_logs_tag_records_total{name="tag_records_dummy"',
+    )
+
+    value = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_dummy",
+        tag="tag.records.one",
+    )
+    assert value == 1
+
+
+def test_input_tag_records_disabled_by_default():
+    metrics = _run_with_metrics("tag_records_disabled.yaml")
+
+    assert "fluentbit_input_logs_tag_records_total" not in metrics
+    assert "fluentbit_input_logs_tag_records_untracked_total" not in metrics
+
+
+def test_input_tag_records_respects_global_max_series():
+    metrics = _run_with_metrics(
+        "tag_records_max_series.yaml",
+        expected="fluentbit_input_logs_tag_records_untracked_total",
+    )
+
+    # With max_series=1 and two inputs each producing one tag, exactly one
+    # (name, tag) series must be tracked and exactly one must be rejected with
+    # the "max_series" reason. Which input wins the single slot depends on
+    # ingestion ordering, so assert the invariant rather than a specific input.
+    tracked = _metric_series(metrics, "fluentbit_input_logs_tag_records_total")
+    assert len(tracked) == 1
+
+    rejected = [
+        labels
+        for labels, value in _metric_series(
+            metrics, "fluentbit_input_logs_tag_records_untracked_total"
+        )
+        if labels.get("reason") == "max_series" and value >= 1
+    ]
+    assert len(rejected) == 1
+
+    # The rejected input must be the one that did NOT get the tracked slot.
+    tracked_name = tracked[0][0]["name"]
+    assert rejected[0]["name"] != tracked_name
+
+
+def test_input_tag_records_respects_tag_length_limit():
+    metrics = _run_with_metrics(
+        "tag_records_max_tag_length.yaml",
+        expected="fluentbit_input_logs_tag_records_untracked_total",
+    )
+
+    assert (
+        _metric_value(
+            metrics,
+            "fluentbit_input_logs_tag_records_untracked_total",
+            name="tag_records_long_tag",
+            reason="tag_length_limit",
+        )
+        == 1
+    )
+    assert "fluentbit_input_logs_tag_records_total" not in metrics
+
+
+def test_input_tag_records_can_be_disabled_per_input():
+    metrics = _run_with_metrics("tag_records_input_override.yaml")
+
+    assert "fluentbit_input_logs_tag_records_total" not in metrics
+    assert "fluentbit_input_logs_tag_records_untracked_total" not in metrics
+
+
+def test_input_tag_records_counter_accumulates():
+    # The counter is monotonic; with a continuously emitting input the value
+    # must climb past the first record, proving records are added (not reset).
+    metrics = _run_until_value(
+        "tag_records_accumulate.yaml",
+        "fluentbit_input_logs_tag_records_total",
+        lambda value: value >= 2,
+        name="tag_records_accumulate",
+        tag="tag.records.acc",
+    )
+
+    value = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_accumulate",
+        tag="tag.records.acc",
+    )
+    assert value >= 2
+
+
+def test_input_tag_records_tracks_multiple_tags():
+    metrics = _run_with_metrics(
+        "tag_records_multi_tag.yaml",
+        expected='tag="tag.records.beta"',
+    )
+
+    alpha = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_multi_a",
+        tag="tag.records.alpha",
+    )
+    beta = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_multi_b",
+        tag="tag.records.beta",
+    )
+
+    assert alpha is not None and alpha >= 1
+    assert beta is not None and beta >= 1
+
+    # Two distinct (name, tag) series must be tracked independently.
+    series = _metric_series(metrics, "fluentbit_input_logs_tag_records_total")
+    tags = {labels["tag"] for labels, _ in series}
+    assert {"tag.records.alpha", "tag.records.beta"} <= tags
+
+    # No series should have been rejected under the default limits.
+    assert "fluentbit_input_logs_tag_records_untracked_total" not in metrics
+
+
+def test_input_tag_records_can_be_enabled_per_input_only():
+    # Service-level telemetry is disabled (default). Only the input that opts
+    # in must export tag record metrics; the other input must export nothing.
+    metrics = _run_with_metrics(
+        "tag_records_input_enable.yaml",
+        expected='name="tag_records_enabled_input"',
+    )
+
+    enabled = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_enabled_input",
+        tag="tag.records.on",
+    )
+    assert enabled is not None and enabled >= 1
+
+    # The input that did not opt in must not appear in any tag record series.
+    for labels, _ in _metric_series(metrics, "fluentbit_input_logs_tag_records_total"):
+        assert labels.get("name") != "tag_records_default_input"
+    for labels, _ in _metric_series(
+        metrics, "fluentbit_input_logs_tag_records_untracked_total"
+    ):
+        assert labels.get("name") != "tag_records_default_input"
+
+
+def test_input_tag_records_nested_block_disables_input():
+    # An input can override the service setting using the SAME nested block
+    # style as the service section (telemetry: -> metrics: -> tag_records:).
+    # Service is enabled; the nested block disables one input only.
+    metrics = _run_with_metrics(
+        "tag_records_input_nested_disable.yaml",
+        expected='name="tag_records_nested_control"',
+    )
+
+    control = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_nested_control",
+        tag="tag.records.nested.on",
+    )
+    assert control is not None and control >= 1
+
+    # The nested-block-disabled input must not appear at all.
+    for labels, _ in _metric_series(metrics, "fluentbit_input_logs_tag_records_total"):
+        assert labels.get("name") != "tag_records_nested_off"
+    for labels, _ in _metric_series(
+        metrics, "fluentbit_input_logs_tag_records_untracked_total"
+    ):
+        assert labels.get("name") != "tag_records_nested_off"
+
+
+def test_input_tag_records_nested_block_enables_input():
+    # Service-level telemetry is off. An input enables it using the nested
+    # block form. This also guards against the previous regression where a
+    # nested block inside an input aborted startup.
+    metrics = _run_with_metrics(
+        "tag_records_input_nested_enable.yaml",
+        expected='name="tag_records_nested_enabled"',
+    )
+
+    enabled = _metric_value(
+        metrics,
+        "fluentbit_input_logs_tag_records_total",
+        name="tag_records_nested_enabled",
+        tag="tag.records.nested.enable",
+    )
+    assert enabled is not None and enabled >= 1
+
+    # The input that did not opt in must not be tracked.
+    for labels, _ in _metric_series(metrics, "fluentbit_input_logs_tag_records_total"):
+        assert labels.get("name") != "tag_records_nested_default"
diff --git a/tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records_badconfig.py b/tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records_badconfig.py
new file mode 100644
index 00000000000..63f9704ef12
--- /dev/null
+++ b/tests/integration/scenarios/internal_metrics/tests/test_internal_metrics_tag_records_badconfig.py
@@ -0,0 +1,160 @@
+"""
+Robustness tests for the logs tag-records telemetry configuration.
+
+A malformed configuration must never crash Fluent Bit (no SIGSEGV/SIGABRT/core
+dump). It must either reject the config with a clean non-zero exit, or start
+normally when the questionable value is leniently ignored. These tests run the
+binary directly because the configs are not expected to reach a healthy state.
+"""
+
+import os
+import signal
+import subprocess
+import tempfile
+
+import pytest
+
+from utils.fluent_bit_manager import _default_binary_path
+from utils.valgrind import assert_valgrind_clean
+
+
+BINARY = os.environ.get("FLUENT_BIT_BINARY") or _default_binary_path()
+CONFIG_DIR = os.path.abspath(
+    os.path.join(os.path.dirname(__file__), "..", "config")
+)
+
+# Honor the suite-wide valgrind switches. Because these tests run the binary
+# directly (the configs are not expected to reach a healthy state), we wrap it
+# with valgrind ourselves so VALGRIND runs also cover the bad-config paths.
+VALGRIND = bool(os.environ.get("VALGRIND"))
+VALGRIND_STRICT = bool(os.environ.get("VALGRIND_STRICT"))
+
+CRASH_SIGNALS = {
+    signal.SIGSEGV,
+    signal.SIGABRT,
+    signal.SIGILL,
+    signal.SIGFPE,
+    signal.SIGBUS,
+}
+
+# Configs that must be rejected cleanly (non-zero exit, no crash).
+REJECT_CONFIGS = [
+    # invalid scalar values
+    "bad_tag_records_enabled_value.yaml",  # enabled: notabool
+    "bad_tag_records_numeric_bool.yaml",  # enabled: 2 (numeric bool not 0/1)
+    "bad_tag_records_type.yaml",  # tag_records: [array]
+    "bad_tag_records_bool_form.yaml",  # tag_records: true (must be a map)
+    "bad_tag_records_max_series_value.yaml",  # max_series: notanumber (strict int)
+    # malformed / unknown shapes
+    "bad_tag_records_malformed_metrics.yaml",  # metrics: true (not a map)
+    "bad_tag_records_unknown_option.yaml",  # max_serie typo (tag_records key)
+    "bad_tag_records_unknown_parent_key.yaml",  # tag_record typo (parent map key)
+    "bad_tag_records_nested_unknown.yaml",  # unknown nested service block
+    "bad_tag_records_telemetry_scalar.yaml",  # telemetry: oops (not a map)
+    # dotted-key form is not supported (must be a nested block)
+    "bad_tag_records_dotted_service.yaml",  # service-level dotted key
+    "bad_tag_records_dotted_key.yaml",  # input-level dotted key
+    # input-level constraints
+    "bad_tag_records_input_value.yaml",  # input tag_records: maybe
+    "bad_tag_records_input_service_only.yaml",  # max_series on an input
+]
+
+# Valid-but-empty block: must start normally (feature disabled), not crash.
+LENIENT_CONFIGS = [
+    "lenient_tag_records_empty_block.yaml",
+]
+
+
+def _run(config_file, timeout):
+    """Run Fluent Bit on `config_file`, optionally under valgrind.
+
+    Returns (returncode, output, started, vg_log). `started` is True when the
+    process outlived the (valgrind-scaled) timeout and had to be stopped.
+    """
+    path = os.path.join(CONFIG_DIR, config_file)
+    cmd = [BINARY, "-c", path]
+    vg_log = None
+    if VALGRIND:
+        # Close the handle right away; valgrind is only given the path.
+        with tempfile.NamedTemporaryFile(
+            prefix="vg_badcfg_", suffix=".log", delete=False
+        ) as tmp:
+            vg_log = tmp.name
+        cmd = [
+            "valgrind",
+            f"--log-file={vg_log}",
+            "--leak-check=full",
+            "--show-leak-kinds=all",
+            *cmd,
+        ]
+        timeout *= 6  # valgrind is much slower
+
+    proc = subprocess.Popen(
+        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
+    )
+    try:
+        out = proc.communicate(timeout=timeout)[0]
+        started = False  # exited on its own
+    except subprocess.TimeoutExpired:
+        proc.terminate()
+        try:
+            out = proc.communicate(timeout=20)[0]
+        except subprocess.TimeoutExpired:
+            proc.kill()
+            out = proc.communicate()[0]
+        started = True  # was still running (started OK)
+    return proc.returncode, out, started, vg_log
+
+
+def _assert_not_crashed(returncode, output, started):
+    """Fail when the exit status or captured output indicates a crash."""
+    # A negative returncode means death by signal. SIGTERM/SIGKILL can come
+    # from our own stop sequence, but a fatal signal such as SIGSEGV is never
+    # sent by this harness, so it is a crash even while shutting down.
+    if returncode is not None and returncode < 0:
+        sig = -returncode
+        assert sig not in CRASH_SIGNALS, (
+            f"Fluent Bit crashed with signal {sig}\n{output}"
+        )
+        if not started:
+            pytest.fail(f"Fluent Bit was killed by signal {sig}\n{output}")
+    assert "core dumped" not in output.lower()
+    assert "addresssanitizer" not in output.lower()
+
+
+def _check_valgrind(vg_log):
+    """Apply the strict valgrind check when enabled, then delete the log."""
+    if not vg_log:
+        return
+    try:
+        if VALGRIND_STRICT:
+            assert_valgrind_clean(vg_log)
+    finally:
+        try:
+            os.unlink(vg_log)
+        except OSError:
+            pass
+
+
+@pytest.mark.parametrize("config_file", REJECT_CONFIGS)
+def test_bad_config_is_rejected_without_crash(config_file):
+    returncode, output, started, vg_log = _run(config_file, timeout=15)
+    _assert_not_crashed(returncode, output, started)
+    assert not started, f"{config_file} unexpectedly started instead of failing"
+    # Under valgrind the wrapper relays the child's exit code; a clean reject is
+    # still a non-zero exit.
+    assert returncode is not None and returncode != 0, (
+        f"{config_file} should exit non-zero, got {returncode}\n{output}"
+    )
+    _check_valgrind(vg_log)
+
+
+@pytest.mark.parametrize("config_file", LENIENT_CONFIGS)
+def test_lenient_config_starts_without_crash(config_file):
+    returncode, output, started, vg_log = _run(config_file, timeout=6)
+    _assert_not_crashed(returncode, output, started)
+    assert started, (
+        f"{config_file} should start (value ignored), but exited "
+        f"{returncode}\n{output}"
+    )
+    _check_valgrind(vg_log)