out_azure_blob: add log_key option #9791
base: master
Conversation
@edsiper Can you please give us an update?
memory leak test after rewrite:
Hi @tomekwilk @lockewritesdocs,
Signed-off-by: Tomasz Wilk <[email protected]>
Walkthrough
Adds optional log_key-based extraction from msgpack to send a single field value instead of full records, updates the formatting flow and function signature to route through this option, introduces config map entries and a struct field for log_key, and includes the record accessor headers. The default path remains JSON lines when log_key is unset.
Sequence Diagram(s)
sequenceDiagram
participant In as Input
participant AZB as AzureBlob Plugin
participant Fmt as Formatter
participant AZ as Azure Blob Service
In->>AZB: Flush event (msgpack, tag, bytes)
AZB->>Fmt: azure_blob_format(config, ins, ctx, flush_ctx, event_type, tag, data)
alt log_key configured
Fmt->>Fmt: Extract field via record accessor
note right of Fmt: Convert to string/number<br/>Error if missing/unsupported
else
Fmt->>Fmt: Format record(s) as JSON lines
end
Fmt-->>AZB: out_data, out_size
AZB->>AZ: Upload formatted payload
AZ-->>AZB: Response
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
I rebased the PR to resolve the merge conflicts after recent master changes. This PR is waiting to be re-reviewed and merged. Not sure if there is anything else for me to do.
Actionable comments posted: 2
♻️ Duplicate comments (2)
plugins/out_azure_blob/azure_blob.c (2)
34-35: Good: using Record Accessor APIs.
Including flb_record_accessor.h and flb_ra_key.h aligns with prior guidance to avoid manual map walking.
70-75: Call flb_errno() before flb_plg_error() on RA creation failure.
Swap the calls so errno is captured before logging.
Apply:
-    if (!ra) {
-        flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'", ctx->log_key);
-        flb_errno();
-        return NULL;
-    }
+    if (!ra) {
+        flb_errno();
+        flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'", ctx->log_key);
+        return NULL;
+    }
🧹 Nitpick comments (2)
plugins/out_azure_blob/azure_blob.c (2)
177-186: Safer behavior: fall back to JSON when extraction yields no output.
Avoid dropping data if log_key is missing/unsupported; gracefully fall back.
Apply:
-    if (ctx->log_key) {
-        out_buf = cb_azb_msgpack_extract_log_key(ctx, data, bytes);
-    }
-    else {
+    if (ctx->log_key) {
+        out_buf = cb_azb_msgpack_extract_log_key(ctx, data, bytes);
+        if (!out_buf) {
+            flb_plg_warn(ctx->ins, "log_key='%s' yielded no data; falling back to JSON lines", ctx->log_key);
+        }
+    }
+    if (!out_buf) {
         out_buf = flb_pack_msgpack_to_json_format(data, bytes,
                                                   FLB_PACK_JSON_FORMAT_LINES,
                                                   FLB_PACK_JSON_DATE_ISO8601,
                                                   ctx->date_key,
                                                   config->json_escape_unicode);
     }
1897-1904: Clarify that log_key uses Record Accessor syntax.
The config text says "key name," but the code uses a record accessor. Recommend noting RA path examples (e.g., log, kubernetes['labels']['app']) to set user expectations. Also document the newline-delimited output when multiple records are present. I can update the docs snippet accordingly if desired.
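For illustration, a minimal sketch of how such a configuration entry might look, assuming the option accepts an RA-style path as written (placeholder values, not taken from this PR):

    [OUTPUT]
        Name            azure_blob
        Match           *
        account_name    <account-name>
        shared_key      <shared-key>
        container_name  logs
        # plain top-level key
        log_key         log
        # or, per the note above, a nested record accessor path
        # log_key       kubernetes['labels']['app']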
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
plugins/out_azure_blob/azure_blob.c (6 hunks)
plugins/out_azure_blob/azure_blob.h (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/out_azure_blob/azure_blob.c (4)
src/flb_record_accessor.c (3)
  flb_ra_create (271-358)
  flb_ra_get_value_object (803-814)
  flb_ra_destroy (232-248)
src/flb_sds.c (4)
  flb_sds_create_size (92-95)
  flb_sds_copy (260-281)
  flb_sds_cat (120-141)
  flb_sds_printf (336-387)
src/flb_ra_key.c (1)
  flb_ra_key_value_destroy (842-851)
src/flb_pack.c (1)
  flb_pack_msgpack_to_json_format (1169-1450)
    /* Unpack the data */
    msgpack_unpacked_init(&result);
    while (1) {
        msgpack_unpack_return ret = msgpack_unpack_next(&result, data, bytes, &off);
        if (ret == MSGPACK_UNPACK_SUCCESS) {
            root = result.data;
            if (root.type != MSGPACK_OBJECT_ARRAY) {
                continue;
            }

            if (root.via.array.size < 2) {
                flb_plg_debug(ctx->ins, "msgpack array has insufficient elements");
                continue;
            }

            map = root.via.array.ptr[1];

            /* Get value using record accessor */
            rval = flb_ra_get_value_object(ra, map);
            if (!rval) {
                flb_plg_error(ctx->ins, "could not find field '%s'", ctx->log_key);
                continue;
            }

            /* Convert value based on its type */
            if (rval->type == FLB_RA_STRING) {
                out_buf = flb_sds_create_size(rval->o.via.str.size + 1);
                if (out_buf) {
                    flb_sds_copy(out_buf, rval->o.via.str.ptr, rval->o.via.str.size);
                    flb_sds_cat(out_buf, "\n", 1);
                }
            }
            else if (rval->type == FLB_RA_FLOAT) {
                out_buf = flb_sds_create_size(64);
                if (out_buf) {
                    flb_sds_printf(&out_buf, "%f\n", rval->val.f64);
                }
            }
            else if (rval->type == FLB_RA_INT) {
                out_buf = flb_sds_create_size(64);
                if (out_buf) {
                    flb_sds_printf(&out_buf, "%" PRId64 "\n", rval->val.i64);
                }
            }
            else {
                flb_errno();
                flb_plg_error(ctx->ins, "cannot convert given value for field '%s'", ctx->log_key);
                flb_ra_key_value_destroy(rval);
                rval = NULL;
                break;
            }

            /* Check if buffer allocation succeeded */
            if (!out_buf) {
                flb_errno();
                flb_plg_error(ctx->ins, "could not allocate output buffer");
            }

            flb_ra_key_value_destroy(rval);
            rval = NULL;

            /* Successfully found and processed log_key, exit loop */
            break;
        }
        else if (ret == MSGPACK_UNPACK_CONTINUE) {
            /* Buffer exhausted or truncated data, stop processing */
            flb_plg_debug(ctx->ins, "msgpack unpack needs more data or data truncated");
            break;
        }
        else if (ret == MSGPACK_UNPACK_PARSE_ERROR) {
            flb_errno();
            flb_plg_error(ctx->ins, "msgpack parse error");
            break;
        }
        else {
            flb_errno();
            flb_plg_error(ctx->ins, "unexpected msgpack unpack return code %d", ret);
            break;
        }
    }
Data loss: only the first record is extracted; iterate the entire chunk.
The loop breaks after the first successful extraction, so multi-record chunks upload a single line. Aggregate all matching values into newline-delimited payloads.
Apply this refactor using the log event decoder to iterate records:
- /* Unpack the data */
- msgpack_unpacked_init(&result);
- while (1) {
- msgpack_unpack_return ret = msgpack_unpack_next(&result, data, bytes, &off);
- if (ret == MSGPACK_UNPACK_SUCCESS) {
- root = result.data;
- if (root.type != MSGPACK_OBJECT_ARRAY) {
- continue;
- }
- if (root.via.array.size < 2) {
- flb_plg_debug(ctx->ins, "msgpack array has insufficient elements");
- continue;
- }
-
- map = root.via.array.ptr[1];
-
- /* Get value using record accessor */
- rval = flb_ra_get_value_object(ra, map);
- if (!rval) {
- flb_plg_error(ctx->ins, "could not find field '%s'", ctx->log_key);
- continue;
- }
-
- /* Convert value based on its type */
- if (rval->type == FLB_RA_STRING) {
- out_buf = flb_sds_create_size(rval->o.via.str.size + 1);
- if (out_buf) {
- flb_sds_copy(out_buf, rval->o.via.str.ptr, rval->o.via.str.size);
- flb_sds_cat(out_buf, "\n", 1);
- }
- }
- else if (rval->type == FLB_RA_FLOAT) {
- out_buf = flb_sds_create_size(64);
- if (out_buf) {
- flb_sds_printf(&out_buf, "%f\n", rval->val.f64);
- }
- }
- else if (rval->type == FLB_RA_INT) {
- out_buf = flb_sds_create_size(64);
- if (out_buf) {
- flb_sds_printf(&out_buf, "%" PRId64 "\n", rval->val.i64);
- }
- }
- else {
- flb_errno();
- flb_plg_error(ctx->ins, "cannot convert given value for field '%s'", ctx->log_key);
- flb_ra_key_value_destroy(rval);
- rval = NULL;
- break;
- }
-
- /* Check if buffer allocation succeeded */
- if (!out_buf) {
- flb_errno();
- flb_plg_error(ctx->ins, "could not allocate output buffer");
- }
-
- flb_ra_key_value_destroy(rval);
- rval = NULL;
-
- /* Successfully found and processed log_key, exit loop */
- break;
- }
- else if (ret == MSGPACK_UNPACK_CONTINUE) {
- /* Buffer exhausted or truncated data, stop processing */
- flb_plg_debug(ctx->ins, "msgpack unpack needs more data or data truncated");
- break;
- }
- else if (ret == MSGPACK_UNPACK_PARSE_ERROR) {
- flb_errno();
- flb_plg_error(ctx->ins, "msgpack parse error");
- break;
- }
- else {
- flb_errno();
- flb_plg_error(ctx->ins, "unexpected msgpack unpack return code %d", ret);
- break;
- }
- }
+ /* Decode all events and aggregate values */
+ {
+ struct flb_log_event_decoder log_decoder;
+ struct flb_log_event log_event;
+ int dret;
+
+ dret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
+ if (dret != FLB_EVENT_DECODER_SUCCESS) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "log event decoder init error: %d", dret);
+ }
+ else {
+ while (flb_log_event_decoder_next(&log_decoder, &log_event) == FLB_EVENT_DECODER_SUCCESS) {
+ map = *log_event.body;
+
+ rval = flb_ra_get_value_object(ra, map);
+ if (!rval) {
+ continue; /* field missing on this record */
+ }
+
+ if (rval->type == FLB_RA_STRING) {
+ /* allocate lazily */
+ if (!out_buf) {
+ out_buf = flb_sds_create_size(rval->o.via.str.size + 2);
+ if (!out_buf) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "could not allocate output buffer");
+ flb_ra_key_value_destroy(rval);
+ break;
+ }
+ }
+ out_buf = flb_sds_cat(out_buf, rval->o.via.str.ptr, rval->o.via.str.size);
+ out_buf = flb_sds_cat(out_buf, "\n", 1);
+ }
+ else if (rval->type == FLB_RA_FLOAT) {
+ if (!out_buf) {
+ out_buf = flb_sds_create_size(64);
+ if (!out_buf) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "could not allocate output buffer");
+ flb_ra_key_value_destroy(rval);
+ break;
+ }
+ }
+ if (!flb_sds_printf(&out_buf, "%f\n", rval->val.f64)) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "could not append float value");
+ flb_ra_key_value_destroy(rval);
+ break;
+ }
+ }
+ else if (rval->type == FLB_RA_INT) {
+ if (!out_buf) {
+ out_buf = flb_sds_create_size(64);
+ if (!out_buf) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "could not allocate output buffer");
+ flb_ra_key_value_destroy(rval);
+ break;
+ }
+ }
+ if (!flb_sds_printf(&out_buf, "%" PRId64 "\n", rval->val.i64)) {
+ flb_errno();
+ flb_plg_error(ctx->ins, "could not append int value");
+ flb_ra_key_value_destroy(rval);
+ break;
+ }
+ }
+ else {
+ /* skip unsupported types instead of failing the whole chunk */
+ flb_plg_debug(ctx->ins, "unsupported type for field '%s', skipping", ctx->log_key);
+ }
+
+ flb_ra_key_value_destroy(rval);
+ rval = NULL;
+ }
+
+ flb_log_event_decoder_destroy(&log_decoder);
+ }
+ }
Optional: cache the compiled RA at init time (ctx->ra_log_key) and reuse here to avoid per-chunk creation.
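A rough sketch of that caching idea, assuming a new ctx->ra_log_key field and the plugin's existing init/exit callbacks; names and placement are assumptions, not part of this PR:

    /* cb_azure_blob_init(): compile the accessor once (ctx->ra_log_key is an assumed new field) */
    if (ctx->log_key) {
        ctx->ra_log_key = flb_ra_create(ctx->log_key, FLB_FALSE);
        if (!ctx->ra_log_key) {
            flb_errno();
            flb_plg_error(ctx->ins, "invalid record accessor pattern '%s'", ctx->log_key);
            return -1;
        }
    }

    /* extraction path: reuse the cached accessor instead of creating one per chunk */
    rval = flb_ra_get_value_object(ctx->ra_log_key, map);

    /* cb_azure_blob_exit(): release it once */
    if (ctx->ra_log_key) {
        flb_ra_destroy(ctx->ra_log_key);
        ctx->ra_log_key = NULL;
    }

This removes the per-flush flb_ra_create/flb_ra_destroy pair and surfaces an invalid pattern at startup rather than at flush time.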
    flb_sds_t account_name;
    flb_sds_t container_name;
    flb_sds_t blob_type;
    flb_sds_t log_key;
🧩 Analysis chain
Add corresponding destroy for ctx->log_key in conf teardown.
Looks good. Please ensure azure_blob_conf_destroy() frees ctx->log_key to avoid leaks. If not present, add it.
Run to verify:
🏁 Script executed:
#!/bin/bash
# confirm create/destroy cover log_key
rg -nP 'flb_azure_blob_conf_(create|destroy)\s*\(' -C3
rg -nP '\blog_key\b' -C2 plugins/out_azure_blob
Length of output: 5196
🏁 Script executed:
sed -n '785,840p' plugins/out_azure_blob/azure_blob_conf.c
Length of output: 1140
Free ctx->log_key in flb_azure_blob_conf_destroy
Add

    if (ctx->log_key) {
        flb_sds_destroy(ctx->log_key);
        ctx->log_key = NULL;
    }

to the end of flb_azure_blob_conf_destroy() (before flb_free(ctx);) to avoid leaking the SDS field.
🤖 Prompt for AI Agents
In plugins/out_azure_blob/azure_blob.h around line 62, the struct field
flb_sds_t log_key is allocated but not freed in flb_azure_blob_conf_destroy;
update flb_azure_blob_conf_destroy to check if ctx->log_key is non-NULL, call
flb_sds_destroy(ctx->log_key), and set ctx->log_key to NULL before freeing ctx
to prevent an SDS memory leak.
Hello @edsiper, @adrinaula, this PR tackles an issue that we've also recently faced. Would be interested to contribute if need be :). Thanks in advance,
This PR is based on PR #3668 but addresses Azure Blob storage. The azure_blob plugin was modified to accept a 'log_key' option. By default the entire log record is sent to storage. When the 'log_key' option is specified in the output plugin configuration, only the value of that key is sent to the storage blob.
Addresses #9721
Enter [N/A] in the box, if an item is not applicable to your change.
Testing
Before we can approve your change, please submit the following in a comment:
Documentation
Doc PR fluent/fluent-bit-docs#1540
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
By default the entire record is sent to Azure Blob storage. Here is an example of a sample configuration and the default output.
Configuration
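A minimal sketch of such a configuration; the account, key, and container values are placeholders, and the dummy input mirrors the record shown below:

    [INPUT]
        Name    dummy
        Dummy   {"name": "Fluent Bit", "year": 2020}

    [OUTPUT]
        Name                   azure_blob
        Match                  *
        account_name           <account-name>
        shared_key             <shared-key>
        container_name         logs
        auto_create_container  on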
Record without log_key
{"@timestamp":"2025-01-02T16:56:02.906357Z","name":"Fluent Bit","year":2020}
If 'log_key' is specified, then only that key's value is sent to Azure Blob storage.
Sample configuration with log_key
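The same sketch as above (placeholder credentials), with only the log_key line added:

    [OUTPUT]
        Name                   azure_blob
        Match                  *
        account_name           <account-name>
        shared_key             <shared-key>
        container_name         logs
        auto_create_container  on
        log_key                name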
Record with log_key set to name
Fluent Bit
Example Valgrind output
Addresses #9721