diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index ccc611112d6..d390cb98d6a 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -943,6 +943,54 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi return -1; } + /* + * Create upstream context for Kusto Cluster endpoint (for streaming ingestion) + * Convert ingestion endpoint to cluster endpoint by removing "ingest-" prefix + */ + if (ctx->streaming_ingestion_enabled == FLB_TRUE) { + flb_sds_t cluster_endpoint = NULL; + + /* Check if ingestion endpoint contains "ingest-" prefix */ + if (strstr(ctx->ingestion_endpoint, "ingest-") != NULL) { + /* Create cluster endpoint by removing "ingest-" prefix */ + cluster_endpoint = flb_sds_create(ctx->ingestion_endpoint); + if (!cluster_endpoint) { + flb_plg_error(ctx->ins, "failed to create cluster endpoint string"); + return -1; + } + + /* Replace "ingest-" with empty string to get cluster endpoint */ + char *ingest_pos = strstr(cluster_endpoint, "ingest-"); + if (ingest_pos) { + /* Move the rest of the string to remove "ingest-" */ + memmove(ingest_pos, ingest_pos + 7, strlen(ingest_pos + 7) + 1); + flb_sds_len_set(cluster_endpoint, flb_sds_len(cluster_endpoint) - 7); + } + + flb_plg_info(ctx->ins, "Creating cluster upstream connection to: %s", cluster_endpoint); + + /* Create upstream connection to cluster endpoint */ + ctx->u_cluster = flb_upstream_create_url(config, cluster_endpoint, io_flags, ins->tls); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "cluster upstream creation failed for endpoint: %s", cluster_endpoint); + flb_sds_destroy(cluster_endpoint); + return -1; + } + + flb_sds_destroy(cluster_endpoint); + } else { + flb_plg_warn(ctx->ins, "ingestion endpoint does not contain 'ingest-' prefix, using as cluster endpoint"); + /* Use ingestion endpoint directly as cluster endpoint */ + ctx->u_cluster = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "cluster upstream creation failed"); + return -1; + } + } + + flb_plg_info(ctx->ins, "Cluster upstream connection created successfully for streaming ingestion"); + } + flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); /* Create oauth2 context */ @@ -1396,22 +1444,50 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk, } flb_plg_trace(ctx->ins, "payload size after compression %zu", final_payload_size); - /* Load or refresh ingestion resources */ - ret = azure_kusto_load_ingestion_resources(ctx, config); - flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot load ingestion resources"); - ret = FLB_RETRY; - goto error; - } + /* Check if streaming ingestion is enabled */ + if (ctx->streaming_ingestion_enabled == FLB_TRUE) { + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion mode enabled for tag: %s", event_chunk->tag); - /* Perform queued ingestion to Kusto */ - ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); - flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); - if (ret != 0) { - flb_plg_error(ctx->ins, "cannot perform queued ingestion"); - ret = FLB_RETRY; - goto error; + /* Check payload size limit for streaming ingestion (4MB) */ + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Checking payload size: %zu bytes against 4MB limit", final_payload_size); + if (final_payload_size > 4194304) { /* 4MB = 4 * 1024 * 1024 */ + flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Payload size %zu bytes exceeds 4MB limit for streaming ingestion", final_payload_size); + ret = FLB_ERROR; + goto error; + } + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Payload size check passed (%zu bytes < 4MB)", final_payload_size); + + /* Perform streaming ingestion to Kusto */ + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Initiating streaming ingestion to Kusto"); + ret = azure_kusto_streaming_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size); + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] Streaming ingestion completed with result: %d", ret); + + if (ret != 0) { + flb_plg_error(ctx->ins, "[FLUSH_STREAMING] ERROR: Streaming ingestion failed, will retry"); + ret = FLB_RETRY; + goto error; + } else { + flb_plg_info(ctx->ins, "[FLUSH_STREAMING] SUCCESS: Streaming ingestion completed successfully"); + } + } else { + flb_plg_debug(ctx->ins, "[FLUSH_QUEUED] Using queued ingestion mode (streaming ingestion disabled)"); + /* Load or refresh ingestion resources for queued ingestion */ + ret = azure_kusto_load_ingestion_resources(ctx, config); + flb_plg_trace(ctx->ins, "load_ingestion_resources: ret=%d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot load ingestion resources"); + ret = FLB_RETRY; + goto error; + } + + /* Perform queued ingestion to Kusto */ + ret = azure_kusto_queued_ingestion(ctx, event_chunk->tag, tag_len, final_payload, final_payload_size, NULL); + flb_plg_trace(ctx->ins, "after kusto queued ingestion %d", ret); + if (ret != 0) { + flb_plg_error(ctx->ins, "cannot perform queued ingestion"); + ret = FLB_RETRY; + goto error; + } } ret = FLB_OK; @@ -1501,6 +1577,11 @@ static int cb_azure_kusto_exit(void *data, struct flb_config *config) ctx->u = NULL; } + if (ctx->u_cluster) { + flb_upstream_destroy(ctx->u_cluster); + ctx->u_cluster = NULL; + } + pthread_mutex_destroy(&ctx->resources_mutex); pthread_mutex_destroy(&ctx->token_mutex); pthread_mutex_destroy(&ctx->blob_mutex); @@ -1565,6 +1646,11 @@ static struct flb_config_map config_map[] = { offsetof(struct flb_azure_kusto, compression_enabled), "Enable HTTP payload compression (gzip)." "The default is true."}, + {FLB_CONFIG_MAP_BOOL, "streaming_ingestion_enabled", "false", 0, FLB_TRUE, + offsetof(struct flb_azure_kusto, streaming_ingestion_enabled), + "Enable streaming ingestion. When enabled, data is sent directly to Kusto engine without using blob storage and queues. " + "Note: Streaming ingestion has a 4MB limit per request and doesn't support buffering." + "The default is false (uses queued ingestion)."}, {FLB_CONFIG_MAP_TIME, "ingestion_resources_refresh_interval", FLB_AZURE_KUSTO_RESOURCES_LOAD_INTERVAL_SEC,0, FLB_TRUE, offsetof(struct flb_azure_kusto, ingestion_resources_refresh_interval), "Set the azure kusto ingestion resources refresh interval" diff --git a/plugins/out_azure_kusto/azure_kusto.h b/plugins/out_azure_kusto/azure_kusto.h index 362b1379533..6b53eb23c4f 100644 --- a/plugins/out_azure_kusto/azure_kusto.h +++ b/plugins/out_azure_kusto/azure_kusto.h @@ -108,6 +108,9 @@ struct flb_azure_kusto { /* compress payload */ int compression_enabled; + /* streaming ingestion mode */ + int streaming_ingestion_enabled; + int ingestion_resources_refresh_interval; /* records configuration */ @@ -167,6 +170,9 @@ struct flb_azure_kusto { /* Upstream connection to the backend server */ struct flb_upstream *u; + /* Upstream connection to the main Kusto cluster for streaming ingestion */ + struct flb_upstream *u_cluster; + struct flb_upstream *imds_upstream; /* Fluent Bit context */ @@ -179,4 +185,4 @@ struct flb_azure_kusto { flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx); flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl); -#endif \ No newline at end of file +#endif diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index fa899eab686..ac471b7c3ec 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -796,6 +796,24 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance * return NULL; } + /* Validate mutual exclusivity between buffering and streaming ingestion */ + if (ctx->buffering_enabled && ctx->streaming_ingestion_enabled) { + flb_plg_error(ctx->ins, "buffering_enabled and streaming_ingestion_enabled cannot both be true. When buffering is enabled, streaming ingestion is automatically disabled"); + flb_azure_kusto_conf_destroy(ctx); + return NULL; + } + + /* Log ingestion mode selection */ + if (ctx->streaming_ingestion_enabled) { + flb_plg_info(ctx->ins, "streaming ingestion mode enabled - data will be sent directly to Kusto engine (4MB payload limit per request, no local buffering support)"); + } else { + if (ctx->buffering_enabled) { + flb_plg_info(ctx->ins, "queued ingestion mode enabled with local file buffering - data will be sent via blob storage and ingestion queues"); + } else { + flb_plg_info(ctx->ins, "queued ingestion mode enabled - data will be sent via blob storage and ingestion queues"); + } + } + /* Create oauth2 context */ if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM || ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) { diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.c b/plugins/out_azure_kusto/azure_kusto_ingest.c index 38d2aa076e5..656d427598e 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.c +++ b/plugins/out_azure_kusto/azure_kusto_ingest.c @@ -517,6 +517,147 @@ static int azure_kusto_enqueue_ingestion(struct flb_azure_kusto *ctx, flb_sds_t return ret; } +int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, + size_t tag_len, flb_sds_t payload, size_t payload_size) +{ + int ret = -1; + struct flb_connection *u_conn; + struct flb_http_client *c; + flb_sds_t uri = NULL; + flb_sds_t token = NULL; + size_t resp_size; + time_t now; + struct tm tm; + char tmp[64]; + int len; + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Starting for tag: %.*s, payload: %zu bytes, db: %s, table: %s, compression: %s", + (int)tag_len, tag, payload_size, ctx->database_name, ctx->table_name, ctx->compression_enabled ? "enabled" : "disabled"); + + now = time(NULL); + gmtime_r(&now, &tm); + len = strftime(tmp, sizeof(tmp) - 1, "%a, %d %b %Y %H:%M:%S GMT", &tm); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Request timestamp: %s", tmp); + + /* Get upstream connection to the main Kusto cluster endpoint (for streaming ingestion) */ + if (!ctx->u_cluster) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: cluster upstream not available - streaming ingestion requires cluster endpoint"); + return -1; + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Getting upstream connection to cluster endpoint"); + u_conn = flb_upstream_conn_get(ctx->u_cluster); + if (!u_conn) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to get cluster upstream connection"); + return -1; + } + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained upstream connection"); + + /* Get authentication token */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Retrieving OAuth2 authentication token"); + token = get_azure_kusto_token(ctx); + if (!token) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to retrieve OAuth2 token"); + flb_upstream_conn_release(u_conn); + return -1; + } + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Successfully obtained OAuth2 token (length: %zu)", flb_sds_len(token)); + + /* Build the streaming ingestion URI */ + uri = flb_sds_create_size(256); + if (!uri) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create URI buffer"); + flb_sds_destroy(token); + flb_upstream_conn_release(u_conn); + return -1; + } + + /* Create the streaming ingestion URI */ + if (ctx->ingestion_mapping_reference) { + flb_sds_snprintf(&uri, flb_sds_alloc(uri), + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON&mappingName=%s", + ctx->database_name, ctx->table_name, ctx->ingestion_mapping_reference); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Using ingestion mapping: %s", ctx->ingestion_mapping_reference); + } else { + flb_sds_snprintf(&uri, flb_sds_alloc(uri), + "/v1/rest/ingest/%s/%s?streamFormat=MultiJSON", + ctx->database_name, ctx->table_name); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] No ingestion mapping specified"); + } + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Request URI: %s", uri); + + /* Create HTTP client for streaming ingestion */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Creating HTTP client for POST request"); + c = flb_http_client(u_conn, FLB_HTTP_POST, uri, payload, payload_size, + NULL, 0, NULL, 0); + + if (c) { + /* Add required headers for streaming ingestion */ + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Accept", 6, "application/json", 16); + flb_http_add_header(c, "Authorization", 13, token, flb_sds_len(token)); + flb_http_add_header(c, "x-ms-date", 9, tmp, len); + flb_http_add_header(c, "x-ms-client-version", 19, FLB_VERSION_STR, strlen(FLB_VERSION_STR)); + flb_http_add_header(c, "x-ms-app", 8, "Kusto.Fluent-Bit", 16); + flb_http_add_header(c, "x-ms-user", 9, "Kusto.Fluent-Bit", 16); + + /* Set Content-Type based on whether compression is enabled */ + if (ctx->compression_enabled) { + flb_http_add_header(c, "Content-Type", 12, "application/json", 16); + flb_http_add_header(c, "Content-Encoding", 16, "gzip", 4); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for compressed payload"); + } else { + flb_http_add_header(c, "Content-Type", 12, "application/json; charset=utf-8", 31); + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Headers set for uncompressed payload"); + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Payload sample (first 200 chars): %.200s", (char*)payload); + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Sending HTTP POST request to Kusto engine"); + + /* Send the HTTP request */ + ret = flb_http_do(c, &resp_size); + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] HTTP request completed - http_do result: %d, HTTP Status: %i, Response size: %zu", ret, c->resp.status, resp_size); + + if (ret == 0) { + /* Check for successful HTTP status codes */ + if (c->resp.status == 200 || c->resp.status == 204) { + ret = 0; + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] SUCCESS: Data successfully ingested to Kusto (HTTP %d)", c->resp.status); + } else { + ret = -1; + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed with status: %i", c->resp.status); + + if (c->resp.payload_size > 0) { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] Error response body (size: %zu): %.*s", + c->resp.payload_size, (int)c->resp.payload_size, c->resp.payload); + } + } + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: HTTP request failed at transport level (ret=%d)", ret); + } + + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Destroying HTTP client"); + flb_http_client_destroy(c); + } else { + flb_plg_error(ctx->ins, "[STREAMING_INGESTION] ERROR: failed to create HTTP client context"); + } + + /* Cleanup */ + flb_plg_debug(ctx->ins, "[STREAMING_INGESTION] Cleaning up resources"); + if (uri) { + flb_sds_destroy(uri); + } + if (token) { + flb_sds_destroy(token); + } + flb_upstream_conn_release(u_conn); + + flb_plg_info(ctx->ins, "[STREAMING_INGESTION] Streaming ingestion completed with result: %d", ret); + return ret; +} + + /* Function to generate a random alphanumeric string */ void generate_random_string(char *str, size_t length) { @@ -658,4 +799,4 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, } return ret; -} \ No newline at end of file +} diff --git a/plugins/out_azure_kusto/azure_kusto_ingest.h b/plugins/out_azure_kusto/azure_kusto_ingest.h index b60796f4fd1..80a95105937 100644 --- a/plugins/out_azure_kusto/azure_kusto_ingest.h +++ b/plugins/out_azure_kusto/azure_kusto_ingest.h @@ -26,4 +26,7 @@ int azure_kusto_queued_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, size_t tag_len, flb_sds_t payload, size_t payload_size, struct azure_kusto_file *upload_file); -#endif \ No newline at end of file +int azure_kusto_streaming_ingestion(struct flb_azure_kusto *ctx, flb_sds_t tag, + size_t tag_len, flb_sds_t payload, size_t payload_size); + +#endif