From 9adde9ab632698988f237c7fa040264be9e8eed1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 10 Jul 2025 11:40:49 -0700 Subject: [PATCH 01/13] feat: add api for preparing demo data API GET /api/v1/demodata?action=ingest to ingest 10k records from the script GET /api/v1/demodata?action=filters to create saved sqls and saved filters from the script --- resources/filters_demo_data.sh | 280 +++++++++++++++++ resources/ingest_demo_data.sh | 377 +++++++++++++++++++++++ src/handlers/http/cluster/mod.rs | 58 +++- src/handlers/http/demo_data.rs | 136 ++++++++ src/handlers/http/ingest.rs | 6 + src/handlers/http/mod.rs | 1 + src/handlers/http/modal/ingest_server.rs | 3 +- src/handlers/http/modal/query_server.rs | 3 +- src/handlers/http/modal/server.rs | 8 +- 9 files changed, 868 insertions(+), 4 deletions(-) create mode 100755 resources/filters_demo_data.sh create mode 100755 resources/ingest_demo_data.sh create mode 100644 src/handlers/http/demo_data.rs diff --git a/resources/filters_demo_data.sh b/resources/filters_demo_data.sh new file mode 100755 index 000000000..594703d82 --- /dev/null +++ b/resources/filters_demo_data.sh @@ -0,0 +1,280 @@ +#!/usr/bin/env bash + +# Configuration with validation +P_URL=${P_URL:-"http://localhost:8000"} +P_USERNAME=${P_USERNAME:-"admin"} +P_PASSWORD=${P_PASSWORD:-"admin"} +P_STREAM=${P_STREAM:-"demodata"} + +# Silent mode handling +SILENT=${SILENT:-false} +for arg in "$@"; do + case $arg in + --silent) + SILENT=true + shift + ;; + -h|--help) + echo "Usage: $0 [--silent]" + echo " --silent Run in silent mode" + exit 0 + ;; + esac +done + +# Only show config if not silent +if [[ "$SILENT" != "true" ]]; then + echo "Configuration:" + echo " URL: $P_URL" + echo " Username: $P_USERNAME" + echo " Stream: $P_STREAM" + echo +fi + +# Pre-compute auth header +AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" + +# Common curl function with retry logic +curl_with_retry() { + local url="$1" + local method="$2" + local data="$3" + local content_type="${4:-application/json}" + local max_retries="${5:-3}" + local base_timeout="${6:-15}" + local retry_count=0 + + # Set timeout based on retry attempt: 10s, 20s, 30s + local max_time=$((10 + (retry_count * 10))) + local connect_timeout=5 + + # Create temp file if data is provided + if [[ -n "$data" ]]; then + temp_file=$(mktemp) + if [[ $? -ne 0 ]]; then + print_error "Failed to create temporary file" + return 1 + fi + echo "$data" > "$temp_file" + fi + + while [[ $retry_count -lt $max_retries ]]; do + # Current timeout: 10s, 20s, 30s for attempts 1, 2, 3 + max_time=$((10 + (retry_count * 10))) + + local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" + + # Add headers + curl_cmd+=" -H \"Content-Type: $content_type\"" + curl_cmd+=" -H \"$AUTH_HEADER\"" + + # Add stream header for ingestion requests + if [[ "$url" == *"/ingest"* ]]; then + curl_cmd+=" -H \"X-P-STREAM: $P_STREAM\"" + fi + + # Add method and data + if [[ "$method" == "POST" ]]; then + curl_cmd+=" -X POST" + if [[ -n "$temp_file" ]]; then + curl_cmd+=" --data-binary \"@$temp_file\"" + elif [[ -n "$data" ]]; then + curl_cmd+=" -d \"$data\"" + fi + fi + + # Add URL + curl_cmd+=" \"$url\"" + + # Execute curl + local response + response=$(eval "$curl_cmd" 2>&1) + local curl_exit_code=$? 
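+        # ($? is captured on the line above, immediately after eval: the [[ ]] tests below would otherwise overwrite it)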
+ + # Check curl exit code + if [[ $curl_exit_code -eq 0 ]]; then + # Success - extract status code and return + local status_code + if [[ -n "$response" ]]; then + status_code=$(echo "$response" | tail -n1) + local response_body=$(echo "$response" | sed '$d') + + # Clean up temp file + [[ -n "$temp_file" ]] && rm -f "$temp_file" + + if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then + return 0 + else + print_error "HTTP $status_code: $response_body" + return 1 + fi + else + print_error "No response from server" + return 1 + fi + elif [[ $curl_exit_code -eq 28 ]]; then + # Timeout - retry immediately with next timeout level + retry_count=$((retry_count + 1)) + + if [[ "$SILENT" != "true" && -n "$data" ]]; then + echo "Timeout (${#data} chars) - retry $retry_count with $((10 + (retry_count * 10)))s timeout" + elif [[ "$SILENT" != "true" ]]; then + echo "Timeout - retry $retry_count with $((10 + (retry_count * 10)))s timeout" + fi + + # Brief pause before retry + sleep 1 + else + # Other error - break and report + break + fi + done + + # Clean up temp file on failure + [[ -n "$temp_file" ]] && rm -f "$temp_file" + + # Final error reporting + print_error "curl failed with exit code $curl_exit_code after $retry_count retries" + if [[ -n "$data" ]]; then + print_error "Data size: ${#data} characters, Final timeout: ${max_time}s" + fi + [[ "$SILENT" != "true" ]] && print_error "Response: $response" + + return 1 +} + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +BLUE='\033[0;34m' +NC='\033[0m' + +print_info() { [[ "$SILENT" != "true" ]] && echo -e "${BLUE}[INFO]${NC} $1"; } +print_success() { [[ "$SILENT" != "true" ]] && echo -e "${GREEN}[SUCCESS]${NC} $1"; } +print_error() { echo -e "${RED}[ERROR]${NC} $1" >&2; } + +# Test connection before creating filters +if [[ "$SILENT" != "true" ]]; then + print_info "Testing connectivity..." + if curl_with_retry "$P_URL" "GET" "" "text/html" 1 5; then + print_info "Basic connectivity OK" + else + print_error "Cannot connect to $P_URL - check if server is running" + exit 1 + fi +fi + +# Create comprehensive SQL filters (10 filters) +create_sql_filters() { + print_info "Creating 10 SQL filters..." 
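+    # Each entry below is pipe-delimited (name|description|SQL query) and split with IFS='|' in the loop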
+ + sql_filters=( + "error_logs|Monitor all ERROR and FATAL severity events|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') ORDER BY time_unix_nano DESC LIMIT 100" + "high_response_time|Identify requests with extended response times|SELECT \"service.name\", \"url.path\", body FROM $P_STREAM WHERE body LIKE '%duration%' ORDER BY time_unix_nano DESC LIMIT 50" + "service_health_summary|Service health metrics by severity|SELECT \"service.name\", severity_text, COUNT(*) as count FROM $P_STREAM GROUP BY \"service.name\", severity_text ORDER BY count DESC" + "api_endpoint_performance|API endpoint request patterns|SELECT \"url.path\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"url.path\", \"service.name\" ORDER BY request_count DESC LIMIT 20" + "authentication_failures|Monitor auth-related warnings and errors|SELECT * FROM $P_STREAM WHERE \"url.path\" LIKE '%login%' AND severity_text IN ('WARN', 'ERROR') ORDER BY time_unix_nano DESC LIMIT 100" + "upstream_cluster_analysis|Request distribution across clusters|SELECT \"upstream.cluster\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"upstream.cluster\", \"service.name\" ORDER BY request_count DESC" + "trace_analysis|Multi-span traces for distributed tracking|SELECT trace_id, COUNT(*) as span_count, \"service.name\" FROM $P_STREAM GROUP BY trace_id, \"service.name\" HAVING span_count > 1 ORDER BY span_count DESC LIMIT 10" + "user_agent_distribution|Client types and user agent patterns|SELECT \"user_agent.original\", COUNT(*) as usage_count FROM $P_STREAM GROUP BY \"user_agent.original\" ORDER BY usage_count DESC LIMIT 15" + "source_address_analysis|Request distribution by source IP|SELECT \"source.address\", COUNT(*) as request_count, COUNT(DISTINCT \"service.name\") as services_accessed FROM $P_STREAM GROUP BY \"source.address\" ORDER BY request_count DESC LIMIT 20" + "severity_timeline|Severity trends over time|SELECT \"severity_text\", COUNT(*) as count, \"service.name\" FROM $P_STREAM GROUP BY \"severity_text\", \"service.name\" ORDER BY count DESC" + ) + + sql_success_count=0 + filter_number=1 + + for filter_config in "${sql_filters[@]}"; do + IFS='|' read -r name description query <<< "$filter_config" + + [[ "$SILENT" != "true" ]] && echo "Creating SQL filter $filter_number/10: $name" + + # Escape quotes for JSON + escaped_query=$(echo "$query" | sed 's/"/\\"/g') + escaped_desc=$(echo "$description" | sed 's/"/\\"/g') + + json="{\"stream_name\":\"sql\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"sql\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null}" + + # Add timeout and better error handling + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then + [[ "$SILENT" != "true" ]] && echo "✓ SQL Filter: $name" + sql_success_count=$((sql_success_count + 1)) + else + [[ "$SILENT" != "true" ]] && echo "✗ Failed after retries: $name" + fi + + # Small delay between requests to avoid overwhelming server + sleep 0.5 + filter_number=$((filter_number + 1)) + done + + [[ "$SILENT" != "true" ]] && print_success "Created $sql_success_count/10 SQL filters" + + # Wait a bit before creating saved filters + [[ "$SILENT" != "true" ]] && echo "Waiting 3 seconds before creating saved filters..." + sleep 3 +} + +# Create comprehensive saved filters (10 filters) +create_saved_filters() { + print_info "Creating 10 saved filters..." 
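+    # Each entry below is pipe-delimited: name|description|query|comma-separated visible columns|group-by column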
+ + saved_filters=( + "service_errors|Monitor service errors and failures|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path|service.name" + "auth_security_events|Authentication and authorization monitoring|SELECT * FROM $P_STREAM WHERE url.path LIKE '%login%' AND severity_text IN ('WARN', 'ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,source.address,user_agent.original|severity_text" + "high_latency_requests|High response time requests|SELECT * FROM $P_STREAM WHERE body LIKE '%duration%' LIMIT 500|Ingestion Time,Data,service.name,url.path,upstream.cluster,body|service.name" + "upstream_cluster_health|Upstream cluster performance|SELECT * FROM $P_STREAM WHERE upstream.cluster IS NOT NULL LIMIT 500|Ingestion Time,Data,upstream.cluster,service.name,severity_text,destination.address|upstream.cluster" + "api_endpoint_monitoring|API endpoint usage patterns|SELECT * FROM $P_STREAM WHERE url.path IS NOT NULL LIMIT 500|Ingestion Time,Data,url.path,service.name,severity_text,source.address|url.path" + "trace_correlation_view|Correlated traces for distributed tracking|SELECT * FROM $P_STREAM WHERE trace_id IS NOT NULL AND span_id IS NOT NULL LIMIT 500|Ingestion Time,Data,trace_id,span_id,service.name,url.path|trace_id" + "user_agent_analysis|Client types and patterns|SELECT * FROM $P_STREAM WHERE user_agent.original IS NOT NULL LIMIT 500|Ingestion Time,Data,user_agent.original,source.address,url.path,service.name|user_agent.original" + "network_monitoring|Network traffic and server interactions|SELECT * FROM $P_STREAM WHERE source.address IS NOT NULL LIMIT 500|Ingestion Time,Data,source.address,destination.address,service.name,severity_text,url.path|source.address" + "service_overview|Comprehensive service activity view|SELECT * FROM $P_STREAM LIMIT 500|Ingestion Time,Data,service.name,url.path,source.address,destination.address,upstream.cluster|service.name" + "recent_activity|Most recent system activity|SELECT * FROM $P_STREAM ORDER BY time_unix_nano DESC LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path,source.address|severity_text" + ) + + saved_success_count=0 + filter_number=1 + + for filter_config in "${saved_filters[@]}"; do + IFS='|' read -r name description query visible_columns group_by <<< "$filter_config" + + [[ "$SILENT" != "true" ]] && echo "Creating saved filter $filter_number/10: $name" + + # Escape quotes + escaped_query=$(echo "$query" | sed 's/"/\\"/g') + escaped_desc=$(echo "$description" | sed 's/"/\\"/g') + + # Convert visible columns to JSON array + IFS=',' read -ra col_array <<< "$visible_columns" + visible_cols_json="" + for i in "${!col_array[@]}"; do + [[ $i -gt 0 ]] && visible_cols_json+="," + visible_cols_json+="\"${col_array[$i]}\"" + done + + json="{\"stream_name\":\"$P_STREAM\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"filter\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null,\"tableConfig\":{\"visibleColumns\":[$visible_cols_json],\"pinnedColumns\":[]},\"groupBy\":\"$group_by\"}" + + # Add timeout and better error handling for saved filters + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then + [[ "$SILENT" != "true" ]] && echo "✓ Saved Filter: $name" + saved_success_count=$((saved_success_count + 1)) + else + [[ "$SILENT" != "true" ]] && echo "✗ Failed after retries: $name" + fi + + # Small delay between requests + sleep 0.5 + 
filter_number=$((filter_number + 1)) + done + + [[ "$SILENT" != "true" ]] && print_success "Created $saved_success_count/10 saved filters" +} + +# Create all filters +create_sql_filters +create_saved_filters + +print_success "Filter creation completed successfully!" + +# Always exit with success if we get here +exit 0 \ No newline at end of file diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh new file mode 100755 index 000000000..5fa7e8deb --- /dev/null +++ b/resources/ingest_demo_data.sh @@ -0,0 +1,377 @@ +#!/usr/bin/env bash + +# Configuration with validation +P_URL=${P_URL:-"http://localhost:8000"} +P_USERNAME=${P_USERNAME:-"admin"} +P_PASSWORD=${P_PASSWORD:-"admin"} +P_STREAM=${P_STREAM:-"demodata"} +TARGET_RECORDS=10000 +BATCH_SIZE=1000 # Back to 1000 for maximum throughput + +# Silent mode handling +SILENT=${SILENT:-false} +for arg in "$@"; do + case $arg in + --silent) + SILENT=true + shift + ;; + -h|--help) + echo "Usage: $0 [--silent]" + echo " --silent Run in silent mode" + exit 0 + ;; + esac +done + +# Only show config if not silent +if [[ "$SILENT" != "true" ]]; then + echo "Configuration:" + echo " URL: $P_URL" + echo " Username: $P_USERNAME" + echo " Stream: $P_STREAM" + echo " Target Records: $TARGET_RECORDS" + echo " Batch Size: $BATCH_SIZE" + echo +fi + +# Performance tracking +START_TIME=$(date +%s) +RECORDS_SENT=0 + +# Common curl function with retry logic +curl_with_retry() { + local url="$1" + local method="$2" + local data="$3" + local content_type="${4:-application/json}" + local max_retries="${5:-3}" + local base_timeout="${6:-15}" + local retry_count=0 + + # Set timeout based on retry attempt: 10s, 20s, 30s + local max_time=$((10 + (retry_count * 10))) + local connect_timeout=5 + + # Create temp file if data is provided + if [[ -n "$data" ]]; then + temp_file=$(mktemp) + if [[ $? -ne 0 ]]; then + print_error "Failed to create temporary file" + return 1 + fi + echo "$data" > "$temp_file" + fi + + while [[ $retry_count -lt $max_retries ]]; do + # Current timeout: 10s, 20s, 30s for attempts 1, 2, 3 + max_time=$((10 + (retry_count * 10))) + + local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" + + # Add headers + curl_cmd+=" -H \"Content-Type: $content_type\"" + curl_cmd+=" -H \"$AUTH_HEADER\"" + + # Add stream header for ingestion requests + if [[ "$url" == *"/ingest"* ]]; then + curl_cmd+=" -H \"X-P-STREAM: $P_STREAM\"" + fi + + # Add method and data + if [[ "$method" == "POST" ]]; then + curl_cmd+=" -X POST" + if [[ -n "$temp_file" ]]; then + curl_cmd+=" --data-binary \"@$temp_file\"" + elif [[ -n "$data" ]]; then + curl_cmd+=" -d \"$data\"" + fi + fi + + # Add URL + curl_cmd+=" \"$url\"" + + # Execute curl + local response + response=$(eval "$curl_cmd" 2>&1) + local curl_exit_code=$? 
+ + # Check curl exit code + if [[ $curl_exit_code -eq 0 ]]; then + # Success - extract status code and return + local status_code + if [[ -n "$response" ]]; then + status_code=$(echo "$response" | tail -n1) + local response_body=$(echo "$response" | sed '$d') + + # Clean up temp file + [[ -n "$temp_file" ]] && rm -f "$temp_file" + + if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then + return 0 + else + print_error "HTTP $status_code: $response_body" + return 1 + fi + else + print_error "No response from server" + return 1 + fi + elif [[ $curl_exit_code -eq 28 ]]; then + # Timeout - retry immediately with next timeout level + retry_count=$((retry_count + 1)) + + if [[ "$SILENT" != "true" && -n "$data" ]]; then + echo "Timeout (${#data} chars) - retry $retry_count with $((10 + (retry_count * 10)))s timeout" + elif [[ "$SILENT" != "true" ]]; then + echo "Timeout - retry $retry_count with $((10 + (retry_count * 10)))s timeout" + fi + + # Brief pause before retry + sleep 1 + else + # Other error - break and report + break + fi + done + + # Clean up temp file on failure + [[ -n "$temp_file" ]] && rm -f "$temp_file" + + # Final error reporting + print_error "curl failed with exit code $curl_exit_code after $retry_count retries" + if [[ -n "$data" ]]; then + print_error "Data size: ${#data} characters, Final timeout: ${max_time}s" + fi + [[ "$SILENT" != "true" ]] && print_error "Response: $response" + + return 1 +} + +# Colors +RED='\033[0;31m' +GREEN='\033[0;32m' +BLUE='\033[0;34m' +NC='\033[0m' + +print_info() { [[ "$SILENT" != "true" ]] && echo -e "${BLUE}[INFO]${NC} $1"; } +print_success() { [[ "$SILENT" != "true" ]] && echo -e "${GREEN}[SUCCESS]${NC} $1"; } +print_error() { echo -e "${RED}[ERROR]${NC} $1" >&2; } + +# Pre-compute ALL static data once (no random generation in loop) +[[ "$SILENT" != "true" ]] && echo "Initializing static data..." +TRACE_IDS=() +SPAN_IDS=() +IP_ADDRESSES=() +TIMESTAMPS=() +UNIX_NANOS=() + +# Generate 100 of each for cycling through +for i in {1..100}; do + TRACE_IDS+=("$(printf '%032x' $((RANDOM * RANDOM)))") + SPAN_IDS+=("$(printf '%016x' $((RANDOM * RANDOM)))") + IP_ADDRESSES+=("192.168.$((RANDOM % 256)).$((RANDOM % 256))") + TIMESTAMPS+=("$(date -u +%Y-%m-%dT%H:%M:%S.%03dZ -d "+$((RANDOM % 3600)) seconds")") + UNIX_NANOS+=("$(date +%s)$(printf '%09d' $((RANDOM % 1000000000)))") +done + +# Static arrays for ultra-fast selection +METHODS=("GET" "GET" "GET" "GET" "POST" "PUT") +STATUS_CODES=(200 200 200 201 400 500) +SERVICES=("frontend" "api" "auth" "cart" "payment") +ENDPOINTS=("/products" "/cart" "/login" "/checkout" "/search") +USER_AGENTS=("curl/7.88.1" "python-requests/2.32.3" "Mozilla/5.0") +CLUSTERS=("web" "api" "db") + +# Pre-compute auth header +AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" + +[[ "$SILENT" != "true" ]] && echo "Static data initialized. Starting generation..." + +# Ultra-fast log generation using array cycling +generate_batch() { + batch_size=$1 + + # Validate input + if [[ -z "$batch_size" || "$batch_size" -eq 0 ]]; then + print_error "generate_batch called with invalid batch_size: $batch_size" + return 1 + fi + + # Check if arrays are initialized + if [[ ${#TRACE_IDS[@]} -eq 0 ]]; then + print_error "Static data arrays not initialized" + return 1 + fi + + batch_data='[' + + for ((i=0; i BATCH_SIZE ? 
BATCH_SIZE : remaining)) + + # Progress tracking (only if not silent) + if [[ "$SILENT" != "true" ]]; then + progress=$((RECORDS_SENT * 100 / TARGET_RECORDS)) + elapsed=$(($(date +%s) - START_TIME)) + rate=$((RECORDS_SENT / (elapsed == 0 ? 1 : elapsed))) + echo "Progress: ${progress}% (${RECORDS_SENT}/${TARGET_RECORDS}) | Rate: ${rate} records/sec | Elapsed: ${elapsed}s" + fi + + # Generate and send batch with error checking + if [[ "$SILENT" != "true" ]]; then + echo "Generating batch of $current_batch_size records..." + fi + + batch_data=$(generate_batch $current_batch_size) + + if [[ -z "$batch_data" ]]; then + print_error "Failed to generate batch data" + exit 1 + fi + + if [[ "$SILENT" != "true" ]]; then + echo "Sending batch (${#batch_data} characters)..." + fi + + if send_batch "$batch_data"; then + RECORDS_SENT=$((RECORDS_SENT + current_batch_size)) + [[ "$SILENT" != "true" ]] && echo "✓ Batch sent successfully" + else + print_error "Failed to send batch at record $RECORDS_SENT" + exit 1 + fi + + # Small delay to prevent overwhelming the server + sleep 0.1 +done + +# Final statistics +TOTAL_TIME=$(($(date +%s) - START_TIME)) +FINAL_RATE=$((TARGET_RECORDS / (TOTAL_TIME == 0 ? 1 : TOTAL_TIME))) + +if [[ "$SILENT" != "true" ]]; then + echo -e "\n" + print_success "Completed $TARGET_RECORDS records in ${TOTAL_TIME} seconds" + print_success "Average rate: ${FINAL_RATE} records/second" + print_success "Total batches: $((TARGET_RECORDS / BATCH_SIZE))" +fi + +# Performance tips shown (only if not silent) +if [[ "$SILENT" != "true" ]]; then + echo + print_info "Performance optimizations used:" + echo " • Pre-computed all random data (no runtime generation)" + echo " • Large batch sizes (fewer HTTP requests)" + echo " • Array cycling instead of random selection" + echo " • Minimal JSON formatting" + echo " • curl connection keep-alive" + echo " • Single-process execution (no worker overhead)" +fi + +# Always exit with success if we get here +exit 0 \ No newline at end of file diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 6eab244bc..39cc4ded9 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -156,6 +156,62 @@ pub async fn sync_streams_with_ingestors( ).await } +// forward the demo data request to one of the live ingestor +pub async fn get_demo_data_from_ingestor(action: &str) -> Result<(), PostError> { + let ingestor_infos: Vec = + get_node_info(NodeType::Ingestor).await.map_err(|err| { + error!("Fatal: failed to get ingestor info: {:?}", err); + PostError::Invalid(err) + })?; + + let mut live_ingestors: Vec = Vec::new(); + for ingestor in ingestor_infos { + if utils::check_liveness(&ingestor.domain_name).await { + live_ingestors.push(ingestor); + break; + } + } + + if live_ingestors.is_empty() { + return Err(PostError::Invalid(anyhow::anyhow!( + "No live ingestors found" + ))); + } + + // Pick the first live ingestor + let ingestor = &live_ingestors[0]; + + let url = format!( + "{}{}/demodata?action={action}", + ingestor.domain_name, + base_path_without_preceding_slash() + ); + + let res = INTRA_CLUSTER_CLIENT + .get(url) + .header(header::AUTHORIZATION, &ingestor.token) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .map_err(|err| { + error!( + "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", + ingestor.domain_name, err + ); + PostError::Invalid(err.into()) + })?; + + if !res.status().is_success() { + return Err(PostError::Invalid(anyhow::anyhow!( + "failed to forward 
request to ingestor: {}\nResponse status: {}", + ingestor.domain_name, + res.status() + ))); + } + + Ok(()) +} + // forward the role update request to all ingestors to keep them in sync pub async fn sync_users_with_roles_with_ingestors( username: &str, @@ -919,7 +975,7 @@ where for result in results { match result { Ok(Some(node_metrics)) => metrics.push(node_metrics), - Ok(None) => {} // node was not live or metrics couldn't be fetched + Ok(_) => {} // node was not live or metrics couldn't be fetched Err(err) => return Err(err), } } diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs new file mode 100644 index 000000000..29242a52b --- /dev/null +++ b/src/handlers/http/demo_data.rs @@ -0,0 +1,136 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use crate::{ + handlers::http::{cluster::get_demo_data_from_ingestor, ingest::PostError}, + option::Mode, + parseable::PARSEABLE, +}; +use actix_web::{web, HttpRequest, HttpResponse}; +use std::{collections::HashMap, env, os::unix::fs::PermissionsExt, process::Command}; + +pub async fn get_demo_data(req: HttpRequest) -> Result { + let query_map = web::Query::>::from_query(req.query_string()) + .map_err(|_| PostError::InvalidQueryParameter)?; + + if query_map.is_empty() { + return Err(PostError::MissingQueryParameter); + } + + let action = query_map + .get("action") + .cloned() + .ok_or(PostError::MissingQueryParameter)?; + + let url = &PARSEABLE.options.address; + let username = &PARSEABLE.options.username; + let password = &PARSEABLE.options.password; + let scheme = PARSEABLE.options.get_scheme(); + let url = format!("{scheme}://{url}"); + + match action.as_str() { + "ingest" => match PARSEABLE.options.mode { + Mode::Ingest | Mode::All => { + let script_path = get_script_path(&action)?; + + // Fire the script execution asynchronously + tokio::spawn(async move { + if let Err(e) = + execute_demo_script(&script_path, &url, username, password).await + { + tracing::warn!("Failed to execute demo script: {}", e); + } + }); + + Ok(HttpResponse::Accepted().finish()) + } + Mode::Query | Mode::Prism => { + // Forward the request to ingestor asynchronously + tokio::spawn(async move { + if let Err(e) = get_demo_data_from_ingestor(&action).await { + tracing::warn!("Failed to forward request to ingestor: {}", e); + } + }); + + Ok(HttpResponse::Accepted().finish()) + } + _ => Err(PostError::Invalid(anyhow::anyhow!( + "Demo data is not available in this mode" + ))), + }, + "filters" => { + let script_path = get_script_path(&action)?; + + // Fire the script execution asynchronously + tokio::spawn(async move { + if let Err(e) = execute_demo_script(&script_path, &url, username, password).await { + tracing::warn!("Failed to execute demo script: {}", e); + } + }); + + Ok(HttpResponse::Accepted().finish()) + } + _ => Err(PostError::InvalidQueryParameter), + } +} + +async fn 
execute_demo_script( + script_path: &str, + url: &str, + username: &str, + password: &str, +) -> Result<(), anyhow::Error> { + // Ensure the script exists and has correct permissions set during deployment + let metadata = std::fs::metadata(script_path) + .map_err(|e| anyhow::anyhow!("Failed to read script metadata: {}", e))?; + + if metadata.permissions().mode() & 0o111 == 0 { + return Err(anyhow::anyhow!("Script is not executable: {}", script_path)); + } + // Execute the script with environment variables + if let Err(e) = Command::new("bash") + .arg(script_path) + .arg("--silent") + .env("P_URL", url) + .env("P_USERNAME", username) + .env("P_PASSWORD", password) + .output() + { + return Err(anyhow::anyhow!("Failed to execute script: {}", e)); + } + + Ok(()) +} + +fn get_script_path(action: &str) -> Result { + // Get the current working directory (where cargo run is executed from) + let current_dir = env::current_dir() + .map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?; + // Construct the path to the script based on the action + let path = match action { + "ingest" => "resources/ingest_demo_data.sh", + "filters" => "resources/filters_demo_data.sh", + _ => return Err(anyhow::anyhow!("Invalid action: {}", action)), + }; + let full_path = current_dir.join(path); + if full_path.exists() { + return Ok(full_path.to_string_lossy().to_string()); + } + + Err(anyhow::anyhow!("Script not found")) +} diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 34b832f6a..78a0f4525 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -493,6 +493,10 @@ pub enum PostError { IncorrectLogFormat(String), #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. 
We recommend creating a new dataset beyond {2} for better query performance.")] FieldsCountLimitExceeded(String, usize, usize), + #[error("Invalid query parameter")] + InvalidQueryParameter, + #[error("Missing query parameter")] + MissingQueryParameter, } impl actix_web::ResponseError for PostError { @@ -522,6 +526,8 @@ impl actix_web::ResponseError for PostError { PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST, + PostError::InvalidQueryParameter => StatusCode::BAD_REQUEST, + PostError::MissingQueryParameter => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 1d6c78246..f56b156e5 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -34,6 +34,7 @@ pub mod alerts; mod audit; pub mod cluster; pub mod correlation; +pub mod demo_data; pub mod health_check; pub mod ingest; mod kinesis; diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 1ecc5dff8..a0bf9ed99 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -78,7 +78,8 @@ impl ParseableServer for IngestServer { .service(Self::get_user_webscope()) .service(Self::get_user_role_webscope()) .service(Server::get_metrics_webscope()) - .service(Server::get_readiness_factory()), + .service(Server::get_readiness_factory()) + .service(Server::get_demo_data_webscope()), ) .service(Server::get_ingest_otel_factory().wrap(from_fn( resource_check::check_resource_utilization_middleware, diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 335e29896..78c5d5383 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -75,7 +75,8 @@ impl ParseableServer for QueryServer { .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) .service(Server::get_targets_webscope()) - .service(Self::get_cluster_web_scope()), + .service(Self::get_cluster_web_scope()) + .service(Server::get_demo_data_webscope()), ) .service( web::scope(&prism_base_path()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 615af1d8e..6de0c5413 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -23,6 +23,7 @@ use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::alerts; use crate::handlers::http::base_path; +use crate::handlers::http::demo_data::get_demo_data; use crate::handlers::http::health_check; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; @@ -97,7 +98,8 @@ impl ParseableServer for Server { ))) .service(Self::get_alerts_webscope()) .service(Self::get_targets_webscope()) - .service(Self::get_metrics_webscope()), + .service(Self::get_metrics_webscope()) + .service(Self::get_demo_data_webscope()), ) .service( web::scope(&prism_base_path()) @@ -201,6 +203,10 @@ impl Server { ) } + pub fn get_demo_data_webscope() -> Scope { + web::scope("/demodata").service(web::resource("").route(web::get().to(get_demo_data))) + } + pub fn get_metrics_webscope() -> Scope { web::scope("/metrics").service( web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)), From ca13841a08549ccc44c081009facbf10c17f045e Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 10 Jul 2025 23:33:40 -0700 Subject: [PATCH 02/13] update script and 
execution design --- Cargo.lock | 33 ++++-- Cargo.toml | 1 + resources/filters_demo_data.sh | 115 ++------------------ resources/ingest_demo_data.sh | 189 ++++----------------------------- src/handlers/http/demo_data.rs | 93 +++++++++------- 5 files changed, 109 insertions(+), 322 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5cc606f2a..79faa52de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,7 +1310,7 @@ dependencies = [ "crossterm_winapi", "mio", "parking_lot", - "rustix", + "rustix 0.38.44", "signal-hook", "signal-hook-mio", "winapi", @@ -2928,6 +2928,12 @@ version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.7.4" @@ -3515,6 +3521,7 @@ dependencies = [ "static-files", "sysinfo", "temp-dir", + "tempfile", "thiserror 2.0.11", "tokio", "tokio-stream", @@ -3730,7 +3737,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix", + "rustix 0.38.44", ] [[package]] @@ -4251,7 +4258,20 @@ dependencies = [ "bitflags 2.8.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + +[[package]] +name = "rustix" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +dependencies = [ + "bitflags 2.8.0", + "errno", + "libc", + "linux-raw-sys 0.9.4", "windows-sys 0.59.0", ] @@ -4848,15 +4868,14 @@ checksum = "bc1ee6eef34f12f765cb94725905c6312b6610ab2b0940889cfe58dae7bc3c72" [[package]] name = "tempfile" -version = "3.16.0" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38c246215d7d24f48ae091a2902398798e05d978b24315d6efbc00ede9a8bb91" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ - "cfg-if", "fastrand 2.3.0", "getrandom 0.3.1", "once_cell", - "rustix", + "rustix 1.0.7", "windows-sys 0.59.0", ] diff --git a/Cargo.toml b/Cargo.toml index aa2d2de11..191640b06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ thiserror = "2.0" ulid = { version = "1.0", features = ["serde"] } xxhash-rust = { version = "0.8", features = ["xxh3"] } futures-core = "0.3.31" +tempfile = "3.20.0" [build-dependencies] cargo_toml = "0.21" diff --git a/resources/filters_demo_data.sh b/resources/filters_demo_data.sh index 594703d82..98e1ba9f1 100755 --- a/resources/filters_demo_data.sh +++ b/resources/filters_demo_data.sh @@ -1,36 +1,11 @@ #!/usr/bin/env bash -# Configuration with validation +# Configuration P_URL=${P_URL:-"http://localhost:8000"} P_USERNAME=${P_USERNAME:-"admin"} P_PASSWORD=${P_PASSWORD:-"admin"} P_STREAM=${P_STREAM:-"demodata"} -# Silent mode handling -SILENT=${SILENT:-false} -for arg in "$@"; do - case $arg in - --silent) - SILENT=true - shift - ;; - -h|--help) - echo "Usage: $0 [--silent]" - echo " --silent Run in silent mode" - exit 0 - ;; - esac -done - -# Only show config if not silent -if [[ "$SILENT" != "true" ]]; then - echo "Configuration:" - echo " URL: $P_URL" - echo " Username: $P_USERNAME" - echo " Stream: $P_STREAM" - echo -fi - # Pre-compute auth header AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" @@ -41,26 +16,20 @@ curl_with_retry() { local 
data="$3" local content_type="${4:-application/json}" local max_retries="${5:-3}" - local base_timeout="${6:-15}" local retry_count=0 - # Set timeout based on retry attempt: 10s, 20s, 30s - local max_time=$((10 + (retry_count * 10))) - local connect_timeout=5 - # Create temp file if data is provided if [[ -n "$data" ]]; then temp_file=$(mktemp) if [[ $? -ne 0 ]]; then - print_error "Failed to create temporary file" return 1 fi echo "$data" > "$temp_file" fi while [[ $retry_count -lt $max_retries ]]; do - # Current timeout: 10s, 20s, 30s for attempts 1, 2, 3 - max_time=$((10 + (retry_count * 10))) + local max_time=$((10 + (retry_count * 10))) + local connect_timeout=5 local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" @@ -93,11 +62,9 @@ curl_with_retry() { # Check curl exit code if [[ $curl_exit_code -eq 0 ]]; then - # Success - extract status code and return local status_code if [[ -n "$response" ]]; then status_code=$(echo "$response" | tail -n1) - local response_body=$(echo "$response" | sed '$d') # Clean up temp file [[ -n "$temp_file" ]] && rm -f "$temp_file" @@ -105,27 +72,16 @@ curl_with_retry() { if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then return 0 else - print_error "HTTP $status_code: $response_body" return 1 fi else - print_error "No response from server" return 1 fi elif [[ $curl_exit_code -eq 28 ]]; then - # Timeout - retry immediately with next timeout level + # Timeout - retry retry_count=$((retry_count + 1)) - - if [[ "$SILENT" != "true" && -n "$data" ]]; then - echo "Timeout (${#data} chars) - retry $retry_count with $((10 + (retry_count * 10)))s timeout" - elif [[ "$SILENT" != "true" ]]; then - echo "Timeout - retry $retry_count with $((10 + (retry_count * 10)))s timeout" - fi - - # Brief pause before retry sleep 1 else - # Other error - break and report break fi done @@ -133,41 +89,11 @@ curl_with_retry() { # Clean up temp file on failure [[ -n "$temp_file" ]] && rm -f "$temp_file" - # Final error reporting - print_error "curl failed with exit code $curl_exit_code after $retry_count retries" - if [[ -n "$data" ]]; then - print_error "Data size: ${#data} characters, Final timeout: ${max_time}s" - fi - [[ "$SILENT" != "true" ]] && print_error "Response: $response" - return 1 } -# Colors -RED='\033[0;31m' -GREEN='\033[0;32m' -BLUE='\033[0;34m' -NC='\033[0m' - -print_info() { [[ "$SILENT" != "true" ]] && echo -e "${BLUE}[INFO]${NC} $1"; } -print_success() { [[ "$SILENT" != "true" ]] && echo -e "${GREEN}[SUCCESS]${NC} $1"; } -print_error() { echo -e "${RED}[ERROR]${NC} $1" >&2; } - -# Test connection before creating filters -if [[ "$SILENT" != "true" ]]; then - print_info "Testing connectivity..." - if curl_with_retry "$P_URL" "GET" "" "text/html" 1 5; then - print_info "Basic connectivity OK" - else - print_error "Cannot connect to $P_URL - check if server is running" - exit 1 - fi -fi - -# Create comprehensive SQL filters (10 filters) +# Create SQL filters create_sql_filters() { - print_info "Creating 10 SQL filters..." 
- sql_filters=( "error_logs|Monitor all ERROR and FATAL severity events|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') ORDER BY time_unix_nano DESC LIMIT 100" "high_response_time|Identify requests with extended response times|SELECT \"service.name\", \"url.path\", body FROM $P_STREAM WHERE body LIKE '%duration%' ORDER BY time_unix_nano DESC LIMIT 50" @@ -182,43 +108,28 @@ create_sql_filters() { ) sql_success_count=0 - filter_number=1 for filter_config in "${sql_filters[@]}"; do IFS='|' read -r name description query <<< "$filter_config" - [[ "$SILENT" != "true" ]] && echo "Creating SQL filter $filter_number/10: $name" - # Escape quotes for JSON escaped_query=$(echo "$query" | sed 's/"/\\"/g') escaped_desc=$(echo "$description" | sed 's/"/\\"/g') json="{\"stream_name\":\"sql\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"sql\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null}" - # Add timeout and better error handling if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then - [[ "$SILENT" != "true" ]] && echo "✓ SQL Filter: $name" sql_success_count=$((sql_success_count + 1)) - else - [[ "$SILENT" != "true" ]] && echo "✗ Failed after retries: $name" fi - # Small delay between requests to avoid overwhelming server sleep 0.5 - filter_number=$((filter_number + 1)) done - [[ "$SILENT" != "true" ]] && print_success "Created $sql_success_count/10 SQL filters" - - # Wait a bit before creating saved filters - [[ "$SILENT" != "true" ]] && echo "Waiting 3 seconds before creating saved filters..." sleep 3 } -# Create comprehensive saved filters (10 filters) +# Create saved filters create_saved_filters() { - print_info "Creating 10 saved filters..." - saved_filters=( "service_errors|Monitor service errors and failures|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path|service.name" "auth_security_events|Authentication and authorization monitoring|SELECT * FROM $P_STREAM WHERE url.path LIKE '%login%' AND severity_text IN ('WARN', 'ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,source.address,user_agent.original|severity_text" @@ -233,13 +144,10 @@ create_saved_filters() { ) saved_success_count=0 - filter_number=1 for filter_config in "${saved_filters[@]}"; do IFS='|' read -r name description query visible_columns group_by <<< "$filter_config" - [[ "$SILENT" != "true" ]] && echo "Creating saved filter $filter_number/10: $name" - # Escape quotes escaped_query=$(echo "$query" | sed 's/"/\\"/g') escaped_desc=$(echo "$description" | sed 's/"/\\"/g') @@ -254,27 +162,16 @@ create_saved_filters() { json="{\"stream_name\":\"$P_STREAM\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"filter\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null,\"tableConfig\":{\"visibleColumns\":[$visible_cols_json],\"pinnedColumns\":[]},\"groupBy\":\"$group_by\"}" - # Add timeout and better error handling for saved filters if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then - [[ "$SILENT" != "true" ]] && echo "✓ Saved Filter: $name" saved_success_count=$((saved_success_count + 1)) - else - [[ "$SILENT" != "true" ]] && echo "✗ Failed after retries: $name" fi - # Small delay between requests sleep 0.5 - filter_number=$((filter_number + 1)) done - - [[ "$SILENT" != "true" ]] && print_success "Created $saved_success_count/10 saved 
filters" } # Create all filters create_sql_filters create_saved_filters -print_success "Filter creation completed successfully!" - -# Always exit with success if we get here exit 0 \ No newline at end of file diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh index 5fa7e8deb..084754769 100755 --- a/resources/ingest_demo_data.sh +++ b/resources/ingest_demo_data.sh @@ -1,43 +1,15 @@ #!/usr/bin/env bash -# Configuration with validation +# Configuration P_URL=${P_URL:-"http://localhost:8000"} P_USERNAME=${P_USERNAME:-"admin"} P_PASSWORD=${P_PASSWORD:-"admin"} P_STREAM=${P_STREAM:-"demodata"} TARGET_RECORDS=10000 -BATCH_SIZE=1000 # Back to 1000 for maximum throughput +BATCH_SIZE=1000 -# Silent mode handling -SILENT=${SILENT:-false} -for arg in "$@"; do - case $arg in - --silent) - SILENT=true - shift - ;; - -h|--help) - echo "Usage: $0 [--silent]" - echo " --silent Run in silent mode" - exit 0 - ;; - esac -done - -# Only show config if not silent -if [[ "$SILENT" != "true" ]]; then - echo "Configuration:" - echo " URL: $P_URL" - echo " Username: $P_USERNAME" - echo " Stream: $P_STREAM" - echo " Target Records: $TARGET_RECORDS" - echo " Batch Size: $BATCH_SIZE" - echo -fi - -# Performance tracking -START_TIME=$(date +%s) -RECORDS_SENT=0 +# Pre-compute auth header +AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" # Common curl function with retry logic curl_with_retry() { @@ -46,26 +18,20 @@ curl_with_retry() { local data="$3" local content_type="${4:-application/json}" local max_retries="${5:-3}" - local base_timeout="${6:-15}" local retry_count=0 - # Set timeout based on retry attempt: 10s, 20s, 30s - local max_time=$((10 + (retry_count * 10))) - local connect_timeout=5 - # Create temp file if data is provided if [[ -n "$data" ]]; then temp_file=$(mktemp) if [[ $? 
-ne 0 ]]; then - print_error "Failed to create temporary file" return 1 fi echo "$data" > "$temp_file" fi while [[ $retry_count -lt $max_retries ]]; do - # Current timeout: 10s, 20s, 30s for attempts 1, 2, 3 - max_time=$((10 + (retry_count * 10))) + local max_time=$((10 + (retry_count * 10))) + local connect_timeout=5 local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" @@ -98,11 +64,9 @@ curl_with_retry() { # Check curl exit code if [[ $curl_exit_code -eq 0 ]]; then - # Success - extract status code and return local status_code if [[ -n "$response" ]]; then status_code=$(echo "$response" | tail -n1) - local response_body=$(echo "$response" | sed '$d') # Clean up temp file [[ -n "$temp_file" ]] && rm -f "$temp_file" @@ -110,27 +74,16 @@ curl_with_retry() { if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then return 0 else - print_error "HTTP $status_code: $response_body" return 1 fi else - print_error "No response from server" return 1 fi elif [[ $curl_exit_code -eq 28 ]]; then - # Timeout - retry immediately with next timeout level + # Timeout - retry retry_count=$((retry_count + 1)) - - if [[ "$SILENT" != "true" && -n "$data" ]]; then - echo "Timeout (${#data} chars) - retry $retry_count with $((10 + (retry_count * 10)))s timeout" - elif [[ "$SILENT" != "true" ]]; then - echo "Timeout - retry $retry_count with $((10 + (retry_count * 10)))s timeout" - fi - - # Brief pause before retry sleep 1 else - # Other error - break and report break fi done @@ -138,28 +91,10 @@ curl_with_retry() { # Clean up temp file on failure [[ -n "$temp_file" ]] && rm -f "$temp_file" - # Final error reporting - print_error "curl failed with exit code $curl_exit_code after $retry_count retries" - if [[ -n "$data" ]]; then - print_error "Data size: ${#data} characters, Final timeout: ${max_time}s" - fi - [[ "$SILENT" != "true" ]] && print_error "Response: $response" - return 1 } -# Colors -RED='\033[0;31m' -GREEN='\033[0;32m' -BLUE='\033[0;34m' -NC='\033[0m' - -print_info() { [[ "$SILENT" != "true" ]] && echo -e "${BLUE}[INFO]${NC} $1"; } -print_success() { [[ "$SILENT" != "true" ]] && echo -e "${GREEN}[SUCCESS]${NC} $1"; } -print_error() { echo -e "${RED}[ERROR]${NC} $1" >&2; } - -# Pre-compute ALL static data once (no random generation in loop) -[[ "$SILENT" != "true" ]] && echo "Initializing static data..." +# Pre-compute static data TRACE_IDS=() SPAN_IDS=() IP_ADDRESSES=() @@ -175,7 +110,7 @@ for i in {1..100}; do UNIX_NANOS+=("$(date +%s)$(printf '%09d' $((RANDOM % 1000000000)))") done -# Static arrays for ultra-fast selection +# Static arrays METHODS=("GET" "GET" "GET" "GET" "POST" "PUT") STATUS_CODES=(200 200 200 201 400 500) SERVICES=("frontend" "api" "auth" "cart" "payment") @@ -183,24 +118,15 @@ ENDPOINTS=("/products" "/cart" "/login" "/checkout" "/search") USER_AGENTS=("curl/7.88.1" "python-requests/2.32.3" "Mozilla/5.0") CLUSTERS=("web" "api" "db") -# Pre-compute auth header -AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" - -[[ "$SILENT" != "true" ]] && echo "Static data initialized. Starting generation..." 
- -# Ultra-fast log generation using array cycling +# Generate batch data generate_batch() { batch_size=$1 - # Validate input if [[ -z "$batch_size" || "$batch_size" -eq 0 ]]; then - print_error "generate_batch called with invalid batch_size: $batch_size" return 1 fi - # Check if arrays are initialized if [[ ${#TRACE_IDS[@]} -eq 0 ]]; then - print_error "Static data arrays not initialized" return 1 fi @@ -216,7 +142,7 @@ generate_batch() { agent_idx=$((i % 3)) cluster_idx=$((i % 3)) - # Direct array access (no random calls in loop) + # Direct array access trace_id=${TRACE_IDS[$idx]} span_id=${SPAN_IDS[$idx]} source_ip=${IP_ADDRESSES[$idx]} @@ -241,112 +167,61 @@ generate_batch() { severity_text="ERROR" fi - # Escape user agent for JSON (simple approach) + # Escape user agent for JSON escaped_user_agent=$(echo "$user_agent" | sed 's/"/\\"/g' | sed "s/'/\\'/g") - # Single-line JSON generation with proper field structure + # Generate JSON record batch_data+="{\"body\":\"[$timestamp] $method $endpoint HTTP/1.1 $status - bytes:$((500 + i % 1000)) duration:$((10 + i % 90))ms\",\"time_unix_nano\":\"$unix_nano\",\"observed_time_unix_nano\":\"$unix_nano\",\"trace_id\":\"$trace_id\",\"span_id\":\"$span_id\",\"flags\":0,\"severity_number\":$severity_num,\"severity_text\":\"$severity_text\",\"service.name\":\"$service\",\"source.address\":\"$source_ip\",\"destination.address\":\"$dest_ip\",\"server.address\":\"$dest_ip\",\"url.path\":\"$endpoint\",\"url.full\":\"http://$service:8080$endpoint\",\"upstream.cluster\":\"$cluster\",\"user_agent.original\":\"$escaped_user_agent\",\"event.name\":\"proxy.access\"}" - # Add comma except for last item [[ $i -lt $((batch_size - 1)) ]] && batch_data+=',' done batch_data+=']' - # Validate JSON structure if [[ ${#batch_data} -lt 10 ]]; then - print_error "Generated batch too small: ${#batch_data} characters" return 1 fi echo "$batch_data" } -# Send batch with minimal overhead +# Send batch send_batch() { local data="$1" - # Validate input if [[ -z "$data" ]]; then - print_error "send_batch called with empty data" return 1 fi - # Use common curl function with retry logic curl_with_retry "$P_URL/api/v1/ingest" "POST" "$data" "application/json" 3 15 } -# Main execution loop - optimized for speed -[[ "$SILENT" != "true" ]] && echo "Starting batch generation and sending..." - -# Test connection and basic functionality first -if [[ "$SILENT" != "true" ]]; then - print_info "Testing basic connectivity..." - if curl_with_retry "$P_URL" "GET" "" "text/html" 1 5; then - print_info "Basic connectivity OK" - else - print_error "Cannot connect to $P_URL - check if server is running" - exit 1 - fi -fi - -# Generate a small test batch first to validate everything works -if [[ "$SILENT" != "true" ]]; then - print_info "Testing batch generation..." - test_batch=$(generate_batch 1) - if [[ -z "$test_batch" || ${#test_batch} -lt 50 ]]; then - print_error "Batch generation failed - generated: ${#test_batch} characters" - exit 1 - fi - print_info "Batch generation OK (${#test_batch} characters)" - - print_info "Testing first ingestion..." - if ! send_batch "$test_batch"; then - print_error "Test ingestion failed - check credentials and stream name" - exit 1 - fi - print_info "Test ingestion successful" - RECORDS_SENT=1 -fi +# Main execution +START_TIME=$(date +%s) +RECORDS_SENT=0 while [[ $RECORDS_SENT -lt $TARGET_RECORDS ]]; do - # Calculate remaining records remaining=$((TARGET_RECORDS - RECORDS_SENT)) current_batch_size=$((remaining > BATCH_SIZE ? 
BATCH_SIZE : remaining)) - # Progress tracking (only if not silent) - if [[ "$SILENT" != "true" ]]; then - progress=$((RECORDS_SENT * 100 / TARGET_RECORDS)) - elapsed=$(($(date +%s) - START_TIME)) - rate=$((RECORDS_SENT / (elapsed == 0 ? 1 : elapsed))) - echo "Progress: ${progress}% (${RECORDS_SENT}/${TARGET_RECORDS}) | Rate: ${rate} records/sec | Elapsed: ${elapsed}s" - fi - - # Generate and send batch with error checking - if [[ "$SILENT" != "true" ]]; then - echo "Generating batch of $current_batch_size records..." - fi + # Progress tracking + progress=$((RECORDS_SENT * 100 / TARGET_RECORDS)) + elapsed=$(($(date +%s) - START_TIME)) + rate=$((RECORDS_SENT / (elapsed == 0 ? 1 : elapsed))) + # Generate and send batch batch_data=$(generate_batch $current_batch_size) if [[ -z "$batch_data" ]]; then - print_error "Failed to generate batch data" exit 1 fi - if [[ "$SILENT" != "true" ]]; then - echo "Sending batch (${#batch_data} characters)..." - fi - if send_batch "$batch_data"; then RECORDS_SENT=$((RECORDS_SENT + current_batch_size)) - [[ "$SILENT" != "true" ]] && echo "✓ Batch sent successfully" else - print_error "Failed to send batch at record $RECORDS_SENT" exit 1 fi - # Small delay to prevent overwhelming the server sleep 0.1 done @@ -354,24 +229,4 @@ done TOTAL_TIME=$(($(date +%s) - START_TIME)) FINAL_RATE=$((TARGET_RECORDS / (TOTAL_TIME == 0 ? 1 : TOTAL_TIME))) -if [[ "$SILENT" != "true" ]]; then - echo -e "\n" - print_success "Completed $TARGET_RECORDS records in ${TOTAL_TIME} seconds" - print_success "Average rate: ${FINAL_RATE} records/second" - print_success "Total batches: $((TARGET_RECORDS / BATCH_SIZE))" -fi - -# Performance tips shown (only if not silent) -if [[ "$SILENT" != "true" ]]; then - echo - print_info "Performance optimizations used:" - echo " • Pre-computed all random data (no runtime generation)" - echo " • Large batch sizes (fewer HTTP requests)" - echo " • Array cycling instead of random selection" - echo " • Minimal JSON formatting" - echo " • curl connection keep-alive" - echo " • Single-process execution (no worker overhead)" -fi - -# Always exit with success if we get here exit 0 \ No newline at end of file diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs index 29242a52b..d7e90598f 100644 --- a/src/handlers/http/demo_data.rs +++ b/src/handlers/http/demo_data.rs @@ -22,8 +22,14 @@ use crate::{ parseable::PARSEABLE, }; use actix_web::{web, HttpRequest, HttpResponse}; -use std::{collections::HashMap, env, os::unix::fs::PermissionsExt, process::Command}; +use std::{collections::HashMap, fs, process::Command}; +#[cfg(unix)] +use std::os::unix::fs::PermissionsExt; + +// Embed the scripts at compile time +const INGEST_SCRIPT: &str = include_str!("../../../resources/ingest_demo_data.sh"); +const FILTER_SCRIPT: &str = include_str!("../../../resources/filters_demo_data.sh"); pub async fn get_demo_data(req: HttpRequest) -> Result { let query_map = web::Query::>::from_query(req.query_string()) .map_err(|_| PostError::InvalidQueryParameter)?; @@ -46,13 +52,9 @@ pub async fn get_demo_data(req: HttpRequest) -> Result match action.as_str() { "ingest" => match PARSEABLE.options.mode { Mode::Ingest | Mode::All => { - let script_path = get_script_path(&action)?; - // Fire the script execution asynchronously tokio::spawn(async move { - if let Err(e) = - execute_demo_script(&script_path, &url, username, password).await - { + if let Err(e) = execute_demo_script(&action, &url, username, password).await { tracing::warn!("Failed to execute demo script: {}", 
e); } }); @@ -74,11 +76,9 @@ pub async fn get_demo_data(req: HttpRequest) -> Result ))), }, "filters" => { - let script_path = get_script_path(&action)?; - // Fire the script execution asynchronously tokio::spawn(async move { - if let Err(e) = execute_demo_script(&script_path, &url, username, password).await { + if let Err(e) = execute_demo_script(&action, &url, username, password).await { tracing::warn!("Failed to execute demo script: {}", e); } }); @@ -90,47 +90,62 @@ pub async fn get_demo_data(req: HttpRequest) -> Result } async fn execute_demo_script( - script_path: &str, + action: &str, url: &str, username: &str, password: &str, ) -> Result<(), anyhow::Error> { - // Ensure the script exists and has correct permissions set during deployment - let metadata = std::fs::metadata(script_path) - .map_err(|e| anyhow::anyhow!("Failed to read script metadata: {}", e))?; + // Create a temporary file to write the script + let temp_file = tempfile::NamedTempFile::new() + .map_err(|e| anyhow::anyhow!("Failed to create temporary file: {}", e))?; + + let temp_path = temp_file.path(); + let script_content = match action { + "ingest" => INGEST_SCRIPT, + "filters" => FILTER_SCRIPT, + _ => return Err(anyhow::anyhow!("Unsupported action: {}", action)), + }; + // Write the script content to the temporary file + fs::write(temp_path, script_content) + .map_err(|e| anyhow::anyhow!("Failed to write script to temp file: {}", e))?; - if metadata.permissions().mode() & 0o111 == 0 { - return Err(anyhow::anyhow!("Script is not executable: {}", script_path)); + // Make the temporary file executable (Unix only) + #[cfg(unix)] + { + let mut permissions = fs::metadata(temp_path) + .map_err(|e| anyhow::anyhow!("Failed to read temp file metadata: {}", e))? + .permissions(); + permissions.set_mode(0o755); + fs::set_permissions(temp_path, permissions) + .map_err(|e| anyhow::anyhow!("Failed to set temp file permissions: {}", e))?; } - // Execute the script with environment variables - if let Err(e) = Command::new("bash") - .arg(script_path) - .arg("--silent") + + let output = Command::new("bash") + .arg(temp_path) .env("P_URL", url) .env("P_USERNAME", username) .env("P_PASSWORD", password) + .env("DEMO_ACTION", action) .output() - { - return Err(anyhow::anyhow!("Failed to execute script: {}", e)); + .map_err(|e| { + anyhow::anyhow!( + "Failed to execute script: {}. Make sure bash is available.", + e + ) + })?; + + drop(temp_file); + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + let stdout = String::from_utf8_lossy(&output.stdout); + return Err(anyhow::anyhow!( + "Script execution failed. 
Exit code: {:?}, stdout: {}, stderr: {}", + output.status.code(), + stdout, + stderr + )); } Ok(()) } - -fn get_script_path(action: &str) -> Result { - // Get the current working directory (where cargo run is executed from) - let current_dir = env::current_dir() - .map_err(|e| anyhow::anyhow!("Failed to get current directory: {}", e))?; - // Construct the path to the script based on the action - let path = match action { - "ingest" => "resources/ingest_demo_data.sh", - "filters" => "resources/filters_demo_data.sh", - _ => return Err(anyhow::anyhow!("Invalid action: {}", action)), - }; - let full_path = current_dir.join(path); - if full_path.exists() { - return Ok(full_path.to_string_lossy().to_string()); - } - - Err(anyhow::anyhow!("Script not found")) -} From 298208667b23e1fe84efec2769b5719c31c455d4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 11 Jul 2025 08:17:25 -0700 Subject: [PATCH 03/13] add alerts to demo --- resources/alerts_demo_data.sh | 206 +++++++++++++++++++++++++++++++++ src/handlers/http/demo_data.rs | 5 +- 2 files changed, 210 insertions(+), 1 deletion(-) create mode 100755 resources/alerts_demo_data.sh diff --git a/resources/alerts_demo_data.sh b/resources/alerts_demo_data.sh new file mode 100755 index 000000000..13bad7d2e --- /dev/null +++ b/resources/alerts_demo_data.sh @@ -0,0 +1,206 @@ +#!/usr/bin/env bash + +# Configuration +P_URL=${P_URL:-"http://localhost:8000"} +P_USERNAME=${P_USERNAME:-"admin"} +P_PASSWORD=${P_PASSWORD:-"admin"} +P_STREAM=${P_STREAM:-"demodata"} + +# Pre-compute auth header +AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" + +# Common curl function with retry logic +curl_with_retry() { + local url="$1" + local method="$2" + local data="$3" + local content_type="${4:-application/json}" + local max_retries="${5:-3}" + local retry_count=0 + local data_file="$7" + + # Create temp file if data is provided (either as string or file) + if [[ -n "$data_file" ]]; then + temp_file="$data_file" + elif [[ -n "$data" ]]; then + temp_file=$(mktemp) + if [[ $? -ne 0 ]]; then + return 1 + fi + printf "%s" "$data" > "$temp_file" + fi + + while [[ $retry_count -lt $max_retries ]]; do + local max_time=$((10 + (retry_count * 10))) + local connect_timeout=5 + + local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" + + # Add headers + curl_cmd+=" -H \"Content-Type: $content_type\"" + curl_cmd+=" -H \"$AUTH_HEADER\"" + + # Add method and data + if [[ "$method" == "POST" ]]; then + curl_cmd+=" -X POST" + if [[ -n "$temp_file" ]]; then + curl_cmd+=" --data-binary \"@$temp_file\"" + elif [[ -n "$data" ]]; then + curl_cmd+=" -d \"$data\"" + fi + fi + + # Add URL + curl_cmd+=" \"$url\"" + + # Execute curl + local response + response=$(eval "$curl_cmd" 2>&1) + local curl_exit_code=$? 
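+        # eval re-parses curl_cmd, which was assembled as a single quoted string; 2>&1 folds stderr into the captured response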
+ + # Check curl exit code + if [[ $curl_exit_code -eq 0 ]]; then + local status_code + if [[ -n "$response" ]]; then + status_code=$(echo "$response" | tail -n1) + local response_body=$(echo "$response" | sed '$d') + + # Clean up temp file (only if we created it) + if [[ -n "$temp_file" && -z "$data_file" ]]; then + rm -f "$temp_file" + fi + + if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then + echo "$response_body" + return 0 + else + return 1 + fi + else + return 1 + fi + elif [[ $curl_exit_code -eq 28 ]]; then + # Timeout - retry + retry_count=$((retry_count + 1)) + sleep 1 + else + break + fi + done + + # Clean up temp file on failure (only if we created it) + if [[ -n "$temp_file" && -z "$data_file" ]]; then + rm -f "$temp_file" + fi + + return 1 +} + +# Create webhook target +create_target() { + response=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST "$P_URL/api/v1/targets" -d @- << EOF +{"type":"webhook","endpoint":"https://webhook.site/8e1f26bd-2f5b-47a2-9d0b-3b3dabb30710","name":"Test Webhook","auth":{"username":"","password":""},"skipTlsCheck":false,"notificationConfig":{"interval":1,"times":1}} +EOF +) + + curl_exit_code=$? + + if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then + # Extract target ID from response + target_id=$(echo "$response" | grep -o '"id":"[^"]*"' | cut -d'"' -f4) + if [[ -n "$target_id" ]]; then + echo "Target created successfully with ID: $target_id" >&2 + echo "$target_id" + return 0 + else + echo "Failed to extract target ID from response" >&2 + echo "Response: $response" >&2 + return 1 + fi + else + echo "Failed to create target" >&2 + echo "Curl exit code: $curl_exit_code" >&2 + echo "Response: $response" >&2 + return 1 + fi +} + +# Create JSON file to avoid control character issues +create_json_file() { + local filename="$1" + local content="$2" + + # Create temporary file with proper JSON content + temp_json=$(mktemp) + printf "%s" "$content" > "$temp_json" + echo "$temp_json" +} + +# Create alerts +create_alerts() { + local target_id="$1" + + if [[ -z "$target_id" ]]; then + echo "Target ID is required to create alerts" + return 1 + fi + + # Alert 1: Error Count (severity_number = 18) + alert1_template='{"severity":"high","title":"error count","stream":"STREAM_PLACEHOLDER","alertType":"threshold","aggregates":{"aggregateConfig":[{"aggregateFunction":"count","conditions":{"operator":null,"conditionConfig":[{"column":"severity_number","operator":"=","value":"18"}]},"column":"severity_number","operator":">","value":1000}]},"evalConfig":{"rollingWindow":{"evalStart":"5h","evalEnd":"now","evalFrequency":1}},"targets":["TARGET_PLACEHOLDER"]}' + + alert1_json=$(echo "$alert1_template" | sed "s|STREAM_PLACEHOLDER|$P_STREAM|g" | sed "s|TARGET_PLACEHOLDER|$target_id|g") + response1=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST -d "$alert1_json" "$P_URL/api/v1/alerts") + if [[ $? 
-eq 0 && -n "$response1" ]]; then + echo "Alert 1 created successfully" + else + echo "Failed to create Alert 1" + echo "Response: $response1" + fi + + # Alert 2: 400 Errors + + alert2_template='{"severity":"critical","title":"400 Errors","stream":"STREAM_PLACEHOLDER","alertType":"threshold","aggregates":{"aggregateConfig":[{"aggregateFunction":"count","conditions":{"operator":null,"conditionConfig":[{"column":"body","operator":"contains","value":"400"}]},"column":"body","operator":">","value":10}]},"evalConfig":{"rollingWindow":{"evalStart":"5h","evalEnd":"now","evalFrequency":1}},"targets":["TARGET_PLACEHOLDER"]}' + + alert2_json=$(echo "$alert2_template" | sed "s|STREAM_PLACEHOLDER|$P_STREAM|g" | sed "s|TARGET_PLACEHOLDER|$target_id|g") + + response2=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST -d "$alert2_json" "$P_URL/api/v1/alerts") + if [[ $? -eq 0 && -n "$response2" ]]; then + echo "Alert 2 created successfully" + else + echo "Failed to create Alert 2" + echo "Response: $response2" + fi + + # Alert 3: Trace ID or Span ID null + + alert3_template='{"severity":"high","title":"Trace ID or Span ID null","stream":"STREAM_PLACEHOLDER","alertType":"threshold","aggregates":{"aggregateConfig":[{"aggregateFunction":"count","conditions":{"operator":null,"conditionConfig":[{"column":"trace_id","operator":"is null","value":""}]},"column":"trace_id","operator":">","value":0}]},"evalConfig":{"rollingWindow":{"evalStart":"5h","evalEnd":"now","evalFrequency":1}},"targets":["TARGET_PLACEHOLDER"]}' + + alert3_json=$(echo "$alert3_template" | sed "s|STREAM_PLACEHOLDER|$P_STREAM|g" | sed "s|TARGET_PLACEHOLDER|$target_id|g") + + response3=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST -d "$alert3_json" "$P_URL/api/v1/alerts") + if [[ $? -eq 0 && -n "$response3" ]]; then + echo "Alert 3 created successfully" + else + echo "Failed to create Alert 3" + echo "Response: $response3" + fi + + sleep 1 +} + +# Main execution +# Create target and get ID +target_id=$(create_target) + +if [[ $? -eq 0 && -n "$target_id" ]]; then + echo "Target creation successful, proceeding with alerts..." 
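+    # short pause so the freshly created target is persisted before the
+    # alerts below reference its ID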
+    sleep 2
+
+    # Create alerts using the target ID
+    create_alerts "$target_id"
+else
+    echo "Failed to create target, cannot proceed with alerts"
+    exit 1
+fi
+
+exit 0
\ No newline at end of file
diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs
index d7e90598f..aa2a22494 100644
--- a/src/handlers/http/demo_data.rs
+++ b/src/handlers/http/demo_data.rs
@@ -30,6 +30,8 @@ use std::os::unix::fs::PermissionsExt;
 // Embed the scripts at compile time
 const INGEST_SCRIPT: &str = include_str!("../../../resources/ingest_demo_data.sh");
 const FILTER_SCRIPT: &str = include_str!("../../../resources/filters_demo_data.sh");
+const ALERT_SCRIPT: &str = include_str!("../../../resources/alerts_demo_data.sh");
+
 pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError> {
     let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
         .map_err(|_| PostError::InvalidQueryParameter)?;
@@ -75,7 +77,7 @@ pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError>
 "Demo data is not available in this mode"
             ))),
         },
-        "filters" => {
+        "filters" | "alerts" => {
             // Fire the script execution asynchronously
             tokio::spawn(async move {
                 if let Err(e) = execute_demo_script(&action, &url, username, password).await {
@@ -103,6 +105,7 @@ async fn execute_demo_script(
 let script_content = match action {
         "ingest" => INGEST_SCRIPT,
         "filters" => FILTER_SCRIPT,
+        "alerts" => ALERT_SCRIPT,
         _ => return Err(anyhow::anyhow!("Unsupported action: {}", action)),
     };
     // Write the script content to the temporary file
From af190001f36784031a2817ad2d99ffba0c7f56f9 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sat, 12 Jul 2025 00:58:08 -0700
Subject: [PATCH 04/13] add alerts, merge script

---
 resources/alerts_demo_data.sh  | 206 -----------------
 resources/filters_demo_data.sh | 177 ---------------
 resources/ingest_demo_data.sh  | 394 ++++++++++++++++++++++++++++-----
 src/handlers/http/demo_data.rs |  37 +---
 4 files changed, 352 insertions(+), 462 deletions(-)
 delete mode 100755 resources/alerts_demo_data.sh
 delete mode 100755 resources/filters_demo_data.sh

diff --git a/resources/alerts_demo_data.sh b/resources/alerts_demo_data.sh
deleted file mode 100755
index 13bad7d2e..000000000
--- a/resources/alerts_demo_data.sh
+++ /dev/null
@@ -1,206 +0,0 @@
-#!/usr/bin/env bash
-
-# Configuration
-P_URL=${P_URL:-"http://localhost:8000"}
-P_USERNAME=${P_USERNAME:-"admin"}
-P_PASSWORD=${P_PASSWORD:-"admin"}
-P_STREAM=${P_STREAM:-"demodata"}
-
-# Pre-compute auth header
-AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)"
-
-# Common curl function with retry logic
-curl_with_retry() {
-    local url="$1"
-    local method="$2"
-    local data="$3"
-    local content_type="${4:-application/json}"
-    local max_retries="${5:-3}"
-    local retry_count=0
-    local data_file="$7"
-
-    # Create temp file if data is provided (either as string or file)
-    if [[ -n "$data_file" ]]; then
-        temp_file="$data_file"
-    elif [[ -n "$data" ]]; then
-        temp_file=$(mktemp)
-        if [[ $? 
-ne 0 ]]; then - return 1 - fi - printf "%s" "$data" > "$temp_file" - fi - - while [[ $retry_count -lt $max_retries ]]; do - local max_time=$((10 + (retry_count * 10))) - local connect_timeout=5 - - local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" - - # Add headers - curl_cmd+=" -H \"Content-Type: $content_type\"" - curl_cmd+=" -H \"$AUTH_HEADER\"" - - # Add method and data - if [[ "$method" == "POST" ]]; then - curl_cmd+=" -X POST" - if [[ -n "$temp_file" ]]; then - curl_cmd+=" --data-binary \"@$temp_file\"" - elif [[ -n "$data" ]]; then - curl_cmd+=" -d \"$data\"" - fi - fi - - # Add URL - curl_cmd+=" \"$url\"" - - # Execute curl - local response - response=$(eval "$curl_cmd" 2>&1) - local curl_exit_code=$? - - # Check curl exit code - if [[ $curl_exit_code -eq 0 ]]; then - local status_code - if [[ -n "$response" ]]; then - status_code=$(echo "$response" | tail -n1) - local response_body=$(echo "$response" | sed '$d') - - # Clean up temp file (only if we created it) - if [[ -n "$temp_file" && -z "$data_file" ]]; then - rm -f "$temp_file" - fi - - if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then - echo "$response_body" - return 0 - else - return 1 - fi - else - return 1 - fi - elif [[ $curl_exit_code -eq 28 ]]; then - # Timeout - retry - retry_count=$((retry_count + 1)) - sleep 1 - else - break - fi - done - - # Clean up temp file on failure (only if we created it) - if [[ -n "$temp_file" && -z "$data_file" ]]; then - rm -f "$temp_file" - fi - - return 1 -} - -# Create webhook target -create_target() { - response=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST "$P_URL/api/v1/targets" -d @- << EOF -{"type":"webhook","endpoint":"https://webhook.site/8e1f26bd-2f5b-47a2-9d0b-3b3dabb30710","name":"Test Webhook","auth":{"username":"","password":""},"skipTlsCheck":false,"notificationConfig":{"interval":1,"times":1}} -EOF -) - - curl_exit_code=$? 
- - if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then - # Extract target ID from response - target_id=$(echo "$response" | grep -o '"id":"[^"]*"' | cut -d'"' -f4) - if [[ -n "$target_id" ]]; then - echo "Target created successfully with ID: $target_id" >&2 - echo "$target_id" - return 0 - else - echo "Failed to extract target ID from response" >&2 - echo "Response: $response" >&2 - return 1 - fi - else - echo "Failed to create target" >&2 - echo "Curl exit code: $curl_exit_code" >&2 - echo "Response: $response" >&2 - return 1 - fi -} - -# Create JSON file to avoid control character issues -create_json_file() { - local filename="$1" - local content="$2" - - # Create temporary file with proper JSON content - temp_json=$(mktemp) - printf "%s" "$content" > "$temp_json" - echo "$temp_json" -} - -# Create alerts -create_alerts() { - local target_id="$1" - - if [[ -z "$target_id" ]]; then - echo "Target ID is required to create alerts" - return 1 - fi - - # Alert 1: Error Count (severity_number = 18) - alert1_template='{"severity":"high","title":"error count","stream":"STREAM_PLACEHOLDER","alertType":"threshold","aggregates":{"aggregateConfig":[{"aggregateFunction":"count","conditions":{"operator":null,"conditionConfig":[{"column":"severity_number","operator":"=","value":"18"}]},"column":"severity_number","operator":">","value":1000}]},"evalConfig":{"rollingWindow":{"evalStart":"5h","evalEnd":"now","evalFrequency":1}},"targets":["TARGET_PLACEHOLDER"]}' - - alert1_json=$(echo "$alert1_template" | sed "s|STREAM_PLACEHOLDER|$P_STREAM|g" | sed "s|TARGET_PLACEHOLDER|$target_id|g") - response1=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST -d "$alert1_json" "$P_URL/api/v1/alerts") - if [[ $? -eq 0 && -n "$response1" ]]; then - echo "Alert 1 created successfully" - else - echo "Failed to create Alert 1" - echo "Response: $response1" - fi - - # Alert 2: 400 Errors - - alert2_template='{"severity":"critical","title":"400 Errors","stream":"STREAM_PLACEHOLDER","alertType":"threshold","aggregates":{"aggregateConfig":[{"aggregateFunction":"count","conditions":{"operator":null,"conditionConfig":[{"column":"body","operator":"contains","value":"400"}]},"column":"body","operator":">","value":10}]},"evalConfig":{"rollingWindow":{"evalStart":"5h","evalEnd":"now","evalFrequency":1}},"targets":["TARGET_PLACEHOLDER"]}' - - alert2_json=$(echo "$alert2_template" | sed "s|STREAM_PLACEHOLDER|$P_STREAM|g" | sed "s|TARGET_PLACEHOLDER|$target_id|g") - - response2=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST -d "$alert2_json" "$P_URL/api/v1/alerts") - if [[ $? -eq 0 && -n "$response2" ]]; then - echo "Alert 2 created successfully" - else - echo "Failed to create Alert 2" - echo "Response: $response2" - fi - - # Alert 3: Trace ID or Span ID null - - alert3_template='{"severity":"high","title":"Trace ID or Span ID null","stream":"STREAM_PLACEHOLDER","alertType":"threshold","aggregates":{"aggregateConfig":[{"aggregateFunction":"count","conditions":{"operator":null,"conditionConfig":[{"column":"trace_id","operator":"is null","value":""}]},"column":"trace_id","operator":">","value":0}]},"evalConfig":{"rollingWindow":{"evalStart":"5h","evalEnd":"now","evalFrequency":1}},"targets":["TARGET_PLACEHOLDER"]}' - - alert3_json=$(echo "$alert3_template" | sed "s|STREAM_PLACEHOLDER|$P_STREAM|g" | sed "s|TARGET_PLACEHOLDER|$target_id|g") - - response3=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST -d "$alert3_json" "$P_URL/api/v1/alerts") - if [[ $? 
-eq 0 && -n "$response3" ]]; then - echo "Alert 3 created successfully" - else - echo "Failed to create Alert 3" - echo "Response: $response3" - fi - - sleep 1 -} - -# Main execution -# Create target and get ID -target_id=$(create_target) - -if [[ $? -eq 0 && -n "$target_id" ]]; then - echo "Target creation successful, proceeding with alerts..." - sleep 2 - - # Create alerts using the target ID - create_alerts "$target_id" -else - echo "Failed to create target, cannot proceed with alerts" - exit 1 -fi - -exit 0 \ No newline at end of file diff --git a/resources/filters_demo_data.sh b/resources/filters_demo_data.sh deleted file mode 100755 index 98e1ba9f1..000000000 --- a/resources/filters_demo_data.sh +++ /dev/null @@ -1,177 +0,0 @@ -#!/usr/bin/env bash - -# Configuration -P_URL=${P_URL:-"http://localhost:8000"} -P_USERNAME=${P_USERNAME:-"admin"} -P_PASSWORD=${P_PASSWORD:-"admin"} -P_STREAM=${P_STREAM:-"demodata"} - -# Pre-compute auth header -AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" - -# Common curl function with retry logic -curl_with_retry() { - local url="$1" - local method="$2" - local data="$3" - local content_type="${4:-application/json}" - local max_retries="${5:-3}" - local retry_count=0 - - # Create temp file if data is provided - if [[ -n "$data" ]]; then - temp_file=$(mktemp) - if [[ $? -ne 0 ]]; then - return 1 - fi - echo "$data" > "$temp_file" - fi - - while [[ $retry_count -lt $max_retries ]]; do - local max_time=$((10 + (retry_count * 10))) - local connect_timeout=5 - - local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout" - - # Add headers - curl_cmd+=" -H \"Content-Type: $content_type\"" - curl_cmd+=" -H \"$AUTH_HEADER\"" - - # Add stream header for ingestion requests - if [[ "$url" == *"/ingest"* ]]; then - curl_cmd+=" -H \"X-P-STREAM: $P_STREAM\"" - fi - - # Add method and data - if [[ "$method" == "POST" ]]; then - curl_cmd+=" -X POST" - if [[ -n "$temp_file" ]]; then - curl_cmd+=" --data-binary \"@$temp_file\"" - elif [[ -n "$data" ]]; then - curl_cmd+=" -d \"$data\"" - fi - fi - - # Add URL - curl_cmd+=" \"$url\"" - - # Execute curl - local response - response=$(eval "$curl_cmd" 2>&1) - local curl_exit_code=$? 
- - # Check curl exit code - if [[ $curl_exit_code -eq 0 ]]; then - local status_code - if [[ -n "$response" ]]; then - status_code=$(echo "$response" | tail -n1) - - # Clean up temp file - [[ -n "$temp_file" ]] && rm -f "$temp_file" - - if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then - return 0 - else - return 1 - fi - else - return 1 - fi - elif [[ $curl_exit_code -eq 28 ]]; then - # Timeout - retry - retry_count=$((retry_count + 1)) - sleep 1 - else - break - fi - done - - # Clean up temp file on failure - [[ -n "$temp_file" ]] && rm -f "$temp_file" - - return 1 -} - -# Create SQL filters -create_sql_filters() { - sql_filters=( - "error_logs|Monitor all ERROR and FATAL severity events|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') ORDER BY time_unix_nano DESC LIMIT 100" - "high_response_time|Identify requests with extended response times|SELECT \"service.name\", \"url.path\", body FROM $P_STREAM WHERE body LIKE '%duration%' ORDER BY time_unix_nano DESC LIMIT 50" - "service_health_summary|Service health metrics by severity|SELECT \"service.name\", severity_text, COUNT(*) as count FROM $P_STREAM GROUP BY \"service.name\", severity_text ORDER BY count DESC" - "api_endpoint_performance|API endpoint request patterns|SELECT \"url.path\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"url.path\", \"service.name\" ORDER BY request_count DESC LIMIT 20" - "authentication_failures|Monitor auth-related warnings and errors|SELECT * FROM $P_STREAM WHERE \"url.path\" LIKE '%login%' AND severity_text IN ('WARN', 'ERROR') ORDER BY time_unix_nano DESC LIMIT 100" - "upstream_cluster_analysis|Request distribution across clusters|SELECT \"upstream.cluster\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"upstream.cluster\", \"service.name\" ORDER BY request_count DESC" - "trace_analysis|Multi-span traces for distributed tracking|SELECT trace_id, COUNT(*) as span_count, \"service.name\" FROM $P_STREAM GROUP BY trace_id, \"service.name\" HAVING span_count > 1 ORDER BY span_count DESC LIMIT 10" - "user_agent_distribution|Client types and user agent patterns|SELECT \"user_agent.original\", COUNT(*) as usage_count FROM $P_STREAM GROUP BY \"user_agent.original\" ORDER BY usage_count DESC LIMIT 15" - "source_address_analysis|Request distribution by source IP|SELECT \"source.address\", COUNT(*) as request_count, COUNT(DISTINCT \"service.name\") as services_accessed FROM $P_STREAM GROUP BY \"source.address\" ORDER BY request_count DESC LIMIT 20" - "severity_timeline|Severity trends over time|SELECT \"severity_text\", COUNT(*) as count, \"service.name\" FROM $P_STREAM GROUP BY \"severity_text\", \"service.name\" ORDER BY count DESC" - ) - - sql_success_count=0 - - for filter_config in "${sql_filters[@]}"; do - IFS='|' read -r name description query <<< "$filter_config" - - # Escape quotes for JSON - escaped_query=$(echo "$query" | sed 's/"/\\"/g') - escaped_desc=$(echo "$description" | sed 's/"/\\"/g') - - json="{\"stream_name\":\"sql\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"sql\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null}" - - if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then - sql_success_count=$((sql_success_count + 1)) - fi - - sleep 0.5 - done - - sleep 3 -} - -# Create saved filters -create_saved_filters() { - saved_filters=( - "service_errors|Monitor service errors and failures|SELECT * FROM $P_STREAM WHERE 
severity_text IN ('ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path|service.name" - "auth_security_events|Authentication and authorization monitoring|SELECT * FROM $P_STREAM WHERE url.path LIKE '%login%' AND severity_text IN ('WARN', 'ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,source.address,user_agent.original|severity_text" - "high_latency_requests|High response time requests|SELECT * FROM $P_STREAM WHERE body LIKE '%duration%' LIMIT 500|Ingestion Time,Data,service.name,url.path,upstream.cluster,body|service.name" - "upstream_cluster_health|Upstream cluster performance|SELECT * FROM $P_STREAM WHERE upstream.cluster IS NOT NULL LIMIT 500|Ingestion Time,Data,upstream.cluster,service.name,severity_text,destination.address|upstream.cluster" - "api_endpoint_monitoring|API endpoint usage patterns|SELECT * FROM $P_STREAM WHERE url.path IS NOT NULL LIMIT 500|Ingestion Time,Data,url.path,service.name,severity_text,source.address|url.path" - "trace_correlation_view|Correlated traces for distributed tracking|SELECT * FROM $P_STREAM WHERE trace_id IS NOT NULL AND span_id IS NOT NULL LIMIT 500|Ingestion Time,Data,trace_id,span_id,service.name,url.path|trace_id" - "user_agent_analysis|Client types and patterns|SELECT * FROM $P_STREAM WHERE user_agent.original IS NOT NULL LIMIT 500|Ingestion Time,Data,user_agent.original,source.address,url.path,service.name|user_agent.original" - "network_monitoring|Network traffic and server interactions|SELECT * FROM $P_STREAM WHERE source.address IS NOT NULL LIMIT 500|Ingestion Time,Data,source.address,destination.address,service.name,severity_text,url.path|source.address" - "service_overview|Comprehensive service activity view|SELECT * FROM $P_STREAM LIMIT 500|Ingestion Time,Data,service.name,url.path,source.address,destination.address,upstream.cluster|service.name" - "recent_activity|Most recent system activity|SELECT * FROM $P_STREAM ORDER BY time_unix_nano DESC LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path,source.address|severity_text" - ) - - saved_success_count=0 - - for filter_config in "${saved_filters[@]}"; do - IFS='|' read -r name description query visible_columns group_by <<< "$filter_config" - - # Escape quotes - escaped_query=$(echo "$query" | sed 's/"/\\"/g') - escaped_desc=$(echo "$description" | sed 's/"/\\"/g') - - # Convert visible columns to JSON array - IFS=',' read -ra col_array <<< "$visible_columns" - visible_cols_json="" - for i in "${!col_array[@]}"; do - [[ $i -gt 0 ]] && visible_cols_json+="," - visible_cols_json+="\"${col_array[$i]}\"" - done - - json="{\"stream_name\":\"$P_STREAM\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"filter\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null,\"tableConfig\":{\"visibleColumns\":[$visible_cols_json],\"pinnedColumns\":[]},\"groupBy\":\"$group_by\"}" - - if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then - saved_success_count=$((saved_success_count + 1)) - fi - - sleep 0.5 - done -} - -# Create all filters -create_sql_filters -create_saved_filters - -exit 0 \ No newline at end of file diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh index 084754769..249c838cc 100755 --- a/resources/ingest_demo_data.sh +++ b/resources/ingest_demo_data.sh @@ -5,6 +5,7 @@ P_URL=${P_URL:-"http://localhost:8000"} P_USERNAME=${P_USERNAME:-"admin"} P_PASSWORD=${P_PASSWORD:-"admin"} P_STREAM=${P_STREAM:-"demodata"} 
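+# ACTION selects which demo task this merged script runs; main() at the
+# bottom of the file dispatches on it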
+ACTION=${ACTION:-"ingest"} TARGET_RECORDS=10000 BATCH_SIZE=1000 @@ -19,14 +20,17 @@ curl_with_retry() { local content_type="${4:-application/json}" local max_retries="${5:-3}" local retry_count=0 + local data_file="$7" - # Create temp file if data is provided - if [[ -n "$data" ]]; then + # Create temp file if data is provided (either as string or file) + if [[ -n "$data_file" ]]; then + temp_file="$data_file" + elif [[ -n "$data" ]]; then temp_file=$(mktemp) if [[ $? -ne 0 ]]; then return 1 fi - echo "$data" > "$temp_file" + printf "%s" "$data" > "$temp_file" fi while [[ $retry_count -lt $max_retries ]]; do @@ -67,11 +71,15 @@ curl_with_retry() { local status_code if [[ -n "$response" ]]; then status_code=$(echo "$response" | tail -n1) + local response_body=$(echo "$response" | sed '$d') - # Clean up temp file - [[ -n "$temp_file" ]] && rm -f "$temp_file" + # Clean up temp file (only if we created it) + if [[ -n "$temp_file" && -z "$data_file" ]]; then + rm -f "$temp_file" + fi if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then + echo "$response_body" return 0 else return 1 @@ -88,35 +96,41 @@ curl_with_retry() { fi done - # Clean up temp file on failure - [[ -n "$temp_file" ]] && rm -f "$temp_file" + # Clean up temp file on failure (only if we created it) + if [[ -n "$temp_file" && -z "$data_file" ]]; then + rm -f "$temp_file" + fi return 1 } -# Pre-compute static data -TRACE_IDS=() -SPAN_IDS=() -IP_ADDRESSES=() -TIMESTAMPS=() -UNIX_NANOS=() - -# Generate 100 of each for cycling through -for i in {1..100}; do - TRACE_IDS+=("$(printf '%032x' $((RANDOM * RANDOM)))") - SPAN_IDS+=("$(printf '%016x' $((RANDOM * RANDOM)))") - IP_ADDRESSES+=("192.168.$((RANDOM % 256)).$((RANDOM % 256))") - TIMESTAMPS+=("$(date -u +%Y-%m-%dT%H:%M:%S.%03dZ -d "+$((RANDOM % 3600)) seconds")") - UNIX_NANOS+=("$(date +%s)$(printf '%09d' $((RANDOM % 1000000000)))") -done - -# Static arrays -METHODS=("GET" "GET" "GET" "GET" "POST" "PUT") -STATUS_CODES=(200 200 200 201 400 500) -SERVICES=("frontend" "api" "auth" "cart" "payment") -ENDPOINTS=("/products" "/cart" "/login" "/checkout" "/search") -USER_AGENTS=("curl/7.88.1" "python-requests/2.32.3" "Mozilla/5.0") -CLUSTERS=("web" "api" "db") +# ==================== INGEST FUNCTIONALITY ==================== + +# Pre-compute static data for ingestion +init_ingest_data() { + TRACE_IDS=() + SPAN_IDS=() + IP_ADDRESSES=() + TIMESTAMPS=() + UNIX_NANOS=() + + # Generate 100 of each for cycling through + for i in {1..100}; do + TRACE_IDS+=("$(printf '%032x' $((RANDOM * RANDOM)))") + SPAN_IDS+=("$(printf '%016x' $((RANDOM * RANDOM)))") + IP_ADDRESSES+=("192.168.$((RANDOM % 256)).$((RANDOM % 256))") + TIMESTAMPS+=("$(date -u +%Y-%m-%dT%H:%M:%S.%03dZ -d "+$((RANDOM % 3600)) seconds")") + UNIX_NANOS+=("$(date +%s)$(printf '%09d' $((RANDOM % 1000000000)))") + done + + # Static arrays + METHODS=("GET" "GET" "GET" "GET" "POST" "PUT") + STATUS_CODES=(200 200 200 201 400 500) + SERVICES=("frontend" "api" "auth" "cart" "payment") + ENDPOINTS=("/products" "/cart" "/login" "/checkout" "/search") + USER_AGENTS=("curl/7.88.1" "python-requests/2.32.3" "Mozilla/5.0") + CLUSTERS=("web" "api" "db") +} # Generate batch data generate_batch() { @@ -196,37 +210,311 @@ send_batch() { curl_with_retry "$P_URL/api/v1/ingest" "POST" "$data" "application/json" 3 15 } -# Main execution -START_TIME=$(date +%s) -RECORDS_SENT=0 +# Main ingest function +run_ingest() { + echo "Starting data ingestion..." 
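+    # streams TARGET_RECORDS records to /api/v1/ingest in BATCH_SIZE chunks,
+    # reporting progress and throughput per batch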
+ init_ingest_data + + START_TIME=$(date +%s) + RECORDS_SENT=0 + + while [[ $RECORDS_SENT -lt $TARGET_RECORDS ]]; do + remaining=$((TARGET_RECORDS - RECORDS_SENT)) + current_batch_size=$((remaining > BATCH_SIZE ? BATCH_SIZE : remaining)) + + # Progress tracking + progress=$((RECORDS_SENT * 100 / TARGET_RECORDS)) + elapsed=$(($(date +%s) - START_TIME)) + rate=$((RECORDS_SENT / (elapsed == 0 ? 1 : elapsed))) + + echo "Progress: $progress% ($RECORDS_SENT/$TARGET_RECORDS) - Rate: $rate records/sec" + + # Generate and send batch + batch_data=$(generate_batch $current_batch_size) + + if [[ -z "$batch_data" ]]; then + echo "Failed to generate batch data" + exit 1 + fi + + if send_batch "$batch_data"; then + RECORDS_SENT=$((RECORDS_SENT + current_batch_size)) + else + echo "Failed to send batch" + exit 1 + fi + + sleep 0.1 + done + + # Final statistics + TOTAL_TIME=$(($(date +%s) - START_TIME)) + FINAL_RATE=$((TARGET_RECORDS / (TOTAL_TIME == 0 ? 1 : TOTAL_TIME))) + + echo "Ingestion completed: $TARGET_RECORDS records in $TOTAL_TIME seconds (avg: $FINAL_RATE records/sec)" +} + +# ==================== FILTERS FUNCTIONALITY ==================== + +# Create SQL filters +create_sql_filters() { + echo "Creating SQL filters..." + + sql_filters=( + "error_logs|Monitor all ERROR and FATAL severity events|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') ORDER BY time_unix_nano DESC LIMIT 100" + "high_response_time|Identify requests with extended response times|SELECT \"service.name\", \"url.path\", body FROM $P_STREAM WHERE body LIKE '%duration%' ORDER BY time_unix_nano DESC LIMIT 50" + "service_health_summary|Service health metrics by severity|SELECT \"service.name\", severity_text, COUNT(*) as count FROM $P_STREAM GROUP BY \"service.name\", severity_text ORDER BY count DESC" + "api_endpoint_performance|API endpoint request patterns|SELECT \"url.path\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"url.path\", \"service.name\" ORDER BY request_count DESC LIMIT 20" + "authentication_failures|Monitor auth-related warnings and errors|SELECT * FROM $P_STREAM WHERE \"url.path\" LIKE '%login%' AND severity_text IN ('WARN', 'ERROR') ORDER BY time_unix_nano DESC LIMIT 100" + "upstream_cluster_analysis|Request distribution across clusters|SELECT \"upstream.cluster\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"upstream.cluster\", \"service.name\" ORDER BY request_count DESC" + "trace_analysis|Multi-span traces for distributed tracking|SELECT trace_id, COUNT(*) as span_count, \"service.name\" FROM $P_STREAM GROUP BY trace_id, \"service.name\" HAVING span_count > 1 ORDER BY span_count DESC LIMIT 10" + "user_agent_distribution|Client types and user agent patterns|SELECT \"user_agent.original\", COUNT(*) as usage_count FROM $P_STREAM GROUP BY \"user_agent.original\" ORDER BY usage_count DESC LIMIT 15" + "source_address_analysis|Request distribution by source IP|SELECT \"source.address\", COUNT(*) as request_count, COUNT(DISTINCT \"service.name\") as services_accessed FROM $P_STREAM GROUP BY \"source.address\" ORDER BY request_count DESC LIMIT 20" + "severity_timeline|Severity trends over time|SELECT \"severity_text\", COUNT(*) as count, \"service.name\" FROM $P_STREAM GROUP BY \"severity_text\", \"service.name\" ORDER BY count DESC" + ) + + sql_success_count=0 + + for filter_config in "${sql_filters[@]}"; do + IFS='|' read -r name description query <<< "$filter_config" + + # Escape quotes for JSON + escaped_query=$(echo "$query" | sed 's/"/\\"/g') + 
escaped_desc=$(echo "$description" | sed 's/"/\\"/g') + + json="{\"stream_name\":\"sql\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"sql\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null}" + + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then + sql_success_count=$((sql_success_count + 1)) + echo "Created SQL filter: $name" + else + echo "Failed to create SQL filter: $name" + fi + + sleep 0.5 + done + + echo "Created $sql_success_count SQL filters" + sleep 3 +} + +# Create saved filters +create_saved_filters() { + echo "Creating saved filters..." + + saved_filters=( + "service_errors|Monitor service errors and failures|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path|service.name" + "auth_security_events|Authentication and authorization monitoring|SELECT * FROM $P_STREAM WHERE url.path LIKE '%login%' AND severity_text IN ('WARN', 'ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,source.address,user_agent.original|severity_text" + "high_latency_requests|High response time requests|SELECT * FROM $P_STREAM WHERE body LIKE '%duration%' LIMIT 500|Ingestion Time,Data,service.name,url.path,upstream.cluster,body|service.name" + "upstream_cluster_health|Upstream cluster performance|SELECT * FROM $P_STREAM WHERE upstream.cluster IS NOT NULL LIMIT 500|Ingestion Time,Data,upstream.cluster,service.name,severity_text,destination.address|upstream.cluster" + "api_endpoint_monitoring|API endpoint usage patterns|SELECT * FROM $P_STREAM WHERE url.path IS NOT NULL LIMIT 500|Ingestion Time,Data,url.path,service.name,severity_text,source.address|url.path" + "trace_correlation_view|Correlated traces for distributed tracking|SELECT * FROM $P_STREAM WHERE trace_id IS NOT NULL AND span_id IS NOT NULL LIMIT 500|Ingestion Time,Data,trace_id,span_id,service.name,url.path|trace_id" + "user_agent_analysis|Client types and patterns|SELECT * FROM $P_STREAM WHERE user_agent.original IS NOT NULL LIMIT 500|Ingestion Time,Data,user_agent.original,source.address,url.path,service.name|user_agent.original" + "network_monitoring|Network traffic and server interactions|SELECT * FROM $P_STREAM WHERE source.address IS NOT NULL LIMIT 500|Ingestion Time,Data,source.address,destination.address,service.name,severity_text,url.path|source.address" + "service_overview|Comprehensive service activity view|SELECT * FROM $P_STREAM LIMIT 500|Ingestion Time,Data,service.name,url.path,source.address,destination.address,upstream.cluster|service.name" + "recent_activity|Most recent system activity|SELECT * FROM $P_STREAM ORDER BY time_unix_nano DESC LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path,source.address|severity_text" + ) + + saved_success_count=0 + + for filter_config in "${saved_filters[@]}"; do + IFS='|' read -r name description query visible_columns group_by <<< "$filter_config" + + # Escape quotes + escaped_query=$(echo "$query" | sed 's/"/\\"/g') + escaped_desc=$(echo "$description" | sed 's/"/\\"/g') + + # Convert visible columns to JSON array + IFS=',' read -ra col_array <<< "$visible_columns" + visible_cols_json="" + for i in "${!col_array[@]}"; do + [[ $i -gt 0 ]] && visible_cols_json+="," + visible_cols_json+="\"${col_array[$i]}\"" + done + + 
json="{\"stream_name\":\"$P_STREAM\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"filter\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null,\"tableConfig\":{\"visibleColumns\":[$visible_cols_json],\"pinnedColumns\":[]},\"groupBy\":\"$group_by\"}" + + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then + saved_success_count=$((saved_success_count + 1)) + echo "Created saved filter: $name" + else + echo "Failed to create saved filter: $name" + fi + + sleep 0.5 + done + + echo "Created $saved_success_count saved filters" +} + +# Main filters function +run_filters() { + echo "Starting filter creation..." + create_sql_filters + create_saved_filters + echo "Filter creation completed" +} -while [[ $RECORDS_SENT -lt $TARGET_RECORDS ]]; do - remaining=$((TARGET_RECORDS - RECORDS_SENT)) - current_batch_size=$((remaining > BATCH_SIZE ? BATCH_SIZE : remaining)) +# ==================== ALERTS FUNCTIONALITY ==================== + +# Create webhook target +create_target() { + echo "Creating webhook target..." >&2 + + response=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST "$P_URL/api/v1/targets" -d @- << EOF +{"type":"webhook","endpoint":"https://webhook.site/8e1f26bd-2f5b-47a2-9d0b-3b3dabb30710","name":"Test Webhook","auth":{"username":"","password":""},"skipTlsCheck":false,"notificationConfig":{"interval":1,"times":1}} +EOF +) - # Progress tracking - progress=$((RECORDS_SENT * 100 / TARGET_RECORDS)) - elapsed=$(($(date +%s) - START_TIME)) - rate=$((RECORDS_SENT / (elapsed == 0 ? 1 : elapsed))) + curl_exit_code=$? - # Generate and send batch - batch_data=$(generate_batch $current_batch_size) + if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then + # Extract target ID from response + target_id=$(echo "$response" | grep -o '"id":"[^"]*"' | cut -d'"' -f4) + if [[ -n "$target_id" ]]; then + echo "Target created successfully with ID: $target_id" >&2 + echo "$target_id" + return 0 + else + echo "Failed to extract target ID from response" >&2 + echo "Response: $response" >&2 + return 1 + fi + else + echo "Failed to create target" >&2 + echo "Curl exit code: $curl_exit_code" >&2 + echo "Response: $response" >&2 + return 1 + fi +} + +# Create alerts +create_alerts() { + local target_id="$1" - if [[ -z "$batch_data" ]]; then - exit 1 + if [[ -z "$target_id" ]]; then + echo "Target ID is required to create alerts" + return 1 fi - if send_batch "$batch_data"; then - RECORDS_SENT=$((RECORDS_SENT + current_batch_size)) + echo "Creating alerts with target ID: $target_id" + + # Alert 1: Error Count (severity_number = 18) + alert1_json="{\"severity\":\"high\",\"title\":\"error count\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"severity_number\",\"operator\":\"=\",\"value\":\"18\"}]},\"column\":\"severity_number\",\"operator\":\">\",\"value\":1000}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" + + response1=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert1_json" "application/json" 3 10) + if [[ $? 
-eq 0 ]]; then + echo "Alert 1 (Error Count) created successfully" else - exit 1 + echo "Failed to create Alert 1 (Error Count)" + echo "Response: $response1" fi - sleep 0.1 -done + # Alert 2: 400 Errors + alert2_json="{\"severity\":\"critical\",\"title\":\"400 Errors\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"body\",\"operator\":\"contains\",\"value\":\"400\"}]},\"column\":\"body\",\"operator\":\">\",\"value\":10}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" + + response2=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert2_json" "application/json" 3 10) + if [[ $? -eq 0 ]]; then + echo "Alert 2 (400 Errors) created successfully" + else + echo "Failed to create Alert 2 (400 Errors)" + echo "Response: $response2" + fi + + # Alert 3: Trace ID or Span ID null + alert3_json="{\"severity\":\"high\",\"title\":\"Trace ID or Span ID null\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"trace_id\",\"operator\":\"is null\",\"value\":\"\"}]},\"column\":\"trace_id\",\"operator\":\">\",\"value\":0}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" + + response3=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert3_json" "application/json" 3 10) + if [[ $? -eq 0 ]]; then + echo "Alert 3 (Trace ID null) created successfully" + else + echo "Failed to create Alert 3 (Trace ID null)" + echo "Response: $response3" + fi + + sleep 1 +} + +# Main alerts function +run_alerts() { + echo "Starting alert creation..." + + # Create target and get ID + target_id=$(create_target) -# Final statistics -TOTAL_TIME=$(($(date +%s) - START_TIME)) -FINAL_RATE=$((TARGET_RECORDS / (TOTAL_TIME == 0 ? 1 : TOTAL_TIME))) + if [[ $? -eq 0 && -n "$target_id" ]]; then + echo "Target creation successful, proceeding with alerts..." + sleep 2 + + # Create alerts using the target ID + create_alerts "$target_id" + echo "Alert creation completed" + else + echo "Failed to create target, cannot proceed with alerts" + return 1 + fi +} + +# ==================== MAIN EXECUTION ==================== + +# Display usage +show_usage() { + echo "Usage: $0 [ACTION=ingest|filters|alerts|all]" + echo "" + echo "Environment variables:" + echo " P_URL - API URL (default: http://localhost:8000)" + echo " P_USERNAME - Username (default: admin)" + echo " P_PASSWORD - Password (default: admin)" + echo " P_STREAM - Stream name (default: demodata)" + echo " ACTION - Action to perform (default: ingest)" + echo "" + echo "Actions:" + echo " ingest - Ingest demo log data" + echo " filters - Create SQL and saved filters" + echo " alerts - Create alerts and webhook targets" + echo " all - Run all actions in sequence" + echo "" + echo "Examples:" + echo " ACTION=ingest ./script.sh" + echo " ACTION=filters P_STREAM=mystream ./script.sh" + echo " ACTION=all ./script.sh" +} + +# Main execution logic +main() { + case "$ACTION" in + "ingest") + run_ingest + ;; + "filters") + run_filters + ;; + "alerts") + run_alerts + ;; + "all") + echo "Running all actions..." + run_ingest + echo "Waiting before creating filters..." + sleep 5 + run_filters + echo "Waiting before creating alerts..." 
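+            # pauses between stages give the server time to persist the
+            # previous stage's writes before dependent objects are created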
+            sleep 5
+            run_alerts
+            echo "All actions completed"
+            ;;
+        "help"|"--help"|"-h")
+            show_usage
+            exit 0
+            ;;
+        *)
+            echo "Unknown action: $ACTION"
+            show_usage
+            exit 1
+            ;;
+    esac
+}
 
-exit 0
\ No newline at end of file
+# Execute main function
+main "$@"
+exit $?
\ No newline at end of file
diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs
index aa2a22494..a6d1a6d02 100644
--- a/src/handlers/http/demo_data.rs
+++ b/src/handlers/http/demo_data.rs
@@ -28,9 +28,7 @@ use std::{collections::HashMap, fs, process::Command};
 use std::os::unix::fs::PermissionsExt;
 
 // Embed the scripts at compile time
-const INGEST_SCRIPT: &str = include_str!("../../../resources/ingest_demo_data.sh");
-const FILTER_SCRIPT: &str = include_str!("../../../resources/filters_demo_data.sh");
-const ALERT_SCRIPT: &str = include_str!("../../../resources/alerts_demo_data.sh");
+const DEMO_SCRIPT: &str = include_str!("../../../resources/ingest_demo_data.sh");
 
 pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError> {
     let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
@@ -56,22 +54,17 @@ pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError>
 Mode::Ingest | Mode::All => {
             // Fire the script execution asynchronously
             tokio::spawn(async move {
-                if let Err(e) = execute_demo_script(&action, &url, username, password).await {
-                    tracing::warn!("Failed to execute demo script: {}", e);
-                }
+                execute_demo_script(&action, &url, username, password).await
             });
 
             Ok(HttpResponse::Accepted().finish())
         }
         Mode::Query | Mode::Prism => {
             // Forward the request to ingestor asynchronously
-            tokio::spawn(async move {
-                if let Err(e) = get_demo_data_from_ingestor(&action).await {
-                    tracing::warn!("Failed to forward request to ingestor: {}", e);
-                }
-            });
-
-            Ok(HttpResponse::Accepted().finish())
+            match get_demo_data_from_ingestor(&action).await {
+                Ok(()) => Ok(HttpResponse::Accepted().finish()),
+                Err(e) => Err(PostError::Invalid(anyhow::anyhow!(e))),
+            }
         }
         _ => Err(PostError::Invalid(anyhow::anyhow!(
             "Demo data is not available in this mode"
@@ -79,11 +72,9 @@ pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError>
 },
         "filters" | "alerts" => {
             // Fire the script execution asynchronously
-            tokio::spawn(async move {
-                if let Err(e) = execute_demo_script(&action, &url, username, password).await {
-                    tracing::warn!("Failed to execute demo script: {}", e);
-                }
-            });
+            tokio::spawn(
+                async move { execute_demo_script(&action, &url, username, password).await },
+            );
 
             Ok(HttpResponse::Accepted().finish())
         }
@@ -102,14 +93,8 @@ async fn execute_demo_script(
 .map_err(|e| anyhow::anyhow!("Failed to create temporary file: {}", e))?;
 
     let temp_path = temp_file.path();
-    let script_content = match action {
-        "ingest" => INGEST_SCRIPT,
-        "filters" => FILTER_SCRIPT,
-        "alerts" => ALERT_SCRIPT,
-        _ => return Err(anyhow::anyhow!("Unsupported action: {}", action)),
-    };
     // Write the script content to the temporary file
-    fs::write(temp_path, script_content)
+    fs::write(temp_path, DEMO_SCRIPT)
         .map_err(|e| anyhow::anyhow!("Failed to write script to temp file: {}", e))?;
 
     // Make the temporary file executable (Unix only)
@@ -128,7 +113,7 @@ async fn execute_demo_script(
 .env("P_URL", url)
         .env("P_USERNAME", username)
         .env("P_PASSWORD", password)
-        .env("DEMO_ACTION", action)
+        .env("ACTION", action)
         .output()
         .map_err(|e| {
             anyhow::anyhow!(
From f8db9611a1998fe88b630513882e1afc2f45c51e Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sun, 13 Jul 2025 11:16:37 -0700
Subject: [PATCH 05/13] add dashboards to the script

---
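Note: the dashboard support below is a two-step flow against the HTTP API: a POST to /api/v1/dashboards creates the dashboard and returns its dashboardId, and a PUT to /api/v1/dashboards/{id} then uploads the full tile configuration. A minimal sketch of that flow, reusing the script's P_URL and AUTH_HEADER variables, with an empty tile list for brevity and jq assumed for ID extraction (the script itself only switches to jq in a later patch):

    id=$(curl -s -H "$AUTH_HEADER" -H "Content-Type: application/json" \
        -X POST "$P_URL/api/v1/dashboards" \
        -d '{"title":"Demo Dashboard","tags":["demo","oss"]}' | jq -r '.dashboardId')
    curl -s -H "$AUTH_HEADER" -H "Content-Type: application/json" \
        -X PUT "$P_URL/api/v1/dashboards/$id" \
        -d "{\"title\":\"Demo Dashboard\",\"dashboardId\":\"$id\",\"tiles\":[]}"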
resources/ingest_demo_data.sh | 381 +++++++++++++++++++++++++++++++++- 1 file changed, 380 insertions(+), 1 deletion(-) diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh index 249c838cc..e627f118a 100755 --- a/resources/ingest_demo_data.sh +++ b/resources/ingest_demo_data.sh @@ -56,6 +56,13 @@ curl_with_retry() { elif [[ -n "$data" ]]; then curl_cmd+=" -d \"$data\"" fi + elif [[ "$method" == "PUT" ]]; then + curl_cmd+=" -X PUT" + if [[ -n "$temp_file" ]]; then + curl_cmd+=" --data-binary \"@$temp_file\"" + elif [[ -n "$data" ]]; then + curl_cmd+=" -d \"$data\"" + fi fi # Add URL @@ -455,11 +462,375 @@ run_alerts() { fi } +# ==================== DASHBOARDS FUNCTIONALITY ==================== + +# Create dashboard +create_dashboard() { + echo "Creating dashboard..." >&2 + + response=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST "$P_URL/api/v1/dashboards" -d @- << EOF +{ + "title": "Demo Dashboard", + "tags": [ + "demo", + "oss" + ] +} +EOF +) + + curl_exit_code=$? + + if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then + # Extract dashboard ID from response + dashboard_id=$(echo "$response" | grep -o '"dashboardId":"[^"]*"' | cut -d'"' -f4) + if [[ -n "$dashboard_id" ]]; then + echo "Dashboard created successfully with ID: $dashboard_id" >&2 + echo "$dashboard_id" + return 0 + else + echo "Failed to extract dashboard ID from response" >&2 + echo "Response: $response" >&2 + return 1 + fi + else + echo "Failed to create dashboard" >&2 + echo "Curl exit code: $curl_exit_code" >&2 + echo "Response: $response" >&2 + return 1 + fi +} + +# Update dashboard with tiles +update_dashboard() { + local dashboard_id="$1" + + if [[ -z "$dashboard_id" ]]; then + echo "Dashboard ID is required to update dashboard" + return 1 + fi + + echo "Updating dashboard with ID: $dashboard_id" + + # Create the dashboard configuration with updated tiles + dashboard_config=$(cat << EOF +{ + "title": "Demo Dashboard", + "dashboardId": "$dashboard_id", + "tags": [ + "demo", + "oss" + ], + "isFavorite": true, + "dashboardType": "Dashboard", + "tiles": [ + { + "tile_id": "01K017X5NG2SZ20PJ0EEYG9376", + "title": "Service Error Rate Over Time", + "chartQuery": { + "x": { + "fields": [ + { + "name": "p_timestamp", + "type": "time" + } + ], + "granularity": "minute" + }, + "y": { + "fields": [ + { + "name": "severity_number", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "severity_text" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "timeseries", + "config": { + "type": "bar", + "colourScheme": "forest", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "time_bucket", + "title": "Time" + }, + "y": { + "field": "COUNT_severity_number", + "title": "Event Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 12, + "h": 8, + "x": 0, + "y": 0, + "i": "01K017X5NG2SZ20PJ0EEYG9376", + "moved": false, + "static": false + } + }, + { + "tile_id": "01K027HTD413T9MP39KYEE42GS", + "title": "Request Count by Service", + "chartQuery": { + "x": { + "fields": [ + { + "name": "service.name", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "service.name" + ] + }, + "y": { + "fields": [ + { + "name": "url.path", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "url.path" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "line", + "config": { + "type": "line", + "colourScheme": "cyber", + "layout": 
{ + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "service.name", + "title": "Service" + }, + "y": { + "field": "COUNT_url.path", + "title": "Request Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 4, + "h": 8, + "x": 0, + "y": 8, + "i": "01K027HTD413T9MP39KYEE42GS", + "moved": false, + "static": false + } + }, + { + "tile_id": "01K027MQ5K75VSCFGVVN86MBMJ", + "title": "Response Status Distribution by Upstream Cluster", + "chartQuery": { + "x": { + "fields": [ + { + "name": "upstream.cluster", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "upstream.cluster" + ] + }, + "y": { + "fields": [ + { + "name": "severity_text", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "severity_text" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "bar", + "config": { + "type": "bar", + "colourScheme": "dusk", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "upstream.cluster", + "title": "Upstream Cluster" + }, + "y": { + "field": "COUNT_severity_text", + "title": "Response Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 8, + "h": 8, + "x": 4, + "y": 8, + "i": "01K027MQ5K75VSCFGVVN86MBMJ", + "moved": false, + "static": false + } + }, + { + "tile_id": "01K027RM6R3EQ6K960ECSKP5PX", + "title": "User Agent Distribution by Source Address", + "chartQuery": { + "x": { + "fields": [ + { + "name": "source.address", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "source.address" + ] + }, + "y": { + "fields": [ + { + "name": "user_agent.original", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "user_agent.original" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "area", + "config": { + "type": "area", + "colourScheme": "forest", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "source.address", + "title": "Source IP Address" + }, + "y": { + "field": "COUNT_user_agent.original", + "title": "User Agent Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 7, + "h": 7, + "x": 0, + "y": 16, + "i": "01K027RM6R3EQ6K960ECSKP5PX", + "moved": false, + "static": false + } + } + ] +} +EOF +) + + response=$(curl_with_retry "$P_URL/api/v1/dashboards/$dashboard_id" "PUT" "$dashboard_config" "application/json" 3 10) + if [[ $? -eq 0 ]]; then + echo "Dashboard updated successfully" + return 0 + else + echo "Failed to update dashboard" + echo "Response: $response" + return 1 + fi +} + +# Main dashboards function +run_dashboards() { + echo "Starting dashboard creation..." + + # Create dashboard and get ID + dashboard_id=$(create_dashboard) + + if [[ $? -eq 0 && -n "$dashboard_id" ]]; then + echo "Dashboard creation successful, proceeding with tiles..." 
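+    # brief pause so the new dashboard is persisted before the PUT adds tiles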
+ sleep 2 + + # Update dashboard with tiles + update_dashboard "$dashboard_id" + echo "Dashboard creation completed" + else + echo "Failed to create dashboard, cannot proceed with tiles" + return 1 + fi +} + # ==================== MAIN EXECUTION ==================== # Display usage show_usage() { - echo "Usage: $0 [ACTION=ingest|filters|alerts|all]" + echo "Usage: $0 [ACTION=ingest|filters|alerts|dashboards|all]" echo "" echo "Environment variables:" echo " P_URL - API URL (default: http://localhost:8000)" @@ -472,11 +843,13 @@ show_usage() { echo " ingest - Ingest demo log data" echo " filters - Create SQL and saved filters" echo " alerts - Create alerts and webhook targets" + echo " dashboards - Create demo dashboard with tiles" echo " all - Run all actions in sequence" echo "" echo "Examples:" echo " ACTION=ingest ./script.sh" echo " ACTION=filters P_STREAM=mystream ./script.sh" + echo " ACTION=dashboards ./script.sh" echo " ACTION=all ./script.sh" } @@ -492,6 +865,9 @@ main() { "alerts") run_alerts ;; + "dashboards") + run_dashboards + ;; "all") echo "Running all actions..." run_ingest @@ -501,6 +877,9 @@ main() { echo "Waiting before creating alerts..." sleep 5 run_alerts + echo "Waiting before creating dashboards..." + sleep 5 + run_dashboards echo "All actions completed" ;; "help"|"--help"|"-h") From 5ab2c5e9fc7fe172413547eafdf91dc17218dd00 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 13 Jul 2025 19:20:16 -0700 Subject: [PATCH 06/13] refactor script --- resources/ingest_demo_data.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh index e627f118a..5d1cccf22 100755 --- a/resources/ingest_demo_data.sh +++ b/resources/ingest_demo_data.sh @@ -19,8 +19,8 @@ curl_with_retry() { local data="$3" local content_type="${4:-application/json}" local max_retries="${5:-3}" + local data_file="$6" local retry_count=0 - local data_file="$7" # Create temp file if data is provided (either as string or file) if [[ -n "$data_file" ]]; then @@ -214,7 +214,7 @@ send_batch() { return 1 fi - curl_with_retry "$P_URL/api/v1/ingest" "POST" "$data" "application/json" 3 15 + curl_with_retry "$P_URL/api/v1/ingest" "POST" "$data" "application/json" 3 } # Main ingest function @@ -291,7 +291,7 @@ create_sql_filters() { json="{\"stream_name\":\"sql\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"sql\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null}" - if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3; then sql_success_count=$((sql_success_count + 1)) echo "Created SQL filter: $name" else @@ -341,7 +341,7 @@ create_saved_filters() { json="{\"stream_name\":\"$P_STREAM\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"filter\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null,\"tableConfig\":{\"visibleColumns\":[$visible_cols_json],\"pinnedColumns\":[]},\"groupBy\":\"$group_by\"}" - if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3 10; then + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3; then saved_success_count=$((saved_success_count + 1)) echo "Created saved filter: $name" else @@ -409,7 +409,7 @@ create_alerts() { # Alert 1: Error Count (severity_number = 18) alert1_json="{\"severity\":\"high\",\"title\":\"error 
count\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"severity_number\",\"operator\":\"=\",\"value\":\"18\"}]},\"column\":\"severity_number\",\"operator\":\">\",\"value\":1000}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" - response1=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert1_json" "application/json" 3 10) + response1=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert1_json" "application/json" 3) if [[ $? -eq 0 ]]; then echo "Alert 1 (Error Count) created successfully" else @@ -420,7 +420,7 @@ create_alerts() { # Alert 2: 400 Errors alert2_json="{\"severity\":\"critical\",\"title\":\"400 Errors\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"body\",\"operator\":\"contains\",\"value\":\"400\"}]},\"column\":\"body\",\"operator\":\">\",\"value\":10}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" - response2=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert2_json" "application/json" 3 10) + response2=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert2_json" "application/json" 3) if [[ $? -eq 0 ]]; then echo "Alert 2 (400 Errors) created successfully" else @@ -431,7 +431,7 @@ create_alerts() { # Alert 3: Trace ID or Span ID null alert3_json="{\"severity\":\"high\",\"title\":\"Trace ID or Span ID null\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"trace_id\",\"operator\":\"is null\",\"value\":\"\"}]},\"column\":\"trace_id\",\"operator\":\">\",\"value\":0}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" - response3=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert3_json" "application/json" 3 10) + response3=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert3_json" "application/json" 3) if [[ $? -eq 0 ]]; then echo "Alert 3 (Trace ID null) created successfully" else @@ -795,7 +795,7 @@ update_dashboard() { EOF ) - response=$(curl_with_retry "$P_URL/api/v1/dashboards/$dashboard_id" "PUT" "$dashboard_config" "application/json" 3 10) + response=$(curl_with_retry "$P_URL/api/v1/dashboards/$dashboard_id" "PUT" "$dashboard_config" "application/json" 3) if [[ $? 
-eq 0 ]]; then
         echo "Dashboard updated successfully"
         return 0
From ce5f193723b362db04b6b6764412e2e0eab1cfc7 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sun, 13 Jul 2025 19:26:21 -0700
Subject: [PATCH 07/13] add dashboards action

---
 src/handlers/http/demo_data.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs
index a6d1a6d02..db74d5d70 100644
--- a/src/handlers/http/demo_data.rs
+++ b/src/handlers/http/demo_data.rs
@@ -70,7 +70,7 @@ pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError>
 "Demo data is not available in this mode"
             ))),
         },
-        "filters" | "alerts" => {
+        "filters" | "alerts" | "dashboards" => {
             // Fire the script execution asynchronously
             tokio::spawn(
                 async move { execute_demo_script(&action, &url, username, password).await },
From db9dd5b2b0c6a237b0ae371fba64beb7f876b33b Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sun, 13 Jul 2025 19:42:59 -0700
Subject: [PATCH 08/13] curl command as array of arguments

---
 resources/ingest_demo_data.sh | 31 +++++++++++++++-------------
 1 file changed, 17 insertions(+), 14 deletions(-)

diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index 5d1cccf22..caff6c271 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -37,40 +37,43 @@ curl_with_retry() {
 local max_time=$((10 + (retry_count * 10)))
         local connect_timeout=5
 
-        local curl_cmd="curl -s -w \"\n%{http_code}\" --max-time $max_time --connect-timeout $connect_timeout"
-
-        # Add headers
-        curl_cmd+=" -H \"Content-Type: $content_type\""
-        curl_cmd+=" -H \"$AUTH_HEADER\""
+        local curl_args=(
+            -s
+            -w '\n%{http_code}'
+            --max-time "$max_time"
+            --connect-timeout "$connect_timeout"
+            -H "Content-Type: $content_type"
+            -H "$AUTH_HEADER"
+        )
 
         # Add stream header for ingestion requests
         if [[ "$url" == *"/ingest"* ]]; then
-            curl_cmd+=" -H \"X-P-STREAM: $P_STREAM\""
+            curl_args+=(-H "X-P-STREAM: $P_STREAM")
         fi
 
         # Add method and data
        if [[ "$method" == "POST" ]]; then
-            curl_cmd+=" -X POST"
+            curl_args+=(-X POST)
             if [[ -n "$temp_file" ]]; then
-                curl_cmd+=" --data-binary \"@$temp_file\""
+                curl_args+=(--data-binary "@$temp_file")
             elif [[ -n "$data" ]]; then
-                curl_cmd+=" -d \"$data\""
+                curl_args+=(-d "$data")
             fi
         elif [[ "$method" == "PUT" ]]; then
-            curl_cmd+=" -X PUT"
+            curl_args+=(-X PUT)
             if [[ -n "$temp_file" ]]; then
-                curl_cmd+=" --data-binary \"@$temp_file\""
+                curl_args+=(--data-binary "@$temp_file")
             elif [[ -n "$data" ]]; then
-                curl_cmd+=" -d \"$data\""
+                curl_args+=(-d "$data")
             fi
         fi
 
         # Add URL
-        curl_cmd+=" \"$url\""
+        curl_args+=("$url")
 
         # Execute curl
         local response
-        response=$(eval "$curl_cmd" 2>&1)
+        response=$(curl "${curl_args[@]}" 2>&1)
         local curl_exit_code=$?
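+        # "${curl_args[@]}" expands each array element as its own word, which
+        # preserves quoting and removes the need for the eval used above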

From 2a7810781addbed28b3a24d4dd5c7c64c8ff2b75 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Sun, 13 Jul 2025 23:17:04 -0700
Subject: [PATCH 09/13] refactor demo script

---
 resources/ingest_demo_data.sh | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index caff6c271..b65245d46 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -22,6 +22,7 @@ curl_with_retry() {
     local data_file="$6"
     local retry_count=0

+    local temp_file=""
     # Create temp file if data is provided (either as string or file)
     if [[ -n "$data_file" ]]; then
         temp_file="$data_file"
@@ -88,7 +89,7 @@ curl_with_retry() {
                 rm -f "$temp_file"
             fi

-            if [[ "$status_code" == "200" || "$status_code" == "201" ]]; then
+            if [[ "$status_code" =~ ^2[0-9][0-9]$ ]]; then
                 echo "$response_body"
                 return 0
             else
@@ -380,7 +381,7 @@ EOF
     if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then
         # Extract target ID from response
-        target_id=$(echo "$response" | grep -o '"id":"[^"]*"' | cut -d'"' -f4)
+        target_id=$(echo "$response" | jq -r '.id // empty')
         if [[ -n "$target_id" ]]; then
             echo "Target created successfully with ID: $target_id" >&2
             echo "$target_id"
@@ -486,7 +487,7 @@ EOF
     if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then
         # Extract dashboard ID from response
-        dashboard_id=$(echo "$response" | grep -o '"dashboardId":"[^"]*"' | cut -d'"' -f4)
+        dashboard_id=$(echo "$response" | jq -r '.dashboardId // empty')
         if [[ -n "$dashboard_id" ]]; then
             echo "Dashboard created successfully with ID: $dashboard_id" >&2
             echo "$dashboard_id"

From 01ccb3f0a3efe4fc346c25b927331631ee3c0419 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 14 Jul 2025 04:07:27 -0700
Subject: [PATCH 10/13] capture response body and status separately

---
 resources/ingest_demo_data.sh | 49 +++++++++++++++++++++--------------
 1 file changed, 30 insertions(+), 19 deletions(-)

diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index b65245d46..8128281bf 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -72,37 +72,48 @@ curl_with_retry() {
         # Add URL
         curl_args+=("$url")

-        # Execute curl
-        local response
-        response=$(curl "${curl_args[@]}" 2>&1)
+        # Create temporary files for response body and stderr
+        local response_file
+        response_file=$(mktemp) || { log_error "Failed to create temporary file"; return 1; }
+        local stderr_file
+        stderr_file=$(mktemp) || { log_error "Failed to create temporary file"; rm -f "$response_file"; return 1; }
+
+        # Add options to capture status code separately
+        curl_args+=("-w" "%{http_code}" "-o" "$response_file")
+
+        # Execute curl and capture status code and stderr
+        local status_code
+        status_code=$(curl "${curl_args[@]}" 2>"$stderr_file")
         local curl_exit_code=$?

         # Check curl exit code
         if [[ $curl_exit_code -eq 0 ]]; then
-            local status_code
-            if [[ -n "$response" ]]; then
-                status_code=$(echo "$response" | tail -n1)
-                local response_body=$(echo "$response" | sed '$d')
-
-                # Clean up temp file (only if we created it)
-                if [[ -n "$temp_file" && -z "$data_file" ]]; then
-                    rm -f "$temp_file"
-                fi
-
-                if [[ "$status_code" =~ ^2[0-9][0-9]$ ]]; then
-                    echo "$response_body"
-                    return 0
-                else
-                    return 1
-                fi
+            local response_body
+            response_body=$(cat "$response_file" 2>/dev/null)
+
+            # Clean up temporary files
+            rm -f "$response_file" "$stderr_file"
+
+            # Clean up temp data file (only if we created it)
+            if [[ -n "$temp_file" && -z "$data_file" ]]; then
+                rm -f "$temp_file"
+            fi
+
+            if [[ "$status_code" =~ ^2[0-9][0-9]$ ]]; then
+                echo "$response_body"
+                return 0
             else
+                log_error "HTTP $status_code: Request failed"
                 return 1
             fi
         elif [[ $curl_exit_code -eq 28 ]]; then
             # Timeout - retry
+            rm -f "$response_file" "$stderr_file"
             retry_count=$((retry_count + 1))
             sleep 1
         else
+            # Other curl error - cleanup and exit
+            rm -f "$response_file" "$stderr_file"
             break
         fi
     done
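
Writing the body to a file with -o while printing only %{http_code} to stdout with -w removes the earlier tail/sed splitting, which could confuse body and status when the body was empty or ended without a newline. A condensed sketch of the same capture, with hypothetical file and variable names:

    #!/usr/bin/env bash
    # -o sends the response body to a temp file; -w leaves the status code
    # alone on stdout, so the two can never be mixed up.
    response_file=$(mktemp)
    status_code=$(curl -s -o "$response_file" -w '%{http_code}' "http://localhost:8000")
    if [[ "$status_code" =~ ^2[0-9][0-9]$ ]]; then
        cat "$response_file"            # success: emit the body
    else
        echo "HTTP $status_code" >&2    # failure: report the status
    fi
    rm -f "$response_file"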

From ccf49137d272c4dfaee776dfd9f25bacc6d68344 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 14 Jul 2025 04:32:18 -0700
Subject: [PATCH 11/13] add function for logging errors

---
 resources/ingest_demo_data.sh | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index 8128281bf..99dc8e08a 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -12,6 +12,11 @@ BATCH_SIZE=1000
 # Pre-compute auth header
 AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)"

+# Logging functions
+log_error() {
+    echo "$@" >&2
+}
+
 # Common curl function with retry logic
 curl_with_retry() {
     local url="$1"

From b0b5b2c17cdd77639ae581f6e24f49519445909a Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 14 Jul 2025 11:10:04 -0700
Subject: [PATCH 12/13] coderabbit suggestions

---
 resources/ingest_demo_data.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index 99dc8e08a..229bec8ae 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -45,7 +45,6 @@ curl_with_retry() {
         local curl_args=(
             -s
-            -w '\n%{http_code}'
             --max-time "$max_time"
             --connect-timeout "$connect_timeout"
             -H "Content-Type: $content_type"
             -H "$AUTH_HEADER"

From 898ce0de19ba722ce392af5186c53981b59c36d8 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Mon, 14 Jul 2025 11:24:57 -0700
Subject: [PATCH 13/13] strip newline from status code

---
 resources/ingest_demo_data.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh
index 229bec8ae..94216ad72 100755
--- a/resources/ingest_demo_data.sh
+++ b/resources/ingest_demo_data.sh
@@ -87,7 +87,7 @@ curl_with_retry() {
         # Execute curl and capture status code and stderr
         local status_code
-        status_code=$(curl "${curl_args[@]}" 2>"$stderr_file")
+        status_code=$(curl "${curl_args[@]}" 2>"$stderr_file" | tr -d '\n')
         local curl_exit_code=$?

         # Check curl exit code
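
The final tr -d '\n' protects the anchored status check: a stray newline in the captured status makes ^2[0-9][0-9]$ fail, so every successful request would be reported as an error. A short demonstration of the failure mode:

    #!/usr/bin/env bash
    status=$'200\n'
    [[ "$status" =~ ^2[0-9][0-9]$ ]] && echo match || echo "no match"   # prints: no match
    status=$(printf '%s' "$status" | tr -d '\n')
    [[ "$status" =~ ^2[0-9][0-9]$ ]] && echo match || echo "no match"   # prints: match

One caveat with the patched line itself: once curl is piped into tr, the subsequent $? reflects tr's exit status rather than curl's, so the exit-code-28 retry branch can no longer fire unless the script enables pipefail or inspects PIPESTATUS.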