Skip to content

Commit 91432a8

Browse files
committed
Updated the local lookup to use prepared statements, closes 50
1 parent fd91efd commit 91432a8

File tree

9 files changed

+283
-26
lines changed

9 files changed

+283
-26
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.1.0
2+
- Added prepared statement support in local lookups [#53](https://github.com/logstash-plugins/logstash-filter-jdbc_static/pull/53)
3+
14
## 1.0.7
25
- Fixed issue with driver verification using Java 11 [#51](https://github.com/logstash-plugins/logstash-filter-jdbc_static/pull/51)
36

docs/index.asciidoc

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,21 +101,21 @@ filter {
101101
local_lookups => [ <3>
102102
{
103103
id => "local-servers"
104-
query => "select descr as description from servers WHERE ip = :ip"
104+
query => "SELECT descr as description FROM servers WHERE ip = :ip"
105105
parameters => {ip => "[from_ip]"}
106106
target => "server"
107107
},
108108
{
109109
id => "local-users"
110-
query => "select firstname, lastname from users WHERE userid = :id"
111-
parameters => {id => "[loggedin_userid]"}
112-
target => "user" <4>
110+
query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?"
111+
prepared_parameters => ["[loggedin_userid]", "[user_nation]"] <4>
112+
target => "user" <5>
113113
}
114114
]
115115
# using add_field here to add & rename values to the event root
116-
add_field => { server_name => "%{[server][0][description]}" }
117-
add_field => { user_firstname => "%{[user][0][firstname]}" } <5>
118-
add_field => { user_lastname => "%{[user][0][lastname]}" } <5>
116+
add_field => { server_name => "%{[server][0][description]}" } <6>
117+
add_field => { user_firstname => "%{[user][0][firstname]}" }
118+
add_field => { user_lastname => "%{[user][0][lastname]}" }
119119
remove_field => ["server", "user"]
120120
staging_directory => "/tmp/logstash/jdbc_static/import_data"
121121
loader_schedule => "* */2 * * *" # run loaders every 2 hours
@@ -134,9 +134,11 @@ structure. The column names and types should match the external database.
134134
The order of table definitions is significant and should match that of the loader queries.
135135
See <<plugins-{type}s-{plugin}-object_order>>.
136136
<3> Performs lookup queries on the local database to enrich the events.
137-
<4> Specifies the event field that will store the looked-up data. If the lookup
137+
<4> Local lookup queries can also use prepared statements where the parameters
138+
follow the positional ordering.
139+
<5> Specifies the event field that will store the looked-up data. If the lookup
138140
returns multiple columns, the data is stored as a JSON object within the field.
139-
<5> Takes data from the JSON object and stores it in top-level event fields for
141+
<6> Takes data from the JSON object and stores it in top-level event fields for
140142
easier analysis in Kibana.
141143

142144
Here's a full example:
@@ -546,7 +548,9 @@ default id is used instead.
546548
query::
547549
A SQL SELECT statement that is executed to achieve the lookup. To use
548550
parameters, use named parameter syntax, for example
549-
`"SELECT * FROM MYTABLE WHERE ID = :id"`.
551+
`"SELECT * FROM MYTABLE WHERE ID = :id"`. Alternatively, the `?` sign
552+
can be used as a prepared statement parameter, in which case
553+
the `prepared_parameters` array is used to populate the values
550554
551555
parameters::
552556
A key/value Hash or dictionary. The key (LHS) is the text that is
@@ -563,6 +567,16 @@ an id and a location, and you have a table of sensors that have a
563567
column of `id-loc_id`. In this case your parameter hash would look
564568
like this: `parameters => { "p1" => "%{[id]}-%{[loc_id]}" }`.
565569
570+
prepared_parameters::
571+
An Array, where the position is related to the position of the `?` in
572+
the query syntax. The values of array follow the same semantic of `parameters`.
573+
If `prepared_parameters` is valorized the filter is forced to use JDBC's
574+
prepared statement to query the local database.
575+
Prepared statements provides two benefits: one on the performance side, because
576+
avoid the DBMS to parse and compile the SQL expression for every call;
577+
the other benefit is on the security side, using prepared statements
578+
avoid SQL-injection attacks based on query string concatenation.
579+
566580
target::
567581
An optional name for the field that will receive the looked-up data.
568582
If you omit this setting then the `id` setting (or the default id) is

lib/logstash/filters/jdbc/lookup.rb

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def initialize(options, globals, default_id)
6363
@valid = false
6464
@option_errors = []
6565
@default_result = nil
66+
@prepared_statement = nil
67+
@symbol_parameters = nil
6668
parse_options
6769
end
6870

@@ -79,8 +81,11 @@ def formatted_errors
7981
end
8082

8183
def enhance(local, event)
82-
result = fetch(local, event) # should return a LookupResult
83-
84+
if @prepared_statement
85+
result = call_prepared(local, event)
86+
else
87+
result = fetch(local, event) # should return a LookupResult
88+
end
8489
if result.failed? || result.parameters_invalid?
8590
tag_failure(event)
8691
end
@@ -98,6 +103,17 @@ def enhance(local, event)
98103
end
99104
end
100105

106+
def use_prepared_statement?
107+
@prepared_parameters && !@prepared_parameters.empty?
108+
end
109+
110+
def prepare(local)
111+
hash = {}
112+
@prepared_parameters.each_with_index { |v, i| hash[:"$p#{i}"] = v }
113+
@prepared_param_placeholder_map = hash
114+
@prepared_statement = local.prepare(query, hash.keys)
115+
end
116+
101117
private
102118

103119
def tag_failure(event)
@@ -139,6 +155,33 @@ def fetch(local, event)
139155
result
140156
end
141157

158+
def call_prepared(local, event)
159+
result = LookupResult.new()
160+
if @parameters_specified
161+
params = prepare_parameters_from_event(event, result)
162+
if result.parameters_invalid?
163+
logger.warn? && logger.warn("Parameter field not found in event", :lookup_id => @id, :invalid_parameters => result.invalid_parameters)
164+
return result
165+
end
166+
else
167+
params = {}
168+
end
169+
begin
170+
logger.debug? && logger.debug("Executing Jdbc query", :lookup_id => @id, :statement => query, :parameters => params)
171+
@prepared_statement.call(params).each do |row|
172+
stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
173+
result.push(stringified)
174+
end
175+
rescue ::Sequel::Error => e
176+
# all sequel errors are a subclass of this, let all other standard or runtime errors bubble up
177+
result.failed!
178+
logger.warn? && logger.warn("Exception when executing Jdbc query", :lookup_id => @id, :exception => e.message, :backtrace => e.backtrace.take(8))
179+
end
180+
# if either of: no records or a Sequel exception occurs the payload is
181+
# empty and the default can be substituted later.
182+
result
183+
end
184+
142185
def process_event(event, result)
143186
# use deep clone here so other filter function don't taint the payload by reference
144187
event.set(@target, ::LogStash::Util.deep_clone(result.payload))
@@ -162,17 +205,34 @@ def parse_options
162205
@option_errors << "The options for '#{@id}' must include a 'query' string"
163206
end
164207

165-
@parameters = @options["parameters"]
166-
@parameters_specified = false
167-
if @parameters
168-
if !@parameters.is_a?(Hash)
169-
@option_errors << "The 'parameters' option for '#{@id}' must be a Hash"
170-
else
171-
# this is done once per lookup at start, i.e. Sprintfier.new et.al is done once.
172-
@symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash }
173-
# the user might specify an empty hash parameters => {}
174-
# maybe due to an unparameterised query
175-
@parameters_specified = !@symbol_parameters.empty?
208+
if @options["parameters"] && @options["prepared_parameters"]
209+
@option_errors << "Can't specify 'parameters' and 'prepared_parameters' in the same lookup"
210+
else
211+
@parameters = @options["parameters"]
212+
@prepared_parameters = @options["prepared_parameters"]
213+
@parameters_specified = false
214+
if @parameters
215+
if !@parameters.is_a?(Hash)
216+
@option_errors << "The 'parameters' option for '#{@id}' must be a Hash"
217+
else
218+
# this is done once per lookup at start, i.e. Sprintfier.new et.al is done once.
219+
@symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash }
220+
# the user might specify an empty hash parameters => {}
221+
# maybe due to an unparameterised query
222+
@parameters_specified = !@symbol_parameters.empty?
223+
end
224+
elsif @prepared_parameters
225+
if !@prepared_parameters.is_a?(Array)
226+
@option_errors << "The 'prepared_parameters' option for '#{@id}' must be an Array"
227+
elsif @query.count("?") != @prepared_parameters.size
228+
@option_errors << "The 'prepared_parameters' option for '#{@id}' doesn't match count with query's placeholder"
229+
else
230+
#prepare the map @symbol_parameters :n => sprintf_or_get
231+
hash = {}
232+
@prepared_parameters.each_with_index {|v,i| hash[:"p#{i}"] = sprintf_or_get(v)}
233+
@symbol_parameters = hash
234+
@parameters_specified = !@prepared_parameters.empty?
235+
end
176236
end
177237
end
178238

lib/logstash/filters/jdbc/lookup_processor.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ def initialize(lookups_array, globals)
3838
"lookup_jdbc_driver_class",
3939
"lookup_jdbc_driver_library").compact)
4040
@local.connect(CONNECTION_ERROR_MSG)
41+
42+
create_prepared_statements_for_lookups
4143
end
4244
end
4345

