Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions elasticsearch-transport/lib/elasticsearch/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "multi_json"
require "faraday"

require "elasticsearch/transport/transport/loggable"
require "elasticsearch/transport/transport/serializer/multi_json"
require "elasticsearch/transport/transport/sniffer"
require "elasticsearch/transport/transport/response"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module Transport
# @abstract Module with common functionality for transport implementations.
#
module Base
include Loggable

DEFAULT_PORT = 9200
DEFAULT_PROTOCOL = 'http'
DEFAULT_RELOAD_AFTER = 10_000 # Requests
Expand Down Expand Up @@ -81,7 +83,7 @@ def reload_connections!
__rebuild_connections :hosts => hosts, :options => options
self
rescue SnifferTimeoutError
logger.error "[SnifferTimeoutError] Timeout when reloading connections." if logger
log_error "[SnifferTimeoutError] Timeout when reloading connections."
self
end

Expand Down Expand Up @@ -128,15 +130,15 @@ def __rebuild_connections(arguments={})
def __build_connections
Connections::Collection.new \
:connections => hosts.map { |host|
host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
if (options[:user] || options[:http][:user]) && !host[:user]
host[:user] ||= options[:user] || options[:http][:user]
host[:password] ||= options[:password] || options[:http][:password]
end
host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
if (options[:user] || options[:http][:user]) && !host[:user]
host[:user] ||= options[:user] || options[:http][:user]
host[:password] ||= options[:password] || options[:http][:password]
end

__build_connection(host, (options[:transport_options] || {}), @block)
},
__build_connection(host, (options[:transport_options] || {}), @block)
},
:selector_class => options[:selector_class],
:selector => options[:selector]
end
Expand Down Expand Up @@ -164,20 +166,14 @@ def __close_connections
#
# @api private
#
def __log(method, path, params, body, url, response, json, took, duration)
sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
logger.info "#{method.to_s.upcase} #{sanitized_url} " +
"[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
logger.debug "> #{__convert_to_json(body)}" if body
logger.debug "< #{response.body}"
end

# Log failed request
#
# @api private
#
def __log_failed(response)
logger.fatal "[#{response.status}] #{response.body}"
def __log_response(method, path, params, body, url, response, json, took, duration)
if logger
sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
log_info "#{method.to_s.upcase} #{sanitized_url} " +
"[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
log_debug "> #{__convert_to_json(body)}" if body
log_debug "< #{response.body}"
end
end

# Trace the request in the `curl` format
Expand All @@ -186,7 +182,7 @@ def __log_failed(response)
#
def __trace(method, path, params, headers, body, url, response, json, took, duration)
trace_url = "http://localhost:9200/#{path}?pretty" +
( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
trace_command = "curl -X #{method.to_s.upcase}"
trace_command += " -H '#{headers.inject('') { |memo,item| memo << item[0] + ': ' + item[1] }}'" if headers && !headers.empty?
Expand Down Expand Up @@ -246,7 +242,7 @@ def __full_url(host)
#
def perform_request(method, path, params={}, body=nil, headers=nil, &block)
raise NoMethodError, "Implement this method in your transport class" unless block_given?
start = Time.now if logger || tracer
start = Time.now
tries = 0

params = params.clone
Expand All @@ -272,61 +268,66 @@ def perform_request(method, path, params={}, body=nil, headers=nil, &block)

rescue Elasticsearch::Transport::Transport::ServerError => e
if @retry_on_status.include?(response.status)
logger.warn "[#{e.class}] Attempt #{tries} to get response from #{url}" if logger
log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
if tries <= max_retries
retry
else
logger.fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries" if logger
log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
raise e
end
else
raise e
end

rescue *host_unreachable_exceptions => e
logger.error "[#{e.class}] #{e.message} #{connection.host.inspect}" if logger
log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

connection.dead!

if @options[:reload_on_failure] and tries < connections.all.size
logger.warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})" if logger
log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
reload_connections! and retry
end

if @options[:retry_on_failure]
logger.warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}" if logger
log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
if tries <= max_retries
retry
else
logger.fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries" if logger
log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
raise e
end
else
raise e
end

rescue Exception => e
logger.fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})" if logger
log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
raise e

end #/begin

duration = Time.now-start if logger || tracer
duration = Time.now - start

