From b295563014fb44d63f0dae80833b88f72ed7d7e5 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 29 Jul 2019 16:41:35 +0200 Subject: [PATCH 1/2] [CLIENT] Accept options passed to #perform_request to avoid infinite retry loop --- .../elasticsearch/transport/transport/base.rb | 20 +- .../transport/transport/http/curb.rb | 4 +- .../transport/transport/http/faraday.rb | 2 +- .../transport/transport/http/manticore.rb | 2 +- .../transport/transport/sniffer.rb | 3 +- .../spec/elasticsearch/transport/base_spec.rb | 180 ++++++++++++++++++ 6 files changed, 199 insertions(+), 12 deletions(-) diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb index bd71c32879..58096e3b48 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb @@ -16,7 +16,7 @@ module Base attr_reader :hosts, :options, :connections, :counter, :last_request_at, :protocol attr_accessor :serializer, :sniffer, :logger, :tracer, :reload_connections, :reload_after, - :resurrect_after, :max_retries + :resurrect_after # Creates a new transport object # @@ -52,7 +52,6 @@ def initialize(arguments={}, &block) @reload_connections = options[:reload_connections] @reload_after = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER - @max_retries = options[:retry_on_failure].is_a?(Integer) ? options[:retry_on_failure] : DEFAULT_MAX_RETRIES @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i } end @@ -244,10 +243,17 @@ def __full_url(host) # @raise [ServerError] If request failed on server # @raise [Error] If no connection is available # - def perform_request(method, path, params={}, body=nil, headers=nil, &block) + def perform_request(method, path, params={}, body=nil, headers=nil, opts={}, &block) raise NoMethodError, "Implement this method in your transport class" unless block_given? start = Time.now if logger || tracer tries = 0 + reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure]) + + max_retries = if opts.key?(:retry_on_failure) + opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure] + elsif options.key?(:retry_on_failure) + options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure] + end params = params.clone @@ -271,9 +277,9 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) rescue Elasticsearch::Transport::Transport::ServerError => e - if @retry_on_status.include?(response.status) + if response && @retry_on_status.include?(response.status) logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger - if tries <= max_retries + if tries <= (max_retries || DEFAULT_MAX_RETRIES) retry else logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger @@ -288,12 +294,12 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) connection.dead! - if @options[:reload_on_failure] and tries < connections.all.size + if reload_on_failure and tries < connections.all.size logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger reload_connections! and retry end - if @options[:retry_on_failure] + if max_retries logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger if tries <= max_retries retry diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb index ed51b3424a..ce8821d1bf 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb @@ -15,8 +15,8 @@ class Curb # @return [Response] # @see Transport::Base#perform_request # - def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection,url| + def perform_request(method, path, params={}, body=nil, headers=nil, opts={}) + super do |connection, url| connection.connection.url = url case method diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb index dbd377b9aa..c74948eeed 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb @@ -16,7 +16,7 @@ class Faraday # @return [Response] # @see Transport::Base#perform_request # - def perform_request(method, path, params={}, body=nil, headers=nil) + def perform_request(method, path, params={}, body=nil, headers=nil, opts={}) super do |connection, url| headers = headers || connection.connection.headers diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb index 666aed9164..b398797076 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb @@ -63,7 +63,7 @@ def build_client(options={}) # @return [Response] # @see Transport::Base#perform_request # - def perform_request(method, path, params={}, body=nil, headers=nil) + def perform_request(method, path, params={}, body=nil, headers=nil, opts={}) super do |connection, url| params[:body] = __convert_to_json(body) if body params[:headers] = headers if headers diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/sniffer.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/sniffer.rb index 2d150004fa..5cdd623b62 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/sniffer.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/sniffer.rb @@ -28,7 +28,8 @@ def initialize(transport) # def hosts Timeout::timeout(timeout, SnifferTimeoutError) do - nodes = transport.perform_request('GET', '_nodes/http').body + nodes = transport.perform_request('GET', '_nodes/http', {}, nil, nil, + reload_on_failure: false).body hosts = nodes['nodes'].map do |id,info| if info[PROTOCOL] diff --git a/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb b/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb index 7cdd9209fe..84c92cbdfc 100644 --- a/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb +++ b/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb @@ -73,4 +73,184 @@ it_behaves_like 'a redacted string' end end + + context 'when reload_on_failure is true and and hosts are unreachable' do + + let(:client) do + Elasticsearch::Transport::Client.new(arguments) + end + + let(:arguments) do + { + hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + reload_on_failure: true, + sniffer_timeout: 5 + } + end + + it 'raises an exception' do + expect { + client.info + }.to raise_exception(Faraday::ConnectionFailed) + end + end + + context 'when the client has `retry_on_failure` set to an integer' do + + let(:client) do + Elasticsearch::Transport::Client.new(arguments) + end + + let(:arguments) do + { + hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + retry_on_failure: 2 + } + end + + context 'when `perform_request` is called without a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(3).times.and_call_original + end + + it 'uses the client `retry_on_failure` value' do + expect { + client.transport.perform_request('GET', '/info') + }.to raise_exception(Faraday::ConnectionFailed) + end + end + + context 'when `perform_request` is called with a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(6).times.and_call_original + end + + it 'uses the option `retry_on_failure` value' do + expect { + client.transport.perform_request('GET', '/info', {}, nil, nil, retry_on_failure: 5) + }.to raise_exception(Faraday::ConnectionFailed) + end + end + end + + context 'when the client has `retry_on_failure` set to true' do + + let(:client) do + Elasticsearch::Transport::Client.new(arguments) + end + + let(:arguments) do + { + hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + retry_on_failure: true + } + end + + context 'when `perform_request` is called without a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(4).times.and_call_original + end + + it 'uses the default `MAX_RETRIES` value' do + expect { + client.transport.perform_request('GET', '/info') + }.to raise_exception(Faraday::ConnectionFailed) + end + end + + context 'when `perform_request` is called with a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(6).times.and_call_original + end + + it 'uses the option `retry_on_failure` value' do + expect { + client.transport.perform_request('GET', '/info', {}, nil, nil, retry_on_failure: 5) + }.to raise_exception(Faraday::ConnectionFailed) + end + end + end + + context 'when the client has `retry_on_failure` set to false' do + + let(:client) do + Elasticsearch::Transport::Client.new(arguments) + end + + let(:arguments) do + { + hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + retry_on_failure: false + } + end + + context 'when `perform_request` is called without a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).once.and_call_original + end + + it 'does not retry' do + expect { + client.transport.perform_request('GET', '/info') + }.to raise_exception(Faraday::ConnectionFailed) + end + end + + context 'when `perform_request` is called with a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(6).times.and_call_original + end + + it 'uses the option `retry_on_failure` value' do + expect { + client.transport.perform_request('GET', '/info', {}, nil, nil, retry_on_failure: 5) + }.to raise_exception(Faraday::ConnectionFailed) + end + end + end + + context 'when the client has no `retry_on_failure` set' do + + let(:client) do + Elasticsearch::Transport::Client.new(arguments) + end + + let(:arguments) do + { + hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + } + end + + context 'when `perform_request` is called without a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(1).times.and_call_original + end + + it 'does not retry' do + expect { + client.transport.perform_request('GET', '/info') + }.to raise_exception(Faraday::ConnectionFailed) + end + end + + context 'when `perform_request` is called with a `retry_on_failure` option value' do + + before do + expect(client.transport).to receive(:get_connection).exactly(6).times.and_call_original + end + + it 'uses the option `retry_on_failure` value' do + expect { + client.transport.perform_request('GET', '/info', {}, nil, nil, retry_on_failure: 5) + }.to raise_exception(Faraday::ConnectionFailed) + end + end + end end From 7b96386d9f0a13ef03220eb632f7c49f3ecfaa2f Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 5 Aug 2019 16:26:14 +0200 Subject: [PATCH 2/2] [CLIENT] Fix minor typo --- .../spec/elasticsearch/transport/base_spec.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb b/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb index 84c92cbdfc..f14327e95d 100644 --- a/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb +++ b/elasticsearch-transport/spec/elasticsearch/transport/base_spec.rb @@ -82,7 +82,7 @@ let(:arguments) do { - hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + hosts: ['http://unavailable:9200', 'http://unavailable:9201'], reload_on_failure: true, sniffer_timeout: 5 } @@ -103,7 +103,7 @@ let(:arguments) do { - hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + hosts: ['http://unavailable:9200', 'http://unavailable:9201'], retry_on_failure: 2 } end @@ -143,7 +143,7 @@ let(:arguments) do { - hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + hosts: ['http://unavailable:9200', 'http://unavailable:9201'], retry_on_failure: true } end @@ -183,7 +183,7 @@ let(:arguments) do { - hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + hosts: ['http://unavailable:9200', 'http://unavailable:9201'], retry_on_failure: false } end @@ -223,7 +223,7 @@ let(:arguments) do { - hosts: ['http://unavabilable:9200', 'http://unavabilable:9201'], + hosts: ['http://unavailable:9200', 'http://unavailable:9201'], } end