@@ -60,6 +62,14 @@ def valid?
6062

6163
private
6264

65+
def create_prepared_statements_for_lookups()
66+
@lookups.each do |lookup|
67+
if lookup.use_prepared_statement?
68+
lookup.prepare(@local)
69+
end
70+
end
71+
end
72+
6373
def validate_lookups(lookups_errors = [])
6474
ids = Hash.new(0)
6575
errors = []

lib/logstash/filters/jdbc/read_write_database.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ def fetch(statement, parameters)
2727
@rwlock.readLock().unlock()
2828
end
2929

30+
def prepare(statement, parameters)
31+
@rwlock.readLock().lock()
32+
@db[statement, parameters].prepare(:select, @id)
33+
ensure
34+
@rwlock.readLock().unlock()
35+
end
36+
3037
def build_db_object(db_object)
3138
begin
3239
@rwlock.writeLock().lock()

lib/logstash/filters/jdbc_static.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,19 @@ module LogStash module Filters class JdbcStatic < LogStash::Filters::Base
6060
# For example:
6161
# local_lookups => [
6262
# {
63-
# "query" => "select * from country WHERE code = :code",
63+
# "query" => "SELECT * FROM country WHERE code = :code",
6464
# "parameters" => {"code" => "country_code"}
6565
# "target" => "country_details"
6666
# },
6767
# {
68-
# "query" => "select ip, name from servers WHERE ip LIKE :ip",
68+
# "query" => "SELECT ip, name FROM servers WHERE ip LIKE :ip",
6969
# "parameters" => {"ip" => "%{[response][ip]}%"}
7070
# "target" => "servers"
71+
# },
72+
# {
73+
# "query" => "SELECT ip, name FROM servers WHERE ip = ?",
74+
# "prepared_parameters" => ["from_ip"]
75+
# "target" => "servers"
7176
# }
7277
# ]
7378
config :local_lookups, :required => true, :validate => [LogStash::Filters::Jdbc::LookupProcessor]

