diff --git a/elasticsearch-transport/lib/elasticsearch/transport.rb b/elasticsearch-transport/lib/elasticsearch/transport.rb index 012671d1b1..eead9eeb01 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport.rb @@ -4,6 +4,7 @@ require "multi_json" require "faraday" +require "elasticsearch/transport/transport/loggable" require "elasticsearch/transport/transport/serializer/multi_json" require "elasticsearch/transport/transport/sniffer" require "elasticsearch/transport/transport/response" diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb index 79e4a7524f..035bdd7101 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/base.rb @@ -5,6 +5,8 @@ module Transport # @abstract Module with common functionality for transport implementations. # module Base + include Loggable + DEFAULT_PORT = 9200 DEFAULT_PROTOCOL = 'http' DEFAULT_RELOAD_AFTER = 10_000 # Requests @@ -81,7 +83,7 @@ def reload_connections! __rebuild_connections :hosts => hosts, :options => options self rescue SnifferTimeoutError - logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger + log_error "[SnifferTimeoutError] Timeout when reloading connections." self end @@ -128,15 +130,15 @@ def __rebuild_connections(arguments={}) def __build_connections Connections::Collection.new \ :connections => hosts.map { |host| - host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL - host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT - if (options[:user] || options[:http][:user]) && !host[:user] - host[:user] ||= options[:user] || options[:http][:user] - host[:password] ||= options[:password] || options[:http][:password] - end + host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL + host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT + if (options[:user] || options[:http][:user]) && !host[:user] + host[:user] ||= options[:user] || options[:http][:user] + host[:password] ||= options[:password] || options[:http][:password] + end - __build_connection(host, (options[:transport_options] || {}), @block) - }, + __build_connection(host, (options[:transport_options] || {}), @block) + }, :selector_class => options[:selector_class], :selector => options[:selector] end @@ -164,20 +166,14 @@ def __close_connections # # @api private # - def __log(method, path, params, body, url, response, json, took, duration) - sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') - logger.info "#{method.to_s.upcase} #{sanitized_url} " + - "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" - logger.debug "> #{__convert_to_json(body)}" if body - logger.debug "< #{response.body}" - end - - # Log failed request - # - # @api private - # - def __log_failed(response) - logger.fatal "[#{response.status}] #{response.body}" + def __log_response(method, path, params, body, url, response, json, took, duration) + if logger + sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@') + log_info "#{method.to_s.upcase} #{sanitized_url} " + + "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]" + log_debug "> #{__convert_to_json(body)}" if body + log_debug "< #{response.body}" + end end # Trace the request in the `curl` format @@ -186,7 +182,7 @@ def __log_failed(response) # def __trace(method, path, params, headers, body, url, response, json, took, duration) trace_url = "http://localhost:9200/#{path}?pretty" + - ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) + ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" ) trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : '' trace_command = "curl -X #{method.to_s.upcase}" trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty? @@ -246,7 +242,7 @@ def __full_url(host) # def perform_request(method, path, params={}, body=nil, headers=nil, &block) raise NoMethodError, "Implement this method in your transport class" unless block_given? - start = Time.now if logger || tracer + start = Time.now tries = 0 params = params.clone @@ -272,11 +268,11 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) rescue Elasticsearch::Transport::Transport::ServerError => e if @retry_on_status.include?(response.status) - logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger + log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if tries <= max_retries retry else - logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger + log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" raise e end else @@ -284,21 +280,21 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) end rescue *host_unreachable_exceptions => e - logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger + log_error "[#{e.class}] #{e.message} #{connection.host.inspect}" connection.dead! if @options[:reload_on_failure] and tries < connections.all.size - logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger + log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" reload_connections! and retry end if @options[:retry_on_failure] - logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger + log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if tries <= max_retries retry else - logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger + log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" raise e end else @@ -306,27 +302,32 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block) end rescue Exception => e - logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger + log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" raise e end #/begin - duration = Time.now-start if logger || tracer + duration = Time.now - start if response.status.to_i >= 300 - __log method, path, params, body, url, response, nil, 'N/A', duration if logger + __log_response method, path, params, body, url, response, nil, 'N/A', duration __trace method, path, params, headers, body, url, response, nil, 'N/A', duration if tracer # Log the failure only when `ignore` doesn't match the response status - __log_failed response if logger && !ignore.include?(response.status.to_i) + unless ignore.include?(response.status.to_i) + log_fatal "[#{response.status}] #{response.body}" + end __raise_transport_error response unless ignore.include?(response.status.to_i) end json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/ - took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer + took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' + + unless ignore.include?(response.status.to_i) + __log_response method, path, params, body, url, response, json, took, duration + end - __log method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i) __trace method, path, params, headers, body, url, response, json, took, duration if tracer Response.new response.status, json || response.body, response.headers @@ -345,4 +346,4 @@ def host_unreachable_exceptions end end end -end +end \ No newline at end of file diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb index ed51b3424a..6cecbe852e 100644 --- a/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/http/curb.rb @@ -16,8 +16,8 @@ class Curb # @see Transport::Base#perform_request # def perform_request(method, path, params={}, body=nil, headers=nil) - super do |connection,url| - connection.connection.url = url + super do |connection, url| + connection.connection.url = connection.full_url(path, params) case method when 'HEAD' diff --git a/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb new file mode 100644 index 0000000000..58f3adc9c2 --- /dev/null +++ b/elasticsearch-transport/lib/elasticsearch/transport/transport/loggable.rb @@ -0,0 +1,68 @@ +module Elasticsearch + + # Module to encapsulate all logging functionality. + # + # @since 7.0.0 + module Loggable + + # Log a debug message. + # + # @example Log a debug message. + # log_debug('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 + def log_debug(message) + logger.debug(message) if logger && logger.debug? + end + + # Log an error message. + # + # @example Log an error message. + # log_error('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 + def log_error(message) + logger.error(message) if logger && logger.error? + end + + # Log a fatal message. + # + # @example Log a fatal message. + # log_fatal('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 + def log_fatal(message) + logger.fatal(message) if logger && logger.fatal? + end + + # Log an info message. + # + # @example Log an info message. + # log_info('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 + def log_info(message) + logger.info(message) if logger && logger.info? + end + + # Log a warn message. + # + # @example Log a warn message. + # log_warn('Message') + # + # @param [ String ] message The message to log. + # + # @since 7.0.0 + def log_warn(message) + logger.warn(message) if logger && logger.warn? + end + end +end diff --git a/elasticsearch-transport/test/unit/transport_base_test.rb b/elasticsearch-transport/test/unit/transport_base_test.rb index 0a5f6208e8..580d144e76 100644 --- a/elasticsearch-transport/test/unit/transport_base_test.rb +++ b/elasticsearch-transport/test/unit/transport_base_test.rb @@ -239,7 +239,7 @@ def initialize(*); end @transport.expects(:get_connection).returns(stub_everything :failures => 1) # `block.expects(:call).raises(::Errno::ECONNREFUSED)` fails on Ruby 1.8 - block = lambda { |a, b| raise ::Errno::ECONNREFUSED } + block = lambda { |a,b| raise ::Errno::ECONNREFUSED } assert_raise ::Errno::ECONNREFUSED do @transport.perform_request 'GET', '/', &block @@ -424,7 +424,7 @@ def initialize(*); end @block = Proc.new { |c, u| puts "ERROR" } @block.expects(:call).returns(Elasticsearch::Transport::Transport::Response.new 500, 'ERROR') - @transport.expects(:__log) + @transport.expects(:__log_response) @transport.logger.expects(:fatal) assert_raise Elasticsearch::Transport::Transport::Errors::InternalServerError do @@ -436,7 +436,7 @@ def initialize(*); end @block = Proc.new { |c, u| puts "ERROR" } @block.expects(:call).returns(Elasticsearch::Transport::Transport::Response.new 500, 'ERROR') - @transport.expects(:__log).once + @transport.expects(:__log_response).once @transport.logger.expects(:fatal).never # No `BadRequest` error @@ -447,7 +447,7 @@ def initialize(*); end @block = Proc.new { |c, u| puts "ERROR" } @block.expects(:call).raises(Exception) - @transport.expects(:__log).never + @transport.expects(:__log_response).never @transport.logger.expects(:fatal) assert_raise(Exception) { @transport.perform_request('POST', '_search', &@block) }