if response.status.to_i >= 300
__log method, path, params, body, url, response, nil, 'N/A', duration if logger
__log_response method, path, params, body, url, response, nil, 'N/A', duration
__trace method, path, params, headers, body, url, response, nil, 'N/A', duration if tracer

# Log the failure only when `ignore` doesn't match the response status
__log_failed response if logger && !ignore.include?(response.status.to_i)
unless ignore.include?(response.status.to_i)
log_fatal "[#{response.status}] #{response.body}"
end

__raise_transport_error response unless ignore.include?(response.status.to_i)
end

json = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a' if logger || tracer
took = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'

unless ignore.include?(response.status.to_i)
__log_response method, path, params, body, url, response, json, took, duration
end

__log method, path, params, body, url, response, json, took, duration if logger && !ignore.include?(response.status.to_i)
__trace method, path, params, headers, body, url, response, json, took, duration if tracer

Response.new response.status, json || response.body, response.headers
Expand All @@ -345,4 +346,4 @@ def host_unreachable_exceptions
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class Curb
# @see Transport::Base#perform_request
#
def perform_request(method, path, params={}, body=nil, headers=nil)
super do |connection,url|
connection.connection.url = url
super do |connection, url|
connection.connection.url = connection.full_url(path, params)

case method
when 'HEAD'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module Elasticsearch

# Module to encapsulate all logging functionality.
#
# @since 7.0.0
module Loggable

# Log a debug message.
#
# @example Log a debug message.
# log_debug('Message')
#
# @param [ String ] message The message to log.
#
# @since 7.0.0
def log_debug(message)
logger.debug(message) if logger && logger.debug?
end

# Log an error message.
#
# @example Log an error message.
# log_error('Message')
#
# @param [ String ] message The message to log.
#
# @since 7.0.0
def log_error(message)
logger.error(message) if logger && logger.error?
end

# Log a fatal message.
#
# @example Log a fatal message.
# log_fatal('Message')
#
# @param [ String ] message The message to log.
#
# @since 7.0.0
def log_fatal(message)
logger.fatal(message) if logger && logger.fatal?
end

# Log an info message.
#
# @example Log an info message.
# log_info('Message')
#
# @param [ String ] message The message to log.
#
# @since 7.0.0
def log_info(message)
logger.info(message) if logger && logger.info?
end

# Log a warn message.
#
# @example Log a warn message.
# log_warn('Message')
#
# @param [ String ] message The message to log.
#
# @since 7.0.0
def log_warn(message)
logger.warn(message) if logger && logger.warn?
end
end
end
8 changes: 4 additions & 4 deletions elasticsearch-transport/test/unit/transport_base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def initialize(*); end
@transport.expects(:get_connection).returns(stub_everything :failures => 1)

# `block.expects(:call).raises(::Errno::ECONNREFUSED)` fails on Ruby 1.8
block = lambda { |a, b| raise ::Errno::ECONNREFUSED }
block = lambda { |a,b| raise ::Errno::ECONNREFUSED }

assert_raise ::Errno::ECONNREFUSED do
@transport.perform_request 'GET', '/', &block
Expand Down Expand Up @@ -424,7 +424,7 @@ def initialize(*); end
@block = Proc.new { |c, u| puts "ERROR" }
@block.expects(:call).returns(Elasticsearch::Transport::Transport::Response.new 500, 'ERROR')

@transport.expects(:__log)
@transport.expects(:__log_response)
@transport.logger.expects(:fatal)

assert_raise Elasticsearch::Transport::Transport::Errors::InternalServerError do
Expand All @@ -436,7 +436,7 @@ def initialize(*); end
@block = Proc.new { |c, u| puts "ERROR" }
@block.expects(:call).returns(Elasticsearch::Transport::Transport::Response.new 500, 'ERROR')

@transport.expects(:__log).once
@transport.expects(:__log_response).once
@transport.logger.expects(:fatal).never

# No `BadRequest` error
Expand All @@ -447,7 +447,7 @@ def initialize(*); end
@block = Proc.new { |c, u| puts "ERROR" }
@block.expects(:call).raises(Exception)

@transport.expects(:__log).never
@transport.expects(:__log_response).never
@transport.logger.expects(:fatal)

assert_raise(Exception) { @transport.perform_request('POST', '_search', &@block) }
Expand Down