From 2e33e94a1bd7bcf55071407332bef6c1fad68bd4 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 25 Oct 2018 17:00:10 +0200 Subject: [PATCH 1/5] [CLIENT] Use Loggable module to abstract logging --- .../lib/elasticsearch/transport.rb | 2 + .../elasticsearch/transport/transport/base.rb | 90 +++++++++---------- .../transport/transport/http/curb.rb | 4 +- .../transport/transport/http/faraday.rb | 3 +- .../transport/transport/http/manticore.rb | 3 +- .../transport/transport/loggable.rb | 26 ++++++ .../test/unit/transport_base_test.rb | 4 +- 7 files changed, 80 insertions(+), 52 deletions(-) create mode 100644 elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb diff --git a/elasticsearch-transport/lib/elasticsearch/transport.rb b/elasticsearch-transport/lib/elasticsearch/transport.rb index 012671d1b1..589345e391 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport.rb @@ -4,6 +4,7 @@ require "multi_json" require "faraday" +require "elasticsearch/transport/transport/loggable" require "elasticsearch/transport/transport/serializer/multi_json" require "elasticsearch/transport/transport/sniffer" require "elasticsearch/transport/transport/response" @@ -15,6 +16,7 @@ require "elasticsearch/transport/transport/http/faraday" require "elasticsearch/transport/client" + require "elasticsearch/transport/version" module Elasticsearch diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb index 79e4a7524f..afebda33be 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb @@ -5,6 +5,7 @@ module Transport # @abstract Module with common functionality for transport implementations. # module Base + include Loggable DEFAULT_PORT = 9200 DEFAULT_PROTOCOL = 'http' DEFAULT_RELOAD_AFTER = 10_000 # Requests @@ -165,19 +166,13 @@ def __close_connections # @api private # def __log(method, path, params, body, url, response, json, took, duration) - sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') - logger.info "#{method.to_s.upcase} #{sanitized_url} " + - "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" - logger.debug "> #{__convert_to_json(body)}" if body - logger.debug "< #{response.body}" - end - - # Log failed request - # - # @api private - # - def __log_failed(response) - logger.fatal "[#{response.status}] #{response.body}" + if logger + sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') + log_info "#{method.to_s.upcase} #{sanitized_url} " + + "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" + log_debug "> #{__convert_to_json(body)}" if body + log_debug "< #{response.body}" + end end # Trace the request in the `curl` format @@ -185,15 +180,17 @@ def __log_failed(response) # @api private # def __trace(method, path, params, headers, body, url, response, json, took, duration) - trace_url = "http://localhost:9200/#{path}?pretty" + - ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) - trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' - trace_command = "curl -X #{method.to_s.upcase}" - trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? - trace_command += " '#{trace_url}'#{trace_body}\n" - tracer.info trace_command - tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" - tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" + if tracer + trace_url = "http://localhost:9200/#{path}?pretty" + + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) + trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' + trace_command = "curl -X #{method.to_s.upcase}" + trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? + trace_command += " '#{trace_url}'#{trace_body}\n" + tracer.info trace_command + tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" + tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" + end end # Raise error specific for the HTTP response status or a generic server error @@ -210,7 +207,7 @@ def __raise_transport_error(response) # @api private # def __convert_to_json(o=nil, options={}) - o = o.is_a?(String) ? o : serializer.dump(o, options) + o.is_a?(String) ? o : serializer.dump(o, options) end # Returns a full URL based on information from host @@ -246,11 +243,9 @@ def __full_url(host) # def perform_request(method, path, params={}, body=nil, headers=nil, &block) raise NoMethodError, "Implement this method in your transport class" unless block_given? - start = Time.now if logger || tracer + start = Time.now tries = 0 - params = params.clone - ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } begin @@ -263,7 +258,7 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) url = connection.full_url(path, params) - response = block.call(connection, url) + response = block.call(connection) connection.healthy! if connection.failures > 0 @@ -272,11 +267,11 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) rescue Elasticsearch::Transport::Transport::ServerError => e if @retry_on_status.include?(response.status) - logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger + log_warn("[#{e.class}] Attempt #{tries} to get response from #{url}") if tries <= max_retries retry else - logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger + log_fatal("[#{e.class}] Cannot get response from #{url} after #{tries} tries") raise e end else @@ -284,21 +279,21 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) end rescue *host_unreachable_exceptions => e - logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger + log_error("[#{e.class}] #{e.message} #{connection.host.inspect}") connection.dead! if @options[:reload_on_failure] and tries < connections.all.size - logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger + log_warn("[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})") reload_connections! and retry end if @options[:retry_on_failure] - logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger + log_warn("[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}") if tries <= max_retries retry else - logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger + log_fatal("[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries") raise e end else @@ -306,30 +301,33 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) end rescue Exception => e - logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger + log_fatal("[#{e.class}] #{e.message} (#{connection.host.inspect if connection})") raise e end #/begin - duration = Time.now-start if logger || tracer + duration = Time.now - start if response.status.to_i >= 300 - __log method, path, params, body, url, response, nil, 'N/A', duration if logger - __trace method, path, params, headers, body, url, response, nil, 'N/A', duration if tracer + __log(method, path, params, body, url, response, nil, 'N/A', duration) + __trace(method, path, params, headers, body, url, response, nil, 'N/A', duration) - # Log the failure only when `ignore` doesn't match the response status - __log_failed response if logger && !ignore.include?(response.status.to_i) - - __raise_transport_error response unless ignore.include?(response.status.to_i) + unless ignore.include?(response.status.to_i) + # Log the failure only when `ignore` doesn't match the response status + log_fatal("[#{response.status}] #{response.body}") + __raise_transport_error(response) + end end - json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ - took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer + if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ + json = serializer.load(response.body) + end - __log method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i) - __trace method, path, params, headers, body, url, response, json, took, duration if tracer + took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' + __log(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i) + __trace(method, path, params, headers, body, url, response, json, took, duration) - Response.new response.status, json || response.body, response.headers + Response.new(response.status, json || response.body, response.headers) ensure @last_request_at = Time.now end diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb index ed51b3424a..282d009431 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb @@ -16,8 +16,8 @@ class Curb # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection,url| - connection.connection.url = url + super do |connection| + connection.connection.url = connection.full_url(path, params) case method when 'HEAD' diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb index bf6b44ce09..72f67b02fa 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb @@ -17,7 +17,8 @@ class Faraday # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection, url| + super do |connection| + url = connection.full_url(path, params) headers = headers || connection.connection.headers response = connection.connection.run_request \ diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb index 666aed9164..53050fa847 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb @@ -64,7 +64,8 @@ def build_client(options={}) # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection, url| + super do |connection| + url = connection.full_url(path, params) params[:body] = __convert_to_json(body) if body params[:headers] = headers if headers params = params.merge @request_options diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb new file mode 100644 index 0000000000..a47ea45ca3 --- /dev/null +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb @@ -0,0 +1,26 @@ +module Elasticsearch + + module Loggable + + + def log_debug(message) + logger.debug(message) if logger && logger.debug? + end + + def log_error(message) + logger.error(message) if logger && logger.error? + end + + def log_fatal(message) + logger.fatal(message) if logger && logger.fatal? + end + + def log_info(message) + logger.info(message) if logger && logger.info? + end + + def log_warn(message) + logger.warn(message) if logger && logger.warn? + end + end +end diff --git a/elasticsearch-transport/test/unit/transport_base_test.rb b/elasticsearch-transport/test/unit/transport_base_test.rb index 0a5f6208e8..0ce6b8dc69 100644 --- a/elasticsearch-transport/test/unit/transport_base_test.rb +++ b/elasticsearch-transport/test/unit/transport_base_test.rb @@ -239,7 +239,7 @@ def initialize(*); end @transport.expects(:get_connection).returns(stub_everything :failures => 1) # `block.expects(:call).raises(::Errno::ECONNREFUSED)` fails on Ruby 1.8 - block = lambda { |a, b| raise ::Errno::ECONNREFUSED } + block = lambda { |a| raise ::Errno::ECONNREFUSED } assert_raise ::Errno::ECONNREFUSED do @transport.perform_request 'GET', '/', &block @@ -265,7 +265,7 @@ def initialize(*); end c = stub_everything :failures => 1 @transport.expects(:get_connection).returns(c) - block = lambda { |a,b| raise ::Errno::ECONNREFUSED } + block = lambda { |a| raise ::Errno::ECONNREFUSED } c.expects(:dead!) From 92bb2c23dcd10058b93624949fda1564d815c21f Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 26 Oct 2018 16:16:22 +0200 Subject: [PATCH 2/5] [CLIENT] Rename __log method to be more precise --- .../elasticsearch/transport/transport/base.rb | 265 +++++++++--------- .../transport/transport/loggable.rb | 1 - .../test/unit/transport_base_test.rb | 6 +- 3 files changed, 136 insertions(+), 136 deletions(-) diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb index afebda33be..96ca2f7208 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb @@ -6,6 +6,7 @@ module Transport # module Base include Loggable + DEFAULT_PORT = 9200 DEFAULT_PROTOCOL = 'http' DEFAULT_RELOAD_AFTER = 10_000 # Requests @@ -82,7 +83,7 @@ def reload_connections! __rebuild_connections :hosts => hosts, :options => options self rescue SnifferTimeoutError - logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger + log_error("[SnifferTimeoutError] Timeout when reloading connections.") self end @@ -94,135 +95,6 @@ def resurrect_dead_connections! connections.dead.each { |c| c.resurrect! } end - # Rebuilds the connections collection in the transport. - # - # The methods *adds* new connections from the passed hosts to the collection, - # and *removes* all connections not contained in the passed hosts. - # - # @return [Connections::Collection] - # @api private - # - def __rebuild_connections(arguments={}) - @state_mutex.synchronize do - @hosts = arguments[:hosts] || [] - @options = arguments[:options] || {} - - __close_connections - - new_connections = __build_connections - stale_connections = @connections.all.select { |c| ! new_connections.include?(c) } - new_connections = new_connections.reject { |c| @connections.include?(c) } - - @connections.remove(stale_connections) - @connections.add(new_connections) - @connections - end - end - - # Builds and returns a collection of connections - # - # The adapters have to implement the {Base#__build_connection} method. - # - # @return [Connections::Collection] - # @api private - # - def __build_connections - Connections::Collection.new \ - :connections => hosts.map { |host| - host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL - host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT - if (options[:user] || options[:http][:user]) && !host[:user] - host[:user] ||= options[:user] || options[:http][:user] - host[:password] ||= options[:password] || options[:http][:password] - end - - __build_connection(host, (options[:transport_options] || {}), @block) - }, - :selector_class => options[:selector_class], - :selector => options[:selector] - end - - # @abstract Build and return a connection. - # A transport implementation *must* implement this method. - # See {HTTP::Faraday#__build_connection} for an example. - # - # @return [Connections::Connection] - # @api private - # - def __build_connection(host, options={}, block=nil) - raise NoMethodError, "Implement this method in your class" - end - - # Closes the connections collection - # - # @api private - # - def __close_connections - # A hook point for specific adapters when they need to close connections - end - - # Log request and response information - # - # @api private - # - def __log(method, path, params, body, url, response, json, took, duration) - if logger - sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') - log_info "#{method.to_s.upcase} #{sanitized_url} " + - "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" - log_debug "> #{__convert_to_json(body)}" if body - log_debug "< #{response.body}" - end - end - - # Trace the request in the `curl` format - # - # @api private - # - def __trace(method, path, params, headers, body, url, response, json, took, duration) - if tracer - trace_url = "http://localhost:9200/#{path}?pretty" + - ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) - trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' - trace_command = "curl -X #{method.to_s.upcase}" - trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? - trace_command += " '#{trace_url}'#{trace_body}\n" - tracer.info trace_command - tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" - tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" - end - end - - # Raise error specific for the HTTP response status or a generic server error - # - # @api private - # - def __raise_transport_error(response) - error = ERRORS[response.status] || ServerError - raise error.new "[#{response.status}] #{response.body}" - end - - # Converts any non-String object to JSON - # - # @api private - # - def __convert_to_json(o=nil, options={}) - o.is_a?(String) ? o : serializer.dump(o, options) - end - - # Returns a full URL based on information from host - # - # @param host [Hash] Host configuration passed in from {Client} - # - # @api private - def __full_url(host) - url = "#{host[:protocol]}://" - url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password])}@" if host[:user] - url += "#{host[:host]}:#{host[:port]}" - url += "#{host[:path]}" if host[:path] - url - end - # Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, # retrying the request and reloading the connections. # @@ -309,7 +181,7 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) duration = Time.now - start if response.status.to_i >= 300 - __log(method, path, params, body, url, response, nil, 'N/A', duration) + __log_response(method, path, params, body, url, response, nil, 'N/A', duration) __trace(method, path, params, headers, body, url, response, nil, 'N/A', duration) unless ignore.include?(response.status.to_i) @@ -324,7 +196,7 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) end took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' - __log(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i) + __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i) __trace(method, path, params, headers, body, url, response, json, took, duration) Response.new(response.status, json || response.body, response.headers) @@ -340,6 +212,135 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) def host_unreachable_exceptions [Errno::ECONNREFUSED] end + + # Rebuilds the connections collection in the transport. + # + # The methods *adds* new connections from the passed hosts to the collection, + # and *removes* all connections not contained in the passed hosts. + # + # @return [Connections::Collection] + # @api private + # + def __rebuild_connections(arguments={}) + @state_mutex.synchronize do + @hosts = arguments[:hosts] || [] + @options = arguments[:options] || {} + + __close_connections + + new_connections = __build_connections + stale_connections = @connections.all.select { |c| ! new_connections.include?(c) } + new_connections = new_connections.reject { |c| @connections.include?(c) } + + @connections.remove(stale_connections) + @connections.add(new_connections) + @connections + end + end + + # Builds and returns a collection of connections + # + # The adapters have to implement the {Base#__build_connection} method. + # + # @return [Connections::Collection] + # @api private + # + def __build_connections + Connections::Collection.new \ + :connections => hosts.map { |host| + host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL + host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT + if (options[:user] || options[:http][:user]) && !host[:user] + host[:user] ||= options[:user] || options[:http][:user] + host[:password] ||= options[:password] || options[:http][:password] + end + + __build_connection(host, (options[:transport_options] || {}), @block) + }, + :selector_class => options[:selector_class], + :selector => options[:selector] + end + + # @abstract Build and return a connection. + # A transport implementation *must* implement this method. + # See {HTTP::Faraday#__build_connection} for an example. + # + # @return [Connections::Connection] + # @api private + # + def __build_connection(host, options={}, block=nil) + raise NoMethodError, "Implement this method in your class" + end + + # Closes the connections collection + # + # @api private + # + def __close_connections + # A hook point for specific adapters when they need to close connections + end + + # Trace the request in the `curl` format + # + # @api private + # + def __trace(method, path, params, headers, body, url, response, json, took, duration) + if tracer + trace_url = "http://localhost:9200/#{path}?pretty" + + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) + trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' + trace_command = "curl -X #{method.to_s.upcase}" + trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? + trace_command += " '#{trace_url}'#{trace_body}\n" + tracer.info trace_command + tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" + tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" + end + end + + # Raise error specific for the HTTP response status or a generic server error + # + # @api private + # + def __raise_transport_error(response) + error = ERRORS[response.status] || ServerError + raise error.new "[#{response.status}] #{response.body}" + end + + # Converts any non-String object to JSON + # + # @api private + # + def __convert_to_json(o=nil, options={}) + o.is_a?(String) ? o : serializer.dump(o, options) + end + + # Returns a full URL based on information from host + # + # @param host [Hash] Host configuration passed in from {Client} + # + # @api private + def __full_url(host) + url = "#{host[:protocol]}://" + url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password])}@" if host[:user] + url += "#{host[:host]}:#{host[:port]}" + url += "#{host[:path]}" if host[:path] + url + end + + private + + # Log request and response information + # + # @api private + # + def __log_response(method, path, params, body, url, response, json, took, duration) + sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') + log_info "#{method.to_s.upcase} #{sanitized_url} " + + "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" + log_debug "> #{__convert_to_json(body)}" if body + log_debug "< #{response.body}" + end end end end diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb index a47ea45ca3..8de3985540 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb @@ -2,7 +2,6 @@ module Elasticsearch module Loggable - def log_debug(message) logger.debug(message) if logger && logger.debug? end diff --git a/elasticsearch-transport/test/unit/transport_base_test.rb b/elasticsearch-transport/test/unit/transport_base_test.rb index 0ce6b8dc69..0e48562982 100644 --- a/elasticsearch-transport/test/unit/transport_base_test.rb +++ b/elasticsearch-transport/test/unit/transport_base_test.rb @@ -424,7 +424,7 @@ def initialize(*); end @block = Proc.new { |c, u| puts "ERROR" } @block.expects(:call).returns(Elasticsearch::Transport::Transport::Response.new 500, 'ERROR') - @transport.expects(:__log) + @transport.expects(:__log_response) @transport.logger.expects(:fatal) assert_raise Elasticsearch::Transport::Transport::Errors::InternalServerError do @@ -436,7 +436,7 @@ def initialize(*); end @block = Proc.new { |c, u| puts "ERROR" } @block.expects(:call).returns(Elasticsearch::Transport::Transport::Response.new 500, 'ERROR') - @transport.expects(:__log).once + @transport.expects(:__log_response).once @transport.logger.expects(:fatal).never # No `BadRequest` error @@ -447,7 +447,7 @@ def initialize(*); end @block = Proc.new { |c, u| puts "ERROR" } @block.expects(:call).raises(Exception) - @transport.expects(:__log).never + @transport.expects(:__log_response).never @transport.logger.expects(:fatal) assert_raise(Exception) { @transport.perform_request('POST', '_search', &@block) } From 3082b0366c13da0427c53c5908f05a1d7424c151 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 26 Oct 2018 16:32:56 +0200 Subject: [PATCH 3/5] [CLIENT] Add documentation for Loggable Module --- .../transport/transport/loggable.rb | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb index 8de3985540..58f3adc9c2 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb @@ -1,23 +1,66 @@ module Elasticsearch + # Module to encapsulate all logging functionality. + # + # @since 7.0.0 module Loggable + # Log a debug message. + # + # @example Log a debug message. + # log_debug('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 def log_debug(message) logger.debug(message) if logger && logger.debug? end + # Log an error message. + # + # @example Log an error message. + # log_error('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 def log_error(message) logger.error(message) if logger && logger.error? end + # Log a fatal message. + # + # @example Log a fatal message. + # log_fatal('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 def log_fatal(message) logger.fatal(message) if logger && logger.fatal? end + # Log an info message. + # + # @example Log an info message. + # log_info('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 def log_info(message) logger.info(message) if logger && logger.info? end + # Log a warn message. + # + # @example Log a warn message. + # log_warn('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 def log_warn(message) logger.warn(message) if logger && logger.warn? end From 4faf1e6788d54b3dc7b6689e63abb6115c1e2f98 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 26 Oct 2018 16:38:07 +0200 Subject: [PATCH 4/5] [CLIENT] Minor spacing fix --- elasticsearch-transport/lib/elasticsearch/transport.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/elasticsearch-transport/lib/elasticsearch/transport.rb b/elasticsearch-transport/lib/elasticsearch/transport.rb index 589345e391..eead9eeb01 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport.rb @@ -16,7 +16,6 @@ require "elasticsearch/transport/transport/http/faraday" require "elasticsearch/transport/client" - require "elasticsearch/transport/version" module Elasticsearch From 980f6eb77fe51fce92f3e98b6f7621b7b4681e22 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Fri, 26 Oct 2018 17:20:33 +0200 Subject: [PATCH 5/5] [CLIENT] Rollback some changes beyond the scope of this pull request --- .../elasticsearch/transport/transport/base.rb | 284 +++++++++--------- .../transport/transport/http/curb.rb | 2 +- .../transport/transport/http/faraday.rb | 3 +- .../transport/transport/http/manticore.rb | 3 +- .../test/unit/transport_base_test.rb | 4 +- 5 files changed, 148 insertions(+), 148 deletions(-) diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb index 96ca2f7208..035bdd7101 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb @@ -83,7 +83,7 @@ def reload_connections! __rebuild_connections :hosts => hosts, :options => options self rescue SnifferTimeoutError - log_error("[SnifferTimeoutError] Timeout when reloading connections.") + log_error "[SnifferTimeoutError] Timeout when reloading connections." self end @@ -95,124 +95,6 @@ def resurrect_dead_connections! connections.dead.each { |c| c.resurrect! } end - # Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, - # retrying the request and reloading the connections. - # - # @abstract The transport implementation has to implement this method either in full, - # or by invoking this method with a block. See {HTTP::Faraday#perform_request} for an example. - # - # @param method [String] Request method - # @param path [String] The API endpoint - # @param params [Hash] Request parameters (will be serialized by {Connections::Connection#full_url}) - # @param body [Hash] Request body (will be serialized by the {#serializer}) - # @param headers [Hash] Request headers (will be serialized by the {#serializer}) - # @param block [Proc] Code block to evaluate, passed from the implementation - # - # @return [Response] - # @raise [NoMethodError] If no block is passed - # @raise [ServerError] If request failed on server - # @raise [Error] If no connection is available - # - def perform_request(method, path, params={}, body=nil, headers=nil, &block) - raise NoMethodError, "Implement this method in your transport class" unless block_given? - start = Time.now - tries = 0 - params = params.clone - ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } - - begin - tries += 1 - connection = get_connection or raise Error.new("Cannot get new connection from pool.") - - if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) - params = connection.connection.params.merge(params.to_hash) - end - - url = connection.full_url(path, params) - - response = block.call(connection) - - connection.healthy! if connection.failures > 0 - - # Raise an exception so we can catch it for `retry_on_status` - __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) - - rescue Elasticsearch::Transport::Transport::ServerError => e - if @retry_on_status.include?(response.status) - log_warn("[#{e.class}] Attempt #{tries} to get response from #{url}") - if tries <= max_retries - retry - else - log_fatal("[#{e.class}] Cannot get response from #{url} after #{tries} tries") - raise e - end - else - raise e - end - - rescue *host_unreachable_exceptions => e - log_error("[#{e.class}] #{e.message} #{connection.host.inspect}") - - connection.dead! - - if @options[:reload_on_failure] and tries < connections.all.size - log_warn("[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})") - reload_connections! and retry - end - - if @options[:retry_on_failure] - log_warn("[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}") - if tries <= max_retries - retry - else - log_fatal("[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries") - raise e - end - else - raise e - end - - rescue Exception => e - log_fatal("[#{e.class}] #{e.message} (#{connection.host.inspect if connection})") - raise e - - end #/begin - - duration = Time.now - start - - if response.status.to_i >= 300 - __log_response(method, path, params, body, url, response, nil, 'N/A', duration) - __trace(method, path, params, headers, body, url, response, nil, 'N/A', duration) - - unless ignore.include?(response.status.to_i) - # Log the failure only when `ignore` doesn't match the response status - log_fatal("[#{response.status}] #{response.body}") - __raise_transport_error(response) - end - end - - if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ - json = serializer.load(response.body) - end - - took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' - __log_response(method, path, params, body, url, response, json, took, duration) unless ignore.include?(response.status.to_i) - __trace(method, path, params, headers, body, url, response, json, took, duration) - - Response.new(response.status, json || response.body, response.headers) - ensure - @last_request_at = Time.now - end - - # @abstract Returns an Array of connection errors specific to the transport implementation. - # See {HTTP::Faraday#host_unreachable_exceptions} for an example. - # - # @return [Array] - # - def host_unreachable_exceptions - [Errno::ECONNREFUSED] - end - # Rebuilds the connections collection in the transport. # # The methods *adds* new connections from the passed hosts to the collection, @@ -280,22 +162,34 @@ def __close_connections # A hook point for specific adapters when they need to close connections end + # Log request and response information + # + # @api private + # + def __log_response(method, path, params, body, url, response, json, took, duration) + if logger + sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') + log_info "#{method.to_s.upcase} #{sanitized_url} " + + "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" + log_debug "> #{__convert_to_json(body)}" if body + log_debug "< #{response.body}" + end + end + # Trace the request in the `curl` format # # @api private # def __trace(method, path, params, headers, body, url, response, json, took, duration) - if tracer - trace_url = "http://localhost:9200/#{path}?pretty" + - ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) - trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' - trace_command = "curl -X #{method.to_s.upcase}" - trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? - trace_command += " '#{trace_url}'#{trace_body}\n" - tracer.info trace_command - tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" - tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" - end + trace_url = "http://localhost:9200/#{path}?pretty" + + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) + trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' + trace_command = "curl -X #{method.to_s.upcase}" + trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? + trace_command += " '#{trace_url}'#{trace_body}\n" + tracer.info trace_command + tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#" + tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n" end # Raise error specific for the HTTP response status or a generic server error @@ -312,7 +206,7 @@ def __raise_transport_error(response) # @api private # def __convert_to_json(o=nil, options={}) - o.is_a?(String) ? o : serializer.dump(o, options) + o = o.is_a?(String) ? o : serializer.dump(o, options) end # Returns a full URL based on information from host @@ -328,20 +222,128 @@ def __full_url(host) url end - private + # Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, + # retrying the request and reloading the connections. + # + # @abstract The transport implementation has to implement this method either in full, + # or by invoking this method with a block. See {HTTP::Faraday#perform_request} for an example. + # + # @param method [String] Request method + # @param path [String] The API endpoint + # @param params [Hash] Request parameters (will be serialized by {Connections::Connection#full_url}) + # @param body [Hash] Request body (will be serialized by the {#serializer}) + # @param headers [Hash] Request headers (will be serialized by the {#serializer}) + # @param block [Proc] Code block to evaluate, passed from the implementation + # + # @return [Response] + # @raise [NoMethodError] If no block is passed + # @raise [ServerError] If request failed on server + # @raise [Error] If no connection is available + # + def perform_request(method, path, params={}, body=nil, headers=nil, &block) + raise NoMethodError, "Implement this method in your transport class" unless block_given? + start = Time.now + tries = 0 - # Log request and response information + params = params.clone + + ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i } + + begin + tries += 1 + connection = get_connection or raise Error.new("Cannot get new connection from pool.") + + if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash) + params = connection.connection.params.merge(params.to_hash) + end + + url = connection.full_url(path, params) + + response = block.call(connection, url) + + connection.healthy! if connection.failures > 0 + + # Raise an exception so we can catch it for `retry_on_status` + __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i) + + rescue Elasticsearch::Transport::Transport::ServerError => e + if @retry_on_status.include?(response.status) + log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}" + if tries <= max_retries + retry + else + log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" + raise e + end + else + raise e + end + + rescue *host_unreachable_exceptions => e + log_error "[#{e.class}] #{e.message} #{connection.host.inspect}" + + connection.dead! + + if @options[:reload_on_failure] and tries < connections.all.size + log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" + reload_connections! and retry + end + + if @options[:retry_on_failure] + log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" + if tries <= max_retries + retry + else + log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" + raise e + end + else + raise e + end + + rescue Exception => e + log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" + raise e + + end #/begin + + duration = Time.now - start + + if response.status.to_i >= 300 + __log_response method, path, params, body, url, response, nil, 'N/A', duration + __trace method, path, params, headers, body, url, response, nil, 'N/A', duration if tracer + + # Log the failure only when `ignore` doesn't match the response status + unless ignore.include?(response.status.to_i) + log_fatal "[#{response.status}] #{response.body}" + end + + __raise_transport_error response unless ignore.include?(response.status.to_i) + end + + json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ + took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' + + unless ignore.include?(response.status.to_i) + __log_response method, path, params, body, url, response, json, took, duration + end + + __trace method, path, params, headers, body, url, response, json, took, duration if tracer + + Response.new response.status, json || response.body, response.headers + ensure + @last_request_at = Time.now + end + + # @abstract Returns an Array of connection errors specific to the transport implementation. + # See {HTTP::Faraday#host_unreachable_exceptions} for an example. # - # @api private + # @return [Array] # - def __log_response(method, path, params, body, url, response, json, took, duration) - sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') - log_info "#{method.to_s.upcase} #{sanitized_url} " + - "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" - log_debug "> #{__convert_to_json(body)}" if body - log_debug "< #{response.body}" + def host_unreachable_exceptions + [Errno::ECONNREFUSED] end end end end -end +end \ No newline at end of file diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb index 282d009431..6cecbe852e 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb @@ -16,7 +16,7 @@ class Curb # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection| + super do |connection, url| connection.connection.url = connection.full_url(path, params) case method diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb index 72f67b02fa..bf6b44ce09 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/faraday.rb @@ -17,8 +17,7 @@ class Faraday # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection| - url = connection.full_url(path, params) + super do |connection, url| headers = headers || connection.connection.headers response = connection.connection.run_request \ diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb index 53050fa847..666aed9164 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/manticore.rb @@ -64,8 +64,7 @@ def build_client(options={}) # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection| - url = connection.full_url(path, params) + super do |connection, url| params[:body] = __convert_to_json(body) if body params[:headers] = headers if headers params = params.merge @request_options diff --git a/elasticsearch-transport/test/unit/transport_base_test.rb b/elasticsearch-transport/test/unit/transport_base_test.rb index 0e48562982..580d144e76 100644 --- a/elasticsearch-transport/test/unit/transport_base_test.rb +++ b/elasticsearch-transport/test/unit/transport_base_test.rb @@ -239,7 +239,7 @@ def initialize(*); end @transport.expects(:get_connection).returns(stub_everything :failures => 1) # `block.expects(:call).raises(::Errno::ECONNREFUSED)` fails on Ruby 1.8 - block = lambda { |a| raise ::Errno::ECONNREFUSED } + block = lambda { |a,b| raise ::Errno::ECONNREFUSED } assert_raise ::Errno::ECONNREFUSED do @transport.perform_request 'GET', '/', &block @@ -265,7 +265,7 @@ def initialize(*); end c = stub_everything :failures => 1 @transport.expects(:get_connection).returns(c) - block = lambda { |a| raise ::Errno::ECONNREFUSED } + block = lambda { |a,b| raise ::Errno::ECONNREFUSED } c.expects(:dead!)