3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 1.1.0
- Added prepared statement support in local lookups [#53](https://github.com/logstash-plugins/logstash-filter-jdbc_static/pull/53)

## 1.0.7
- Fixed issue with driver verification using Java 11 [#51](https://github.com/logstash-plugins/logstash-filter-jdbc_static/pull/51)

34 changes: 24 additions & 10 deletions docs/index.asciidoc
@@ -101,21 +101,21 @@ filter {
local_lookups => [ <3>
{
id => "local-servers"
query => "select descr as description from servers WHERE ip = :ip"
query => "SELECT descr as description FROM servers WHERE ip = :ip"
parameters => {ip => "[from_ip]"}
target => "server"
},
{
id => "local-users"
query => "select firstname, lastname from users WHERE userid = :id"
parameters => {id => "[loggedin_userid]"}
target => "user" <4>
query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?"
prepared_parameters => ["[loggedin_userid]", "[user_nation]"] <4>
target => "user" <5>
}
]
# using add_field here to add & rename values to the event root
add_field => { server_name => "%{[server][0][description]}" }
add_field => { user_firstname => "%{[user][0][firstname]}" } <5>
add_field => { user_lastname => "%{[user][0][lastname]}" } <5>
add_field => { server_name => "%{[server][0][description]}" } <6>
add_field => { user_firstname => "%{[user][0][firstname]}" }
add_field => { user_lastname => "%{[user][0][lastname]}" }
remove_field => ["server", "user"]
staging_directory => "/tmp/logstash/jdbc_static/import_data"
loader_schedule => "* */2 * * *" # run loaders every 2 hours
@@ -134,9 +134,11 @@ structure. The column names and types should match the external database.
The order of table definitions is significant and should match that of the loader queries.
See <<plugins-{type}s-{plugin}-object_order>>.
<3> Performs lookup queries on the local database to enrich the events.
<4> Specifies the event field that will store the looked-up data. If the lookup
<4> Local lookup queries can also use prepared statements, in which the
parameters are bound by position.
<5> Specifies the event field that will store the looked-up data. If the lookup
returns multiple columns, the data is stored as a JSON object within the field.
<5> Takes data from the JSON object and stores it in top-level event fields for
<6> Takes data from the JSON object and stores it in top-level event fields for
easier analysis in Kibana.

Here's a full example:
@@ -546,7 +548,9 @@ default id is used instead.
query::
A SQL SELECT statement that is executed to achieve the lookup. To use
parameters, use named parameter syntax, for example
`"SELECT * FROM MYTABLE WHERE ID = :id"`.
`"SELECT * FROM MYTABLE WHERE ID = :id"`. Alternatively, the `?` sign
can be used as a prepared statement parameter, in which case
the `prepared_parameters` array is used to populate the values
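For example, `query => "SELECT * FROM MYTABLE WHERE ID = ?"` would be paired
with a one-element `prepared_parameters` array such as `["[id_field]"]` (the
field name here is only illustrative).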

parameters::
A key/value Hash or dictionary. The key (LHS) is the text that is
@@ -563,6 +567,16 @@ an id and a location, and you have a table of sensors that have a
column of `id-loc_id`. In this case your parameter hash would look
like this: `parameters => { "p1" => "%{[id]}-%{[loc_id]}" }`.

prepared_parameters::
An Array, in which the position of each entry corresponds to the position of
the `?` in the query syntax. The values of the array follow the same semantics
as `parameters`. If `prepared_parameters` is set, the filter is forced to use
JDBC prepared statements to query the local database.
Prepared statements provide two benefits: performance, because the DBMS does
not have to parse and compile the SQL expression for every call; and security,
because prepared statements avoid SQL-injection attacks based on query string
concatenation.
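+
For example, a lookup that binds two positional placeholders to two event
fields (mirroring the configuration example earlier in this document) might
look like this:
+
----
local_lookups => [
  {
    id => "local-users"
    query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?"
    prepared_parameters => ["[loggedin_userid]", "[user_nation]"]
    target => "user"
  }
]
----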

target::
An optional name for the field that will receive the looked-up data.
If you omit this setting then the `id` setting (or the default id) is
86 changes: 73 additions & 13 deletions lib/logstash/filters/jdbc/lookup.rb
@@ -63,6 +63,8 @@ def initialize(options, globals, default_id)
@valid = false
@option_errors = []
@default_result = nil
@prepared_statement = nil
@symbol_parameters = nil
parse_options
end

@@ -79,8 +81,11 @@ def formatted_errors
end

def enhance(local, event)
result = fetch(local, event) # should return a LookupResult

if @prepared_statement
result = call_prepared(local, event)
else
result = fetch(local, event) # should return a LookupResult
end
if result.failed? || result.parameters_invalid?
tag_failure(event)
end
@@ -98,6 +103,17 @@ def enhance(local, event)
end
end

def use_prepared_statement?
@prepared_parameters && !@prepared_parameters.empty?
end

def prepare(local)
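# Sequel references prepared-statement arguments as :$name inside a dataset,
# so each positional value is mapped to a generated placeholder (:$p0, :$p1, ...);
# the un-prefixed keys (:p0, :p1, ...) built in parse_options are supplied
# later when the statement is called (see call_prepared).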
hash = {}
@prepared_parameters.each_with_index { |v, i| hash[:"$p#{i}"] = v }
@prepared_param_placeholder_map = hash
@prepared_statement = local.prepare(query, hash.keys)
end

private

def tag_failure(event)
@@ -139,6 +155,33 @@ def fetch(local, event)
result
end

def call_prepared(local, event)
result = LookupResult.new()
if @parameters_specified
params = prepare_parameters_from_event(event, result)
if result.parameters_invalid?
logger.warn? && logger.warn("Parameter field not found in event", :lookup_id => @id, :invalid_parameters => result.invalid_parameters)
return result
end
else
params = {}
end
begin
logger.debug? && logger.debug("Executing Jdbc query", :lookup_id => @id, :statement => query, :parameters => params)
@prepared_statement.call(params).each do |row|
stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
[review comment]
Q: why do we need to stringify (just curious)?

[Contributor Author]
I've taken the suggestion from the non-prepared part; I think the stringify is
needed to avoid type conversion errors, for example when the JDBC driver loads
a BigInteger or does ugly conversions.

result.push(stringified)
end
rescue ::Sequel::Error => e
# all sequel errors are a subclass of this, let all other standard or runtime errors bubble up
result.failed!
logger.warn? && logger.warn("Exception when executing Jdbc query", :lookup_id => @id, :exception => e.message, :backtrace => e.backtrace.take(8))
end
# if either of: no records or a Sequel exception occurs the payload is
# empty and the default can be substituted later.
result
end

def process_event(event, result)
# use deep clone here so other filter function don't taint the payload by reference
event.set(@target, ::LogStash::Util.deep_clone(result.payload))
@@ -162,17 +205,34 @@ def parse_options
@option_errors << "The options for '#{@id}' must include a 'query' string"
end

@parameters = @options["parameters"]
@parameters_specified = false
if @parameters
if [email protected]_a?(Hash)
@option_errors << "The 'parameters' option for '#{@id}' must be a Hash"
else
# this is done once per lookup at start, i.e. Sprintfier.new et.al is done once.
@symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash }
# the user might specify an empty hash parameters => {}
# maybe due to an unparameterised query
@parameters_specified = !@symbol_parameters.empty?
if @options["parameters"] && @options["prepared_parameters"]
@option_errors << "Can't specify 'parameters' and 'prepared_parameters' in the same lookup"
else
@parameters = @options["parameters"]
@prepared_parameters = @options["prepared_parameters"]
@parameters_specified = false
if @parameters
if [email protected]_a?(Hash)
@option_errors << "The 'parameters' option for '#{@id}' must be a Hash"
else
# this is done once per lookup at start, i.e. Sprintfier.new et.al is done once.
@symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash }
# the user might specify an empty hash parameters => {}
# maybe due to an unparameterised query
@parameters_specified = !@symbol_parameters.empty?
end
elsif @prepared_parameters
if !@prepared_parameters.is_a?(Array)
@option_errors << "The 'prepared_parameters' option for '#{@id}' must be an Array"
elsif @query.count("?") != @prepared_parameters.size
@option_errors << "The 'prepared_parameters' option for '#{@id}' doesn't match count with query's placeholder"
else
# build @symbol_parameters as a map of :p0, :p1, ... => sprintf_or_get(value)
hash = {}
@prepared_parameters.each_with_index {|v,i| hash[:"p#{i}"] = sprintf_or_get(v)}
@symbol_parameters = hash
@parameters_specified = !@prepared_parameters.empty?
end
end
end

10 changes: 10 additions & 0 deletions lib/logstash/filters/jdbc/lookup_processor.rb
@@ -38,6 +38,8 @@ def initialize(lookups_array, globals)
"lookup_jdbc_driver_class",
"lookup_jdbc_driver_library").compact)
@local.connect(CONNECTION_ERROR_MSG)

create_prepared_statements_for_lookups
end
end

@@ -60,6 +62,14 @@ def valid?

private

def create_prepared_statements_for_lookups()
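# create prepared statements for any lookups that request them; invoked from
# initialize after the local database connection is established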
@lookups.each do |lookup|
if lookup.use_prepared_statement?
lookup.prepare(@local)
end
end
end

def validate_lookups(lookups_errors = [])
ids = Hash.new(0)
errors = []
7 changes: 7 additions & 0 deletions lib/logstash/filters/jdbc/read_write_database.rb
@@ -27,6 +27,13 @@ def fetch(statement, parameters)
@rwlock.readLock().unlock()
end

def prepare(statement, parameters)
@rwlock.readLock().lock()
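# build a dataset from the SQL and its placeholder names, then compile it into
# a Sequel prepared SELECT statement; the resulting statement object is
# returned to the calling Lookup, which caches it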
@db[statement, parameters].prepare(:select, @id)
ensure
@rwlock.readLock().unlock()
end

def build_db_object(db_object)
begin
@rwlock.writeLock().lock()
9 changes: 7 additions & 2 deletions lib/logstash/filters/jdbc_static.rb
@@ -60,14 +60,19 @@ module LogStash module Filters class JdbcStatic < LogStash::Filters::Base
# For example:
# local_lookups => [
# {
# "query" => "select * from country WHERE code = :code",
# "query" => "SELECT * FROM country WHERE code = :code",
# "parameters" => {"code" => "country_code"}
# "target" => "country_details"
# },
# {
# "query" => "select ip, name from servers WHERE ip LIKE :ip",
# "query" => "SELECT ip, name FROM servers WHERE ip LIKE :ip",
# "parameters" => {"ip" => "%{[response][ip]}%"}
# "target" => "servers"
# },
# {
# "query" => "SELECT ip, name FROM servers WHERE ip = ?",
# "prepared_parameters" => ["from_ip"]
# "target" => "servers"
# }
# ]
config :local_lookups, :required => true, :validate => [LogStash::Filters::Jdbc::LookupProcessor]
2 changes: 1 addition & 1 deletion logstash-filter-jdbc_static.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-filter-jdbc_static'
s.version = '1.0.7'
s.version = '1.1.0'
s.licenses = ['Apache-2.0']
s.summary = "This filter executes a SQL query to fetch a SQL query result, store it locally then use a second SQL query to update an event."
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
34 changes: 34 additions & 0 deletions spec/filters/integration/jdbc_static_spec.rb
@@ -83,6 +83,40 @@ module LogStash module Filters
end
end

context "under normal conditions with prepared statement" do
let(:lookup_statement) { "SELECT * FROM servers WHERE ip LIKE ?" }
let(:settings) do
{
"jdbc_user" => ENV['USER'],
"jdbc_driver_class" => "org.postgresql.Driver",
"jdbc_driver_library" => "/usr/share/logstash/postgresql.jar",
"staging_directory" => temp_import_path_plugin,
"jdbc_connection_string" => jdbc_connection_string,
"loaders" => [
{
"id" =>"servers",
"query" => loader_statement,
"local_table" => "servers"
}
],
"local_db_objects" => local_db_objects,
"local_lookups" => [
{
"query" => lookup_statement,
"prepared_parameters" => [parameters_rhs],
"target" => "server"
}
]
}
end

it "enhances an event" do
plugin.register
plugin.filter(event)
expect(event.get("server")).to eq([{"ip"=>"10.3.1.1", "name"=>"mv-server-1", "location"=>"MV-9-6-4"}])
end
end

context "under normal conditions when index_columns is not specified" do
let(:local_db_objects) do
[