diff --git a/ci/run.sh b/ci/run.sh index f30409d99..2bbfa3c45 100755 --- a/ci/run.sh +++ b/ci/run.sh @@ -131,6 +131,12 @@ else extra_tag_args="--tag secure_integration" fi + if [[ "$DISTRIBUTION" == "oss" ]]; then + extra_tag_args="$extra_tag_args --tag distribution:oss --tag ~distribution:xpack" + elif [[ "$DISTRIBUTION" == "default" ]]; then + extra_tag_args="$extra_tag_args --tag ~distribution:oss --tag distribution:xpack" + fi + case "$ES_VERSION" in LATEST-SNAPSHOT-*) split_latest=${ES_VERSION##*-} diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 69e5b1d66..97ee24024 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -90,6 +90,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base require "logstash/outputs/elasticsearch/http_client_builder" require "logstash/outputs/elasticsearch/common_configs" require "logstash/outputs/elasticsearch/common" + require "logstash/outputs/elasticsearch/ilm" # Protocol agnostic (i.e. non-http, non-java specific) configs go here include(LogStash::Outputs::ElasticSearch::CommonConfigs) @@ -97,6 +98,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # Protocol agnostic methods include(LogStash::Outputs::ElasticSearch::Common) + # Methods for ILM support + include(LogStash::Outputs::ElasticSearch::Ilm) + config_name "elasticsearch" # The Elasticsearch action to perform. Valid actions are: diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 073438610..294e46495 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -23,10 +23,10 @@ def register setup_hosts # properly sets @hosts build_client + setup_after_successful_connection check_action_validity @bulk_request_metrics = metric.namespace(:bulk_requests) @document_level_metrics = metric.namespace(:documents) - install_template_after_successful_connection @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s)) end @@ -38,7 +38,7 @@ def multi_receive(events) retrying_submit(events.map {|e| event_action_tuple(e)}) end - def install_template_after_successful_connection + def setup_after_successful_connection @template_installer ||= Thread.new do sleep_interval = @retry_initial_interval until successful_connection? || @stopping.true? @@ -46,7 +46,11 @@ def install_template_after_successful_connection Stud.stoppable_sleep(sleep_interval) { @stopping.true? } sleep_interval = next_sleep_interval(sleep_interval) end - install_template if successful_connection? + if successful_connection? + verify_ilm_readiness if ilm_enabled? + install_template + setup_ilm if ilm_enabled? + end end end @@ -114,7 +118,6 @@ def maximum_seen_major_version client.maximum_seen_major_version end - def routing_field_name maximum_seen_major_version >= 6 ? :routing : :_routing end @@ -353,4 +356,4 @@ def dlq_enabled? !execution_context.dlq_writer.inner_writer.is_a?(::LogStash::Util::DummyDeadLetterQueueWriter) end end -end; end; end +end end end diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index 2d0800863..ddd067884 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -2,6 +2,10 @@ module LogStash; module Outputs; class ElasticSearch module CommonConfigs + + DEFAULT_INDEX_NAME = "logstash-%{+YYYY.MM.dd}" + DEFAULT_POLICY = "logstash-policy" + def self.included(mod) # The index to write events to. This can be dynamic using the `%{foo}` syntax. # The default value will partition your indices by day so you can more easily @@ -10,7 +14,7 @@ def self.included(mod) # For weekly indexes ISO 8601 format is recommended, eg. logstash-%{+xxxx.ww}. # LS uses Joda to format the index pattern from event timestamp. # Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here]. - mod.config :index, :validate => :string, :default => "logstash-%{+YYYY.MM.dd}" + mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME mod.config :document_type, :validate => :string, @@ -136,6 +140,24 @@ def self.included(mod) # Set which ingest pipeline you wish to execute for an event. You can also use event dependent configuration # here like `pipeline => "%{INGEST_PIPELINE}"` mod.config :pipeline, :validate => :string, :default => nil + + + # ----- + # ILM configurations (beta) + # ----- + # Flag for enabling Index Lifecycle Management integration. + mod.config :ilm_enabled, :validate => :boolean, :default => false + + # Rollover alias used for indexing data. If rollover alias doesn't exist, Logstash will create it and map it to the relevant index + mod.config :ilm_rollover_alias, :validate => :string, :default => 'logstash' + + # appends “{now/d}-000001” by default for new index creation, subsequent rollover indices will increment based on this pattern i.e. “000002” + # {now/d} is date math, and will insert the appropriate value automatically. + mod.config :ilm_pattern, :validate => :string, :default => '{now/d}-000001' + + # ILM policy to use, if undefined the default policy will be used. + mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY + end end end end end diff --git a/lib/logstash/outputs/elasticsearch/default-ilm-policy.json b/lib/logstash/outputs/elasticsearch/default-ilm-policy.json new file mode 100644 index 000000000..f2ac4da06 --- /dev/null +++ b/lib/logstash/outputs/elasticsearch/default-ilm-policy.json @@ -0,0 +1,14 @@ +{ + "policy" : { + "phases": { + "hot" : { + "actions" : { + "rollover" : { + "max_size" : "50gb", + "max_age":"30d" + } + } + } + } + } +} \ No newline at end of file diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index b46f4be0a..32a37e82a 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -337,17 +337,62 @@ def host_to_url(h) ::LogStash::Util::SafeURI.new(raw_url) end - def template_exists?(name) - response = @pool.head("/_template/#{name}") + def exists?(path, use_get=false) + response = use_get ? @pool.get(path) : @pool.head(path) response.code >= 200 && response.code <= 299 end + def template_exists?(name) + exists?("/_template/#{name}") + end + def template_put(name, template) path = "_template/#{name}" logger.info("Installing elasticsearch template to #{path}") @pool.put(path, nil, LogStash::Json.dump(template)) end + # ILM methods + + # check whether rollover alias already exists + def rollover_alias_exists?(name) + exists?(name) + end + + # Create a new rollover alias + def rollover_alias_put(alias_name, alias_definition) + logger.info("Creating rollover alias #{alias_name}") + begin + @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + # If the rollover alias already exists, ignore the error that comes back from Elasticsearch + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + if e.response_code == 400 + logger.info("Rollover Alias #{alias_name} already exists. Skipping") + return + end + raise e + end + end + + def get_xpack_info + get("/_xpack") + end + + def get_ilm_endpoint + @pool.get("/_ilm/policy") + end + + def ilm_policy_exists?(name) + exists?("/_ilm/policy/#{name}", true) + end + + def ilm_policy_put(name, policy) + path = "_ilm/policy/#{name}" + logger.info("Installing ILM policy #{policy} to #{path}") + @pool.put(path, nil, LogStash::Json.dump(policy)) + end + + # Build a bulk item for an elasticsearch update action def update_action_builder(args, source) if args[:_script] diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index 6e12db126..96e8ae671 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -103,9 +103,9 @@ def format_url(url, path_and_query=nil) end request_uri.query = new_query_parts.join("&") unless new_query_parts.empty? - - request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.path}".gsub(/\/{2,}/, "/") - + + request_uri.path = "#{request_uri.path}/#{parsed_path_and_query.raw_path}".gsub(/\/{2,}/, "/") + request_uri end diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb new file mode 100644 index 000000000..80876265b --- /dev/null +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -0,0 +1,84 @@ +module LogStash; module Outputs; class ElasticSearch + module Ilm + + ILM_POLICY_PATH = "default-ilm-policy.json" + + def setup_ilm + return unless ilm_enabled? + @logger.info("Using Index lifecycle management - this feature is currently in beta.") + @logger.warn "Overwriting supplied index name with rollover alias #{@ilm_rollover_alias}" if @index != LogStash::Outputs::ElasticSearch::CommonConfigs::DEFAULT_INDEX_NAME + @index = ilm_rollover_alias + + maybe_create_rollover_alias + maybe_create_ilm_policy + end + + def ilm_enabled? + @ilm_enabled + end + + def verify_ilm_readiness + return unless ilm_enabled? + + # Check the Elasticsearch instance for ILM readiness - this means that the version has to be a non-OSS release, with ILM feature + # available and enabled. + begin + xpack = client.get_xpack_info + features = xpack["features"] + ilm = features.nil? ? nil : features["ilm"] + raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster" if features.nil? || ilm.nil? + raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not available in your Elasticsearch cluster" unless ilm['available'] + raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not enabled in your Elasticsearch cluster" unless ilm['enabled'] + + unless ilm_policy_default? || client.ilm_policy_exists?(ilm_policy) + raise LogStash::ConfigurationError, "The specified ILM policy #{ilm_policy} does not exist on your Elasticsearch instance" + end + + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + # Check xpack endpoint: If no xpack endpoint, then this version of Elasticsearch is not compatible + if e.response_code == 404 + raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster" + elsif e.response_code == 400 + raise LogStash::ConfigurationError, "Index Lifecycle management is enabled in logstash, but not installed on your Elasticsearch cluster" + else + raise e + end + end + end + + private + + def ilm_policy_default? + ilm_policy == LogStash::Outputs::ElasticSearch::DEFAULT_POLICY + end + + def maybe_create_ilm_policy + if ilm_policy_default? && !client.ilm_policy_exists?(ilm_policy) + client.ilm_policy_put(ilm_policy, policy_payload) + end + end + + def maybe_create_rollover_alias + client.rollover_alias_put(rollover_alias_target, rollover_alias_payload) unless client.rollover_alias_exists?(ilm_rollover_alias) + end + + def rollover_alias_target + "<#{ilm_rollover_alias}-#{ilm_pattern}>" + end + + def rollover_alias_payload + { + 'aliases' => { + ilm_rollover_alias =>{ + 'is_write_index' => true + } + } + } + end + + def policy_payload + policy_path = ::File.expand_path(ILM_POLICY_PATH, ::File.dirname(__FILE__)) + LogStash::Json.load(::IO.read(policy_path)) + end + end + end end end \ No newline at end of file diff --git a/lib/logstash/outputs/elasticsearch/template_manager.rb b/lib/logstash/outputs/elasticsearch/template_manager.rb index 47a74d328..66fad1ddf 100644 --- a/lib/logstash/outputs/elasticsearch/template_manager.rb +++ b/lib/logstash/outputs/elasticsearch/template_manager.rb @@ -5,8 +5,9 @@ def self.install_template(plugin) return unless plugin.manage_template plugin.logger.info("Using mapping template from", :path => plugin.template) template = get_template(plugin.template, plugin.maximum_seen_major_version) + add_ilm_settings_to_template(plugin, template) if plugin.ilm_enabled? plugin.logger.info("Attempting to install template", :manage_template => template) - install(plugin.client, plugin.template_name, template, plugin.template_overwrite) + install(plugin.client, template_name(plugin), template, plugin.template_overwrite) rescue => e plugin.logger.error("Failed to install template.", :message => e.message, :class => e.class.name, :backtrace => e.backtrace) end @@ -21,6 +22,25 @@ def self.install(client, template_name, template, template_overwrite) client.template_install(template_name, template, template_overwrite) end + def self.add_ilm_settings_to_template(plugin, template) + plugin.logger.info("Overwriting index patterns, as ILM is enabled.") + # Overwrite any index patterns, and use the rollover alias. Use 'index_patterns' rather than 'template' for pattern + # definition - remove any existing definition of 'template' + template.delete('template') if template.include?('template') + template['index_patterns'] = "#{plugin.ilm_rollover_alias}-*" + if template['settings'] && (template['settings']['index.lifecycle.name'] || template['settings']['index.lifecycle.rollover_alias']) + plugin.logger.info("Overwriting index lifecycle name and rollover alias as ILM is enabled.") + end + template['settings'].update({ 'index.lifecycle.name' => plugin.ilm_policy, 'index.lifecycle.rollover_alias' => plugin.ilm_rollover_alias}) + end + + # Template name - if template_name set, use it + # if not and ILM is enabled, use the rollover alias + # else use the default value of template_name + def self.template_name(plugin) + plugin.ilm_enabled? && !plugin.original_params.key?('template_name') ? plugin.ilm_rollover_alias : plugin.template_name + end + def self.default_template_path(es_major_version) template_version = es_major_version == 1 ? 2 : es_major_version default_template_name = "elasticsearch-template-es#{template_version}x.json" diff --git a/spec/es_spec_helper.rb b/spec/es_spec_helper.rb index 4960d254a..6e2ea466d 100644 --- a/spec/es_spec_helper.rb +++ b/spec/es_spec_helper.rb @@ -1,6 +1,13 @@ require "logstash/devutils/rspec/spec_helper" require 'manticore' require 'elasticsearch' +require_relative "support/elasticsearch/api/actions/delete_ilm_policy" +require_relative "support/elasticsearch/api/actions/get_alias" +require_relative "support/elasticsearch/api/actions/put_alias" +require_relative "support/elasticsearch/api/actions/get_ilm_policy" +require_relative "support/elasticsearch/api/actions/put_ilm_policy" + +require 'json' module ESHelper def get_host_port @@ -19,6 +26,10 @@ def doc_type end end + def todays_date + Time.now.strftime("%Y.%m.%d") + end + def mapping_name if ESHelper.es_version_satisfies?(">=7") "_doc" @@ -41,7 +52,6 @@ def self.es_version end RSpec::Matchers.define :have_hits do |expected| - es_version = RSpec.configuration.filter[:es_version] || ENV['ES_VERSION'] match do |actual| if ESHelper.es_version_satisfies?(">=7") expected == actual['hits']['total']['value'] @@ -61,6 +71,54 @@ def self.es_version_satisfies?(*requirement) es_release_version = Gem::Version.new(es_version).release Gem::Requirement.new(requirement).satisfied_by?(es_release_version) end + + def clean(client) + client.indices.delete_template(:name => "*") + # This can fail if there are no indexes, ignore failure. + client.indices.delete(:index => "*") rescue nil + clean_ilm(client) if supports_ilm?(client) + end + + def set_cluster_settings(client, cluster_settings) + client.cluster.put_settings(body: cluster_settings) + get_cluster_settings(client) + end + + def get_cluster_settings(client) + client.cluster.get_settings + end + + def get_policy(client, policy_name) + client.get_ilm_policy(name: policy_name) + end + + def put_policy(client, policy_name, policy) + client.put_ilm_policy({:name => policy_name, :body=> policy}) + end + + def put_alias(client, the_alias, index) + body = { + "aliases" => { + index => { + "is_write_index"=> true + } + } + } + client.put_alias({name: the_alias, body: body}) + end + + def clean_ilm(client) + client.get_ilm_policy.each_key {|key| client.delete_ilm_policy(name: key)} + end + + def supports_ilm?(client) + begin + client.get_ilm_policy + true + rescue + false + end + end end RSpec.configure do |config| diff --git a/spec/fixtures/template-with-policy-es6x.json b/spec/fixtures/template-with-policy-es6x.json new file mode 100644 index 000000000..c6b89bec8 --- /dev/null +++ b/spec/fixtures/template-with-policy-es6x.json @@ -0,0 +1,48 @@ +{ + "template" : "overwrite-*", + "version" : 60001, + "settings" : { + "index.refresh_interval" : "1s", + "number_of_shards": 1, + "index.lifecycle.name": "overwrite-policy", + "index.lifecycle.rollover_alias": "overwrite" + }, + "mappings" : { + "_default_" : { + "dynamic_templates" : [ { + "message_field" : { + "path_match" : "message", + "match_mapping_type" : "string", + "mapping" : { + "type" : "text", + "norms" : false + } + } + }, { + "string_fields" : { + "match" : "*", + "match_mapping_type" : "string", + "mapping" : { + "type" : "text", "norms" : false, + "fields" : { + "keyword" : { "type": "keyword", "ignore_above": 256 } + } + } + } + } ], + "properties" : { + "@timestamp": { "type": "date"}, + "@version": { "type": "keyword"}, + "geoip" : { + "dynamic": true, + "properties" : { + "ip": { "type": "ip" }, + "location" : { "type" : "geo_point" }, + "latitude" : { "type" : "half_float" }, + "longitude" : { "type" : "half_float" } + } + } + } + } + } +} diff --git a/spec/fixtures/template-with-policy-es7x.json b/spec/fixtures/template-with-policy-es7x.json new file mode 100644 index 000000000..175d9b877 --- /dev/null +++ b/spec/fixtures/template-with-policy-es7x.json @@ -0,0 +1,46 @@ +{ + "index_patterns" : "overwrite-*", + "version" : 60001, + "settings" : { + "index.refresh_interval" : "1s", + "number_of_shards": 1 + }, + "mappings" : { + "_doc" : { + "dynamic_templates" : [ { + "message_field" : { + "path_match" : "message", + "match_mapping_type" : "string", + "mapping" : { + "type" : "text", + "norms" : false + } + } + }, { + "string_fields" : { + "match" : "*", + "match_mapping_type" : "string", + "mapping" : { + "type" : "text", "norms" : false, + "fields" : { + "keyword" : { "type": "keyword", "ignore_above": 256 } + } + } + } + } ], + "properties" : { + "@timestamp": { "type": "date"}, + "@version": { "type": "keyword"}, + "geoip" : { + "dynamic": true, + "properties" : { + "ip": { "type": "ip" }, + "location" : { "type" : "geo_point" }, + "latitude" : { "type" : "half_float" }, + "longitude" : { "type" : "half_float" } + } + } + } + } + } +} diff --git a/spec/integration/outputs/ilm_spec.rb b/spec/integration/outputs/ilm_spec.rb new file mode 100644 index 000000000..fcbe794fa --- /dev/null +++ b/spec/integration/outputs/ilm_spec.rb @@ -0,0 +1,542 @@ +require_relative "../../../spec/es_spec_helper" + +shared_examples_for 'an Elasticsearch instance that does not support index lifecycle management' do + require "logstash/outputs/elasticsearch" + + let (:ilm_enabled) { false } + let (:settings) { + { + "ilm_enabled" => ilm_enabled, + "hosts" => "#{get_host_port()}" + } + } + + before :each do + require "elasticsearch" + + # Clean ES of data before we start. + @es = get_client + clean(@es) + end + + after :each do + clean(@es) + end + + subject { LogStash::Outputs::ElasticSearch.new(settings) } + + context 'when ilm is enabled in Logstash' do + let (:ilm_enabled) { true } + + it 'should raise a configuration error' do + expect do + begin + subject.register + sleep(1) + ensure + subject.stop_template_installer + end + end.to raise_error(LogStash::ConfigurationError) + end + end + + context 'when ilm is disabled in Logstash' do + it 'should index documents normally' do + subject.register + + subject.multi_receive([ + LogStash::Event.new("message" => "sample message here"), + LogStash::Event.new("somemessage" => { "message" => "sample nested message here" }), + LogStash::Event.new("somevalue" => 100), + ]) + + sleep(6) + + subject.multi_receive([ + LogStash::Event.new("country" => "us"), + LogStash::Event.new("country" => "at"), + LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }) + ]) + + @es.indices.refresh + + # Wait or fail until everything's indexed. + Stud::try(20.times) do + r = @es.search + expect(r).to have_hits(6) + end + indexes_written = @es.search['hits']['hits'].each_with_object(Hash.new(0)) do |x, res| + index_written = x['_index'] + res[index_written] += 1 + end + expect(indexes_written.count).to eq(1) + end + end + +end + +shared_examples_for 'an ILM enabled Logstash' do + + context 'with a policy with a maximum number of documents' do + let (:policy) { small_max_doc_policy } + let (:ilm_policy_name) { "custom-policy"} + let (:settings) { super.merge("ilm_policy" => ilm_policy_name)} + + it 'should rollover when the policy max docs is reached' do + put_policy(@es,ilm_policy_name, policy) + subject.register + + subject.multi_receive([ + LogStash::Event.new("message" => "sample message here"), + LogStash::Event.new("somemessage" => { "message" => "sample nested message here" }), + LogStash::Event.new("somevalue" => 100), + ]) + + sleep(6) + + subject.multi_receive([ + LogStash::Event.new("country" => "us"), + LogStash::Event.new("country" => "at"), + LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }) + ]) + + sleep(6) + + subject.multi_receive([ + LogStash::Event.new("country" => "uk"), + LogStash::Event.new("country" => "fr"), + LogStash::Event.new("geoip" => { "location" => [ 0.1, 1.0 ] }) + ]) + + @es.indices.refresh + + # Wait or fail until everything's indexed. + Stud::try(20.times) do + r = @es.search + expect(r).to have_hits(9) + end + indexes_written = @es.search['hits']['hits'].each_with_object(Hash.new(0)) do |x, res| + index_written = x['_index'] + res[index_written] += 1 + end + expect(indexes_written.count).to eq(3) + expect(indexes_written["#{expected_index}-#{todays_date}-000001"]).to eq(3) + expect(indexes_written["#{expected_index}-#{todays_date}-000002"]).to eq(3) + expect(indexes_written["#{expected_index}-#{todays_date}-000003"]).to eq(3) + end + end + + context 'with a policy where the maximum number of documents is not reached' do + let (:policy) { large_max_doc_policy } + let (:ilm_policy_name) { "custom-policy"} + let (:settings) { super.merge("ilm_policy" => ilm_policy_name)} + + it 'should ingest into a single index when max docs is not reached' do + put_policy(@es,ilm_policy_name, policy) + subject.register + + subject.multi_receive([ + LogStash::Event.new("message" => "sample message here"), + LogStash::Event.new("somemessage" => { "message" => "sample nested message here" }), + LogStash::Event.new("somevalue" => 100), + ]) + + sleep(6) + + subject.multi_receive([ + LogStash::Event.new("country" => "us"), + LogStash::Event.new("country" => "at"), + LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }) + ]) + + @es.indices.refresh + + # Wait or fail until everything's indexed. + Stud::try(20.times) do + r = @es.search + expect(r).to have_hits(6) + end + indexes_written = @es.search['hits']['hits'].each_with_object(Hash.new(0)) do |x, res| + index_written = x['_index'] + res[index_written] += 1 + end + expect(indexes_written.count).to eq(1) + expect(indexes_written["#{expected_index}-#{todays_date}-000001"]).to eq(6) + end + end +end + + +if ESHelper.es_version_satisfies?("<= 6.5") + describe 'Pre-ILM versions of Elasticsearch', :integration => true do + it_behaves_like 'an Elasticsearch instance that does not support index lifecycle management' + end +end + +if ESHelper.es_version_satisfies?(">= 6.6") + describe 'OSS Elasticsearch', :distribution => 'oss', :integration => true do + it_behaves_like 'an Elasticsearch instance that does not support index lifecycle management' + end + + describe 'Elasticsearch has index lifecycle management enabled', :distribution => 'xpack', :integration => true do + DEFAULT_INTERVAL = '600s' + + require "logstash/outputs/elasticsearch" + let (:ilm_enabled) { true } + + let (:settings) { + { + "ilm_enabled" => ilm_enabled, + "hosts" => "#{get_host_port()}" + } + } + let (:policy) { small_max_doc_policy } + + + let (:small_max_doc_policy) { + {"policy" => { + "phases"=> { + "hot" => { + "actions" => { + "rollover" => { + "max_docs" => "3" + } + } + } + } + }} + } + + let (:large_max_doc_policy) { + {"policy" => { + "phases"=> { + "hot" => { + "actions" => { + "rollover" => { + "max_docs" => "1000000" + } + } + } + } + }} + } + + + + subject { LogStash::Outputs::ElasticSearch.new(settings) } + + before :each do + # Delete all templates first. + require "elasticsearch" + + # Clean ES of data before we start. + @es = get_client + clean(@es) + # Set the poll interval for lifecycle management to be short so changes get picked up in time. + set_cluster_settings(@es, { + "persistent" => { + "indices.lifecycle.poll_interval" => "1s" + } + }) + end + + after :each do + # Set poll interval back to default + set_cluster_settings(@es, { + "persistent" => { + "indices.lifecycle.poll_interval" => DEFAULT_INTERVAL + } + }) + clean(@es) + end + + + context 'with ilm enabled' do + let (:ilm_enabled) { true } + + + context 'when using the default policy' do + context 'with a custom pattern' do + let (:settings) { super.merge("ilm_pattern" => "000001")} + it 'should create a rollover alias' do + expect(@es.indices.exists_alias(index: "logstash")).to be_falsey + subject.register + sleep(1) + expect(@es.indices.exists_alias(index: "logstash")).to be_truthy + expect(@es.get_alias(name: "logstash")).to include("logstash-000001") + end + end + + + it 'should install it if it is not present' do + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + subject.register + sleep(1) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.not_to raise_error + end + + it 'should create the default rollover alias' do + expect(@es.indices.exists_alias(index: "logstash")).to be_falsey + subject.register + sleep(1) + expect(@es.indices.exists_alias(index: "logstash")).to be_truthy + expect(@es.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001") + end + + + it 'should ingest into a single index' do + subject.register + + subject.multi_receive([ + LogStash::Event.new("message" => "sample message here"), + LogStash::Event.new("somemessage" => { "message" => "sample nested message here" }), + LogStash::Event.new("somevalue" => 100), + ]) + + sleep(6) + + subject.multi_receive([ + LogStash::Event.new("country" => "us"), + LogStash::Event.new("country" => "at"), + LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }) + ]) + + @es.indices.refresh + + # Wait or fail until everything's indexed. + Stud::try(20.times) do + r = @es.search + expect(r).to have_hits(6) + end + indexes_written = @es.search['hits']['hits'].each_with_object(Hash.new(0)) do |x, res| + index_written = x['_index'] + res[index_written] += 1 + end + + expect(indexes_written.count).to eq(1) + expect(indexes_written["logstash-#{todays_date}-000001"]).to eq(6) + end + end + + context 'when not using the default policy' do + let (:ilm_policy_name) {"new_one"} + let (:settings) { super.merge("ilm_policy" => ilm_policy_name)} + let (:policy) {{ + "policy" => { + "phases"=> { + "hot" => { + "actions" => { + "rollover" => { + "max_docs" => "3" + } + } + } + } + }}} + + before do + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + put_policy(@es,ilm_policy_name, policy) + end + + it 'should not install the default policy if it is not used' do + subject.register + sleep(1) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + end + end + + context 'when using a time based policy' do + let (:ilm_policy_name) {"new_one"} + let (:settings) { super.merge("ilm_policy" => ilm_policy_name)} + let (:policy) {{ + "policy" => { + "phases"=> { + "hot" => { + "actions" => { + "rollover" => { + "max_age" => "1d" + } + } + } + } + }}} + + before do + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + put_policy(@es,ilm_policy_name, policy) + end + + it 'should not install the default policy if it is not used' do + subject.register + sleep(1) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + end + end + context 'with the default template' do + let(:expected_index) { "logstash" } + + it 'should create the rollover alias' do + expect(@es.indices.exists_alias(index: expected_index)).to be_falsey + subject.register + sleep(1) + expect(@es.indices.exists_alias(index: expected_index)).to be_truthy + expect(@es.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001") + end + + it 'should write the ILM settings into the template' do + subject.register + sleep(1) + expect(@es.indices.get_template(name: "logstash")["logstash"]["index_patterns"]).to eq(["logstash-*"]) + expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']['name']).to eq("logstash-policy") + expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']['rollover_alias']).to eq("logstash") + end + + it_behaves_like 'an ILM enabled Logstash' + end + + context 'with a custom template' do + let (:ilm_rollover_alias) { "the_cat_in_the_hat" } + let (:index) { ilm_rollover_alias } + let(:expected_index) { index } + let (:settings) { super.merge("ilm_policy" => ilm_policy_name, + "template" => template, + "ilm_rollover_alias" => ilm_rollover_alias)} + + + if ESHelper.es_version_satisfies?(">= 7.0") + let (:template) { "spec/fixtures/template-with-policy-es7x.json" } + else + let (:template) { "spec/fixtures/template-with-policy-es6x.json" } + end + let (:ilm_enabled) { true } + let (:ilm_policy_name) { "custom-policy" } + + + before :each do + put_policy(@es,ilm_policy_name, policy) + end + + it_behaves_like 'an ILM enabled Logstash' + + it 'should create the rollover alias' do + expect(@es.indices.exists_alias(index: ilm_rollover_alias)).to be_falsey + subject.register + sleep(1) + expect(@es.indices.exists_alias(index: ilm_rollover_alias)).to be_truthy + expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001") + end + + context 'when the custom rollover alias already exists' do + it 'should ignore the already exists error' do + expect(@es.indices.exists_alias(index: ilm_rollover_alias)).to be_falsey + put_alias(@es, "#{ilm_rollover_alias}-#{todays_date}-000001", ilm_rollover_alias) + expect(@es.indices.exists_alias(index: ilm_rollover_alias)).to be_truthy + subject.register + sleep(1) + expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001") + end + + end + + it 'should write the ILM settings into the template' do + subject.register + sleep(1) + expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["index_patterns"]).to eq(["#{ilm_rollover_alias}-*"]) + expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["settings"]['index']['lifecycle']['name']).to eq(ilm_policy_name) + expect(@es.indices.get_template(name: ilm_rollover_alias)[ilm_rollover_alias]["settings"]['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias) + end + + context 'with a different template_name' do + let (:template_name) { "custom_template_name" } + let (:settings) { super.merge('template_name' => template_name)} + + it_behaves_like 'an ILM enabled Logstash' + + it 'should write the ILM settings into the template' do + subject.register + sleep(1) + expect(@es.indices.get_template(name: template_name)[template_name]["index_patterns"]).to eq(["#{ilm_rollover_alias}-*"]) + expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']['name']).to eq(ilm_policy_name) + expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']['rollover_alias']).to eq(ilm_rollover_alias) + end + end + + end + end + + context 'with ilm disabled' do + let (:ilm_enabled) { false } + + it 'should not create a rollover alias' do + expect(@es.get_alias).to be_empty + subject.register + sleep(1) + expect(@es.get_alias).to be_empty + end + + it 'should not install the default policy' do + subject.register + sleep(1) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + end + + it 'should not write the ILM settings into the template' do + subject.register + sleep(1) + expect(@es.indices.get_template(name: "logstash")["logstash"]["index_patterns"]).to eq(["logstash-*"]) + expect(@es.indices.get_template(name: "logstash")["logstash"]["settings"]['index']['lifecycle']).to be_nil + end + + context 'with an existing policy that will roll over' do + let (:policy) { small_max_doc_policy } + let (:ilm_policy_name) { "3_docs"} + let (:settings) { super.merge("ilm_policy" => ilm_policy_name)} + + it 'should not roll over indices' do + subject.register + subject.multi_receive([ + LogStash::Event.new("message" => "sample message here"), + LogStash::Event.new("somemessage" => { "message" => "sample nested message here" }), + LogStash::Event.new("somevalue" => 100), + ]) + + sleep(6) + + subject.multi_receive([ + LogStash::Event.new("country" => "us"), + LogStash::Event.new("country" => "at"), + LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }) + ]) + + @es.indices.refresh + + # Wait or fail until everything's indexed. + Stud::try(20.times) do + r = @es.search + expect(r).to have_hits(6) + end + indexes_written = @es.search['hits']['hits'].each_with_object(Hash.new(0)) do |x, res| + index_written = x['_index'] + res[index_written] += 1 + end + expect(indexes_written.count).to eq(1) + expect(indexes_written.values.first).to eq(6) + end + end + + context 'with a custom template name' do + let (:template_name) { "custom_template_name" } + let (:settings) { super.merge('template_name' => template_name)} + + it 'should not write the ILM settings into the template' do + subject.register + sleep(1) + expect(@es.indices.get_template(name: template_name)[template_name]["index_patterns"]).to eq(["logstash-*"]) + expect(@es.indices.get_template(name: template_name)[template_name]["settings"]['index']['lifecycle']).to be_nil + end + end + + end + end +end \ No newline at end of file diff --git a/spec/support/elasticsearch/api/actions/delete_ilm_policy.rb b/spec/support/elasticsearch/api/actions/delete_ilm_policy.rb new file mode 100644 index 000000000..83c66f3c7 --- /dev/null +++ b/spec/support/elasticsearch/api/actions/delete_ilm_policy.rb @@ -0,0 +1,19 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +module Elasticsearch + module API + module Actions + + # Update the password of the specified user + def delete_ilm_policy(arguments={}) + method = HTTP_DELETE + path = Utils.__pathify '_ilm/policy/', + Utils.__escape(arguments[:name]) + params = {} + perform_request(method, path, params, nil).body + end + end + end +end diff --git a/spec/support/elasticsearch/api/actions/get_alias.rb b/spec/support/elasticsearch/api/actions/get_alias.rb new file mode 100644 index 000000000..ef4ebbd4f --- /dev/null +++ b/spec/support/elasticsearch/api/actions/get_alias.rb @@ -0,0 +1,18 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +module Elasticsearch + module API + module Actions + + # Retrieve the list of index lifecycle management policies + def get_alias(arguments={}) + method = HTTP_GET + path = Utils.__pathify '_alias', Utils.__escape(arguments[:name]) + params = {} + perform_request(method, path, params, nil).body + end + end + end +end \ No newline at end of file diff --git a/spec/support/elasticsearch/api/actions/get_ilm_policy.rb b/spec/support/elasticsearch/api/actions/get_ilm_policy.rb new file mode 100644 index 000000000..accf98466 --- /dev/null +++ b/spec/support/elasticsearch/api/actions/get_ilm_policy.rb @@ -0,0 +1,18 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +module Elasticsearch + module API + module Actions + + # Retrieve the list of index lifecycle management policies + def get_ilm_policy(arguments={}) + method = HTTP_GET + path = Utils.__pathify '_ilm/policy', Utils.__escape(arguments[:name]) + params = {} + perform_request(method, path, params, nil).body + end + end + end +end \ No newline at end of file diff --git a/spec/support/elasticsearch/api/actions/put_alias.rb b/spec/support/elasticsearch/api/actions/put_alias.rb new file mode 100644 index 000000000..d0585934f --- /dev/null +++ b/spec/support/elasticsearch/api/actions/put_alias.rb @@ -0,0 +1,24 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +module Elasticsearch + module API + module Actions + + # @option arguments [String] :name The name of the alias (*Required*) + # @option arguments [Hash] :The alias definition(*Required*) + + def put_alias(arguments={}) + raise ArgumentError, "Required argument 'name' missing" unless arguments[:name] + raise ArgumentError, "Required argument 'body' missing" unless arguments[:body] + method = HTTP_PUT + path = Utils.__pathify Utils.__escape(arguments[:name]) + + params = Utils.__validate_and_extract_params arguments + body = arguments[:body] + perform_request(method, path, params, body.to_json).body + end + end + end +end diff --git a/spec/support/elasticsearch/api/actions/put_ilm_policy.rb b/spec/support/elasticsearch/api/actions/put_ilm_policy.rb new file mode 100644 index 000000000..e670366b8 --- /dev/null +++ b/spec/support/elasticsearch/api/actions/put_ilm_policy.rb @@ -0,0 +1,25 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +module Elasticsearch + module API + module Actions + + # @option arguments [String] :name The name of the policy (*Required*) + # @option arguments [Hash] :body The policy definition (*Required*) + + def put_ilm_policy(arguments={}) + raise ArgumentError, "Required argument 'name' missing" unless arguments[:name] + raise ArgumentError, "Required argument 'body' missing" unless arguments[:body] + method = HTTP_PUT + path = Utils.__pathify '_ilm/policy/', Utils.__escape(arguments[:name]) + + params = Utils.__validate_and_extract_params arguments + + body = arguments[:body] + perform_request(method, path, params, body.to_json).body + end + end + end +end