logstash-filter-jdbc_static.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-filter-jdbc_static'
3-
s.version = '1.0.7'
3+
s.version = '1.1.0'
44
s.licenses = ['Apache-2.0']
55
s.summary = "This filter executes a SQL query to fetch a SQL query result, store it locally then use a second SQL query to update an event."
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/filters/integration/jdbc_static_spec.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,40 @@ module LogStash module Filters
8383
end
8484
end
8585

86+
context "under normal conditions with prepared statement" do
87+
let(:lookup_statement) { "SELECT * FROM servers WHERE ip LIKE ?" }
88+
let(:settings) do
89+
{
90+
"jdbc_user" => ENV['USER'],
91+
"jdbc_driver_class" => "org.postgresql.Driver",
92+
"jdbc_driver_library" => "/usr/share/logstash/postgresql.jar",
93+
"staging_directory" => temp_import_path_plugin,
94+
"jdbc_connection_string" => jdbc_connection_string,
95+
"loaders" => [
96+
{
97+
"id" =>"servers",
98+
"query" => loader_statement,
99+
"local_table" => "servers"
100+
}
101+
],
102+
"local_db_objects" => local_db_objects,
103+
"local_lookups" => [
104+
{
105+
"query" => lookup_statement,
106+
"prepared_parameters" => [parameters_rhs],
107+
"target" => "server"
108+
}
109+
]
110+
}
111+
end
112+
113+
it "enhances an event" do
114+
plugin.register
115+
plugin.filter(event)
116+
expect(event.get("server")).to eq([{"ip"=>"10.3.1.1", "name"=>"mv-server-1", "location"=>"MV-9-6-4"}])
117+
end
118+
end
119+
86120
context "under normal conditions when index_columns is not specified" do
87121
let(:local_db_objects) do
88122
[

0 commit comments

Comments
 (0)