diff --git a/Gemfile b/Gemfile index 784e23a..92128fa 100755 --- a/Gemfile +++ b/Gemfile @@ -2,6 +2,5 @@ ## The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl/ source "https://rubygems.org" - gemspec diff --git a/examples/apache.conf b/examples/apache.conf index 756e386..10c76c4 100644 --- a/examples/apache.conf +++ b/examples/apache.conf @@ -33,7 +33,7 @@ oci_la_global_metadata ${{: , :}} oci_la_entity_id # If same across sources. Else keep this in individual filters - oci_la_entity_type # If same across sources. Else keep this in individual filters + oci_la_entity_type # If same across sources. Else keep this in individual filters @@ -41,10 +41,11 @@ @type record_transformer enable_ruby true - oci_la_metadata ${{: , :}} + oci_la_metadata ${{: , : oci_la_log_group_id oci_la_log_path "${record['tailed_path']}" + oci_la_timezone tag ${tag} @@ -52,13 +53,12 @@ @type oci-logging-analytics namespace - # Auth config file details config_file_location ~/.oci/config profile_name DEFAULT # Buffer Configuration @type file - path /var/log + path /var/log retry_forever true disable_chunk_backup true diff --git a/fluent-plugin-oci-logging-analytics.gemspec b/fluent-plugin-oci-logging-analytics.gemspec index 47e3843..c65cb13 100755 --- a/fluent-plugin-oci-logging-analytics.gemspec +++ b/fluent-plugin-oci-logging-analytics.gemspec @@ -1,12 +1,13 @@ ## Copyright (c) 2021, 2022 Oracle and/or its affiliates. ## The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl/ +require_relative './lib/fluent/version/version' lib = File.expand_path("../lib", __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) Gem::Specification.new do |spec| spec.name = "fluent-plugin-oci-logging-analytics" - spec.version = "2.0.6" + spec.version = Version::VERSION spec.authors = ["Oracle","OCI Observability: Logging Analytics"] spec.email = ["oci_la_plugins_grp@oracle.com"] diff --git a/lib/fluent/dto/logEvents.rb b/lib/fluent/dto/logEvents.rb index 959f1b9..e59d1f6 100755 --- a/lib/fluent/dto/logEvents.rb +++ b/lib/fluent/dto/logEvents.rb @@ -2,9 +2,9 @@ ## The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl/ class LogEvents - attr_accessor :entityId, :entityType, :logSourceName, :logPath, :logRecords , :metadata + attr_accessor :entityId, :entityType, :logSourceName, :logPath, :logRecords , :metadata, :timezone def initialize(lrpe_key, fluentd_records) - @metadata, @entityId, @entityType, @logSourceName, @logPath = lrpe_key + @metadata, @entityId, @entityType, @logSourceName, @logPath, @timezone = lrpe_key @logRecords = fluentd_records.map{ |record| record['message'] } @@ -17,7 +17,8 @@ def to_hash entityType: @entityType, logSourceName: @logSourceName, logPath: @logPath, - logRecords: @logRecords + logRecords: @logRecords, + timezone:@timezone }.compact end diff --git a/lib/fluent/enums/source.rb b/lib/fluent/enums/source.rb new file mode 100644 index 0000000..776c2f5 --- /dev/null +++ b/lib/fluent/enums/source.rb @@ -0,0 +1,4 @@ +module Source + FLUENTD = :fluentd + KUBERNETES_SOLUTION = :kubernetes_solution +end diff --git a/lib/fluent/metrics/metricsLabels.rb b/lib/fluent/metrics/metricsLabels.rb index 4201a42..35099ec 100644 --- a/lib/fluent/metrics/metricsLabels.rb +++ b/lib/fluent/metrics/metricsLabels.rb @@ -1,5 +1,5 @@ class MetricsLabels - attr_accessor :worker_id, :tag, :logGroupId, :logSourceName, :logSet, :invalid_reason, :records_valid, :records_per_tag, :latency + attr_accessor :worker_id, :tag, :logGroupId, :logSourceName, :logSet, :invalid_reason, :records_valid, :records_per_tag, :latency,:timezone def initialize @worker_id = nil @tag = nil @@ -10,5 +10,6 @@ def initialize @records_valid = 0 @records_per_tag = 0 @latency = 0 + @timezone = nil end end \ No newline at end of file diff --git a/lib/fluent/plugin/out_oci-logging-analytics.rb b/lib/fluent/plugin/out_oci-logging-analytics.rb index 47b1a21..7b2e9bb 100755 --- a/lib/fluent/plugin/out_oci-logging-analytics.rb +++ b/lib/fluent/plugin/out_oci-logging-analytics.rb @@ -1,17 +1,18 @@ ## Copyright (c) 2021, 2022 Oracle and/or its affiliates. ## The Universal Permissive License (UPL), Version 1.0 as shown at https://oss.oracle.com/licenses/upl/ - require 'fluent/plugin/output' require "benchmark" require 'zip' require 'yajl' require 'yajl/json_gem' +# require 'tzinfo' require 'logger' require_relative '../dto/logEventsJson' require_relative '../dto/logEvents' require_relative '../metrics/prometheusMetrics' require_relative '../metrics/metricsLabels' +require_relative '../enums/source' # Import only specific OCI modules to improve load times and reduce the memory requirements. require 'oci/auth/auth' @@ -36,7 +37,6 @@ require 'oci/waiter' require 'oci/retry/retry' require 'oci/object_storage/object_storage' - module OCI class << self attr_accessor :sdk_name @@ -97,7 +97,8 @@ class OutOracleOCILogAnalytics < Output config_param :zip_file_location, :string, :default => nil desc 'The kubernetes_metadata_keys_mapping.' config_param :kubernetes_metadata_keys_mapping, :hash, :default => {"container_name":"Container","namespace_name":"Namespace","pod_name":"Pod","container_image":"Container Image Name","host":"Node"} - + desc 'opc-meta-properties' + config_param :collection_source, :string, :default => Source::FLUENTD #**************************************************************** desc 'The http proxy to be used.' @@ -256,6 +257,14 @@ def initialize_loganalytics_client() else @@loganalytics_client = OCI::LogAnalytics::LogAnalyticsClient.new(config: OCI::Config.new, signer: instance_principals_signer) end + when "WorkloadIdentity" + workload_identity_signer = OCI::Auth::Signers::oke_workload_resource_principal_signer + if is_valid(@endpoint) + @@loganalytics_client = OCI::LogAnalytics::LogAnalyticsClient.new(config: OCI::Config.new, endpoint: @endpoint, signer: workload_identity_signer) + @@logger.info {"loganalytics_client initialised with endpoint: #{@endpoint}"} + else + @@loganalytics_client = OCI::LogAnalytics::LogAnalyticsClient.new(config: OCI::Config.new, signer: workload_identity_signer) + end when "ConfigFile" my_config = OCI::ConfigFileLoader.load_config(config_file_location: @config_file_location, profile_name: @profile_name) if is_valid(@endpoint) @@ -628,6 +637,8 @@ def group_by_logGroupId(chunk) latency = 0 records_per_tag = 0 + + tag_metrics_set = Hash.new logGroup_labels_set = Hash.new @@ -637,8 +648,8 @@ def group_by_logGroupId(chunk) tags_per_logGroupId = Hash.new tag_logSet_map = Hash.new tag_metadata_map = Hash.new + timezoneValuesByTag = Hash.new incoming_records = 0 - chunk.each do |time, record| incoming_records += 1 metricsLabels = MetricsLabels.new @@ -722,6 +733,8 @@ def group_by_logGroupId(chunk) end next end + + # metricsLabels.timezone = record["oci_la_timezone"] metricsLabels.logGroupId = record["oci_la_log_group_id"] metricsLabels.logSourceName = record["oci_la_log_source_name"] if record["oci_la_log_set"] != nil @@ -770,6 +783,25 @@ def group_by_logGroupId(chunk) tags_per_logGroupId[record["oci_la_log_group_id"]] = record["tag"] end end + # validating the timezone field + if !timezoneValuesByTag.has_key?(record["tag"]) + begin + timezoneIdentifier = record["oci_la_timezone"] + unless is_valid(timezoneIdentifier) + record["oci_la_timezone"] = nil + else + isTimezoneExist = timezone_exist? timezoneIdentifier + unless isTimezoneExist + @@logger.warn { "Invalid timezone '#{timezoneIdentifier}', using default UTC." } + record["oci_la_timezone"] = "UTC" + end + + end + timezoneValuesByTag[record["tag"]] = record["oci_la_timezone"] + end + else + record["oci_la_timezone"] = timezoneValuesByTag[record["tag"]] + end records << record ensure @@ -916,6 +948,14 @@ def write(chunk) end end end + def timezone_exist?(tz) + begin + TZInfo::Timezone.get(tz) + return true + rescue TZInfo::InvalidTimezoneIdentifier + return false + end + end # Each oci_la_log_set will correspond to a separate file in the zip # Only MAX_FILES_PER_ZIP files are allowed per zip. @@ -958,6 +998,21 @@ def get_logSets_map_per_logGroupId(oci_la_log_group_id,records_per_logGroupId) # takes a fluentD chunk and converts it to an in-memory zipfile, populating metrics hash provided # Any exception raised is passed into the metrics hash, to be re-thrown from write() + def getCollectionSource(input) + collections_src = [] + if !is_valid input + collections_src.unshift("source:#{Source::FLUENTD}") + else + if input == Source::FLUENTD.to_s or input == Source::KUBERNETES_SOLUTION.to_s + collections_src.unshift("source:#{input}") + else + # source not define ! using default source 'fluentd' + collections_src.unshift("source:#{Source::FLUENTD}") + end + end + collections_src + end + def get_zipped_stream(oci_la_log_group_id,oci_la_global_metadata,records_per_logSet_map) begin current, = Time.now @@ -970,8 +1025,9 @@ def get_zipped_stream(oci_la_log_group_id,oci_la_global_metadata,records_per_log record['oci_la_metadata'], record['oci_la_entity_id'], record['oci_la_entity_type'], - record['oci_la_log_source_name'] , - record['oci_la_log_path'] + record['oci_la_log_source_name'], + record['oci_la_log_path'], + record['oci_la_timezone'] ]}.map { |lrpe_key, records_per_lrpe| number_of_records += records_per_lrpe.length LogEvents.new(lrpe_key, records_per_lrpe) @@ -1021,9 +1077,10 @@ def save_zip_to_local(oci_la_log_group_id, zippedstream, current_s) # upload zipped stream to oci def upload_to_oci(oci_la_log_group_id, number_of_records, zippedstream, metricsLabels_array) begin + collection_src_prop = getCollectionSource @collection_source error_reason = nil error_code = nil - opts = {payload_type: "ZIP"} + opts = { payload_type: "ZIP", opc_meta_properties:collection_src_prop} response = @@loganalytics_client.upload_log_events_file(namespace_name=@namespace, logGroupId=oci_la_log_group_id , diff --git a/lib/fluent/version/version.rb b/lib/fluent/version/version.rb new file mode 100644 index 0000000..be03b69 --- /dev/null +++ b/lib/fluent/version/version.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +module Version + VERSION = "2.0.7".freeze +end