Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
module Elasticsearch
module Transport
module Transport

# @abstract Module with common functionality for transport implementations.
#
module Base
Expand Down Expand Up @@ -431,7 +430,7 @@ def apply_headers(client, options)
end

def find_value(hash, regex)
key_value = hash.find { |k,v| k.to_s.downcase =~ regex }
key_value = hash.find { |k, _| k.to_s.downcase =~ regex }
if key_value
hash.delete(key_value[0])
key_value[1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,20 @@ module HTTP
class Manticore
include Base

def initialize(arguments={}, &block)
@request_options = { headers: (arguments.dig(:transport_options, :headers) || {}) }
def initialize(arguments = {}, &block)
@request_options = {
headers: (
arguments.dig(:transport_options, :headers) ||
arguments.dig(:options, :transport_options, :headers) ||
{}
)
}
@manticore = build_client(arguments[:options] || {})
super(arguments, &block)
end

# Should just be run once at startup
def build_client(options={})
def build_client(options = {})
client_options = options[:transport_options] || {}
client_options[:ssl] = options[:ssl] || {}

Expand All @@ -88,22 +94,22 @@ def perform_request(method, path, params={}, body=nil, headers=nil, opts={})

params[:body] = body if body
params[:headers] = headers if headers
params = params.merge @request_options
params = params.merge(@request_options)
case method
when "GET"
when 'GET'
resp = connection.connection.get(url, params)
when "HEAD"
when 'HEAD'
resp = connection.connection.head(url, params)
when "PUT"
when 'PUT'
resp = connection.connection.put(url, params)
when "POST"
when 'POST'
resp = connection.connection.post(url, params)
when "DELETE"
when 'DELETE'
resp = connection.connection.delete(url, params)
else
raise ArgumentError.new "Method #{method} not supported"
end
Response.new resp.code, resp.read_body, resp.headers
Response.new(resp.code, resp.read_body, resp.headers)
end
end

Expand All @@ -113,23 +119,21 @@ def perform_request(method, path, params={}, body=nil, headers=nil, opts={})
# @return [Connections::Collection]
#
def __build_connections
apply_headers(@request_options, options[:transport_options])
apply_headers(@request_options, options)
apply_headers(options)

Connections::Collection.new \
:connections => hosts.map { |host|
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
host[:port] ||= DEFAULT_PORT
Connections::Collection.new(
connections: hosts.map do |host|
host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
host[:port] ||= DEFAULT_PORT

host.delete(:user) # auth is not supported here.
host.delete(:password) # use the headers

Connections::Connection.new \
:host => host,
:connection => @manticore
},
:selector_class => options[:selector_class],
:selector => options[:selector]
Connections::Connection.new(host: host, connection: @manticore)
end,
selector_class: options[:selector_class],
selector: options[:selector]
)
end

# Closes all connections by marking them as dead
Expand Down Expand Up @@ -157,16 +161,16 @@ def host_unreachable_exceptions

private

def apply_headers(request_options, options)
headers = options&.[](:headers) || {}
def apply_headers(options)
headers = options[:headers] || options.dig(:transport_options, :headers) || {}
headers[CONTENT_TYPE_STR] = find_value(headers, CONTENT_TYPE_REGEX) || DEFAULT_CONTENT_TYPE
headers[USER_AGENT_STR] = find_value(headers, USER_AGENT_REGEX) || user_agent_header
headers[USER_AGENT_STR] = find_value(headers, USER_AGENT_REGEX) || find_value(@request_options[:headers], USER_AGENT_REGEX) || user_agent_header
headers[ACCEPT_ENCODING] = GZIP if use_compression?
request_options[:headers].merge!(headers)
@request_options[:headers].merge!(headers)
end

def user_agent_header
@user_agent ||= begin
@user_agent_header ||= begin
meta = ["RUBY_VERSION: #{JRUBY_VERSION}"]
if RbConfig::CONFIG && RbConfig::CONFIG['host_os']
meta << "#{RbConfig::CONFIG['host_os'].split('_').first[/[a-z]+/i].downcase} #{RbConfig::CONFIG['target_cpu']}"
Expand Down
64 changes: 41 additions & 23 deletions elasticsearch-transport/test/unit/transport_manticore_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -206,29 +206,8 @@ def common_headers
}

::Manticore::Client.expects(:new).with(options)
transport = Manticore.new hosts: [{ host: 'foobar', port: 1234 }], options: options
end

should 'allow custom headers' do
transport_options = { headers: { 'Authorization' => 'Basic token' } }
transport = Manticore.new(
hosts: [{ host: 'foobar', port: 1234 }],
transport_options: transport_options
)

assert_equal(
transport.instance_variable_get(:@request_options)[:headers]['Authorization'],
'Basic token'
)
transport.connections.first.connection
.expects(:get)
.with do |_host, _options|
assert_equal('Basic token', _options[:headers]['Authorization'])
true
end
.returns(stub_everything)

transport.perform_request('GET', '/', {})
transport = Manticore.new(hosts: [{ host: 'foobar', port: 1234 }], options: options)
assert_equal(transport.options[:ssl][:truststore_password], 'test')
end

should 'pass :transport_options to Manticore::Client' do
Expand All @@ -238,6 +217,45 @@ def common_headers

::Manticore::Client.expects(:new).with(potatoes: 1, ssl: {})
transport = Manticore.new(hosts: [{ host: 'foobar', port: 1234 }], options: options)
assert_equal(transport.options[:transport_options][:potatoes], 1)
end

context 'custom headers' do
should 'allow authorization headers' do
transport_options = { headers: { 'Authorization' => 'Basic token' } }
transport = Manticore.new(
hosts: [{ host: 'foobar', port: 1234 }],
transport_options: transport_options
)

assert_equal(
transport.instance_variable_get(:@request_options)[:headers]['Authorization'],
'Basic token'
)
transport.connections.first.connection.expects(:get).with do |_host, options|
assert_equal('Basic token', options[:headers]['Authorization'])
true
end.returns(stub_everything)
transport.perform_request('GET', '/', {})
end

should 'allow user agent headers' do
transport_options = { headers: { 'User-Agent' => 'Custom UA' } }
transport = Manticore.new(
hosts: [{ host: 'localhost' }],
transport_options: transport_options
)
transport.connections.first.connection.expects(:get).with do |_host, options|
assert_equal('Custom UA', options[:headers]['User-Agent'])
true
end.returns(stub_everything)
transport.perform_request('GET', '/', {})

assert_equal(
transport.instance_variable_get('@request_options')[:headers]['User-Agent'],
'Custom UA'
)
end
end
end
end
Expand Down