diff --git a/CHANGELOG.md b/CHANGELOG.md index be256ec..845f303 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 1.1.0 + - Added prepared statement support in local lookups [#53](https://github.com/logstash-plugins/logstash-filter-jdbc_static/pull/53) + ## 1.0.7 - Fixed issue with driver verification using Java 11 [#51](https://github.com/logstash-plugins/logstash-filter-jdbc_static/pull/51) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index f53305d..3e07e97 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -101,21 +101,21 @@ filter { local_lookups => [ <3> { id => "local-servers" - query => "select descr as description from servers WHERE ip = :ip" + query => "SELECT descr as description FROM servers WHERE ip = :ip" parameters => {ip => "[from_ip]"} target => "server" }, { id => "local-users" - query => "select firstname, lastname from users WHERE userid = :id" - parameters => {id => "[loggedin_userid]"} - target => "user" <4> + query => "SELECT firstname, lastname FROM users WHERE userid = ? AND country = ?" + prepared_parameters => ["[loggedin_userid]", "[user_nation]"] <4> + target => "user" <5> } ] # using add_field here to add & rename values to the event root - add_field => { server_name => "%{[server][0][description]}" } - add_field => { user_firstname => "%{[user][0][firstname]}" } <5> - add_field => { user_lastname => "%{[user][0][lastname]}" } <5> + add_field => { server_name => "%{[server][0][description]}" } <6> + add_field => { user_firstname => "%{[user][0][firstname]}" } + add_field => { user_lastname => "%{[user][0][lastname]}" } remove_field => ["server", "user"] staging_directory => "/tmp/logstash/jdbc_static/import_data" loader_schedule => "* */2 * * *" # run loaders every 2 hours @@ -134,9 +134,11 @@ structure. The column names and types should match the external database. The order of table definitions is significant and should match that of the loader queries. See <>. <3> Performs lookup queries on the local database to enrich the events. -<4> Specifies the event field that will store the looked-up data. If the lookup +<4> Local lookup queries can also use prepared statements where the parameters +follow the positional ordering. +<5> Specifies the event field that will store the looked-up data. If the lookup returns multiple columns, the data is stored as a JSON object within the field. -<5> Takes data from the JSON object and stores it in top-level event fields for +<6> Takes data from the JSON object and stores it in top-level event fields for easier analysis in Kibana. Here's a full example: @@ -546,7 +548,9 @@ default id is used instead. query:: A SQL SELECT statement that is executed to achieve the lookup. To use parameters, use named parameter syntax, for example -`"SELECT * FROM MYTABLE WHERE ID = :id"`. +`"SELECT * FROM MYTABLE WHERE ID = :id"`. Alternatively, the `?` sign +can be used as a prepared statement parameter, in which case +the `prepared_parameters` array is used to populate the values parameters:: A key/value Hash or dictionary. The key (LHS) is the text that is @@ -563,6 +567,16 @@ an id and a location, and you have a table of sensors that have a column of `id-loc_id`. In this case your parameter hash would look like this: `parameters => { "p1" => "%{[id]}-%{[loc_id]}" }`. +prepared_parameters:: +An Array, where the position is related to the position of the `?` in +the query syntax. The values of array follow the same semantic of `parameters`. +If `prepared_parameters` is valorized the filter is forced to use JDBC's +prepared statement to query the local database. +Prepared statements provides two benefits: one on the performance side, because +avoid the DBMS to parse and compile the SQL expression for every call; +the other benefit is on the security side, using prepared statements +avoid SQL-injection attacks based on query string concatenation. + target:: An optional name for the field that will receive the looked-up data. If you omit this setting then the `id` setting (or the default id) is diff --git a/lib/logstash/filters/jdbc/lookup.rb b/lib/logstash/filters/jdbc/lookup.rb index 7b1eebe..f3cd0bb 100644 --- a/lib/logstash/filters/jdbc/lookup.rb +++ b/lib/logstash/filters/jdbc/lookup.rb @@ -63,6 +63,8 @@ def initialize(options, globals, default_id) @valid = false @option_errors = [] @default_result = nil + @prepared_statement = nil + @symbol_parameters = nil parse_options end @@ -79,8 +81,11 @@ def formatted_errors end def enhance(local, event) - result = fetch(local, event) # should return a LookupResult - + if @prepared_statement + result = call_prepared(local, event) + else + result = fetch(local, event) # should return a LookupResult + end if result.failed? || result.parameters_invalid? tag_failure(event) end @@ -98,6 +103,17 @@ def enhance(local, event) end end + def use_prepared_statement? + @prepared_parameters && !@prepared_parameters.empty? + end + + def prepare(local) + hash = {} + @prepared_parameters.each_with_index { |v, i| hash[:"$p#{i}"] = v } + @prepared_param_placeholder_map = hash + @prepared_statement = local.prepare(query, hash.keys) + end + private def tag_failure(event) @@ -139,6 +155,33 @@ def fetch(local, event) result end + def call_prepared(local, event) + result = LookupResult.new() + if @parameters_specified + params = prepare_parameters_from_event(event, result) + if result.parameters_invalid? + logger.warn? && logger.warn("Parameter field not found in event", :lookup_id => @id, :invalid_parameters => result.invalid_parameters) + return result + end + else + params = {} + end + begin + logger.debug? && logger.debug("Executing Jdbc query", :lookup_id => @id, :statement => query, :parameters => params) + @prepared_statement.call(params).each do |row| + stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys + result.push(stringified) + end + rescue ::Sequel::Error => e + # all sequel errors are a subclass of this, let all other standard or runtime errors bubble up + result.failed! + logger.warn? && logger.warn("Exception when executing Jdbc query", :lookup_id => @id, :exception => e.message, :backtrace => e.backtrace.take(8)) + end + # if either of: no records or a Sequel exception occurs the payload is + # empty and the default can be substituted later. + result + end + def process_event(event, result) # use deep clone here so other filter function don't taint the payload by reference event.set(@target, ::LogStash::Util.deep_clone(result.payload)) @@ -162,17 +205,34 @@ def parse_options @option_errors << "The options for '#{@id}' must include a 'query' string" end - @parameters = @options["parameters"] - @parameters_specified = false - if @parameters - if !@parameters.is_a?(Hash) - @option_errors << "The 'parameters' option for '#{@id}' must be a Hash" - else - # this is done once per lookup at start, i.e. Sprintfier.new et.al is done once. - @symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash } - # the user might specify an empty hash parameters => {} - # maybe due to an unparameterised query - @parameters_specified = !@symbol_parameters.empty? + if @options["parameters"] && @options["prepared_parameters"] + @option_errors << "Can't specify 'parameters' and 'prepared_parameters' in the same lookup" + else + @parameters = @options["parameters"] + @prepared_parameters = @options["prepared_parameters"] + @parameters_specified = false + if @parameters + if !@parameters.is_a?(Hash) + @option_errors << "The 'parameters' option for '#{@id}' must be a Hash" + else + # this is done once per lookup at start, i.e. Sprintfier.new et.al is done once. + @symbol_parameters = @parameters.inject({}) {|hash,(k,v)| hash[k.to_sym] = sprintf_or_get(v) ; hash } + # the user might specify an empty hash parameters => {} + # maybe due to an unparameterised query + @parameters_specified = !@symbol_parameters.empty? + end + elsif @prepared_parameters + if !@prepared_parameters.is_a?(Array) + @option_errors << "The 'prepared_parameters' option for '#{@id}' must be an Array" + elsif @query.count("?") != @prepared_parameters.size + @option_errors << "The 'prepared_parameters' option for '#{@id}' doesn't match count with query's placeholder" + else + #prepare the map @symbol_parameters :n => sprintf_or_get + hash = {} + @prepared_parameters.each_with_index {|v,i| hash[:"p#{i}"] = sprintf_or_get(v)} + @symbol_parameters = hash + @parameters_specified = !@prepared_parameters.empty? + end end end diff --git a/lib/logstash/filters/jdbc/lookup_processor.rb b/lib/logstash/filters/jdbc/lookup_processor.rb index d083d64..2ac2c9d 100644 --- a/lib/logstash/filters/jdbc/lookup_processor.rb +++ b/lib/logstash/filters/jdbc/lookup_processor.rb @@ -38,6 +38,8 @@ def initialize(lookups_array, globals) "lookup_jdbc_driver_class", "lookup_jdbc_driver_library").compact) @local.connect(CONNECTION_ERROR_MSG) + + create_prepared_statements_for_lookups end end @@ -60,6 +62,14 @@ def valid? private + def create_prepared_statements_for_lookups() + @lookups.each do |lookup| + if lookup.use_prepared_statement? + lookup.prepare(@local) + end + end + end + def validate_lookups(lookups_errors = []) ids = Hash.new(0) errors = [] diff --git a/lib/logstash/filters/jdbc/read_write_database.rb b/lib/logstash/filters/jdbc/read_write_database.rb index 78bc641..23a4eee 100644 --- a/lib/logstash/filters/jdbc/read_write_database.rb +++ b/lib/logstash/filters/jdbc/read_write_database.rb @@ -27,6 +27,13 @@ def fetch(statement, parameters) @rwlock.readLock().unlock() end + def prepare(statement, parameters) + @rwlock.readLock().lock() + @db[statement, parameters].prepare(:select, @id) + ensure + @rwlock.readLock().unlock() + end + def build_db_object(db_object) begin @rwlock.writeLock().lock() diff --git a/lib/logstash/filters/jdbc_static.rb b/lib/logstash/filters/jdbc_static.rb index 4ae9266..3d3bfb8 100644 --- a/lib/logstash/filters/jdbc_static.rb +++ b/lib/logstash/filters/jdbc_static.rb @@ -60,14 +60,19 @@ module LogStash module Filters class JdbcStatic < LogStash::Filters::Base # For example: # local_lookups => [ # { - # "query" => "select * from country WHERE code = :code", + # "query" => "SELECT * FROM country WHERE code = :code", # "parameters" => {"code" => "country_code"} # "target" => "country_details" # }, # { - # "query" => "select ip, name from servers WHERE ip LIKE :ip", + # "query" => "SELECT ip, name FROM servers WHERE ip LIKE :ip", # "parameters" => {"ip" => "%{[response][ip]}%"} # "target" => "servers" + # }, + # { + # "query" => "SELECT ip, name FROM servers WHERE ip = ?", + # "prepared_parameters" => ["from_ip"] + # "target" => "servers" # } # ] config :local_lookups, :required => true, :validate => [LogStash::Filters::Jdbc::LookupProcessor] diff --git a/logstash-filter-jdbc_static.gemspec b/logstash-filter-jdbc_static.gemspec index 6d8cf15..9b9305b 100644 --- a/logstash-filter-jdbc_static.gemspec +++ b/logstash-filter-jdbc_static.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-filter-jdbc_static' - s.version = '1.0.7' + s.version = '1.1.0' s.licenses = ['Apache-2.0'] s.summary = "This filter executes a SQL query to fetch a SQL query result, store it locally then use a second SQL query to update an event." s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/filters/integration/jdbc_static_spec.rb b/spec/filters/integration/jdbc_static_spec.rb index 9f555b8..17a128d 100644 --- a/spec/filters/integration/jdbc_static_spec.rb +++ b/spec/filters/integration/jdbc_static_spec.rb @@ -83,6 +83,40 @@ module LogStash module Filters end end + context "under normal conditions with prepared statement" do + let(:lookup_statement) { "SELECT * FROM servers WHERE ip LIKE ?" } + let(:settings) do + { + "jdbc_user" => ENV['USER'], + "jdbc_driver_class" => "org.postgresql.Driver", + "jdbc_driver_library" => "/usr/share/logstash/postgresql.jar", + "staging_directory" => temp_import_path_plugin, + "jdbc_connection_string" => jdbc_connection_string, + "loaders" => [ + { + "id" =>"servers", + "query" => loader_statement, + "local_table" => "servers" + } + ], + "local_db_objects" => local_db_objects, + "local_lookups" => [ + { + "query" => lookup_statement, + "prepared_parameters" => [parameters_rhs], + "target" => "server" + } + ] + } + end + + it "enhances an event" do + plugin.register + plugin.filter(event) + expect(event.get("server")).to eq([{"ip"=>"10.3.1.1", "name"=>"mv-server-1", "location"=>"MV-9-6-4"}]) + end + end + context "under normal conditions when index_columns is not specified" do let(:local_db_objects) do [ diff --git a/spec/filters/jdbc/lookup_spec.rb b/spec/filters/jdbc/lookup_spec.rb index 6abf312..67ddda9 100644 --- a/spec/filters/jdbc/lookup_spec.rb +++ b/spec/filters/jdbc/lookup_spec.rb @@ -44,6 +44,27 @@ module LogStash module Filters module Jdbc result = described_class.find_validation_errors([lookup_hash]) expect(result).to eq("The 'parameters' option for 'lookup-1' must be a Hash") end + + it "parameters and prepared_parameters are defined at same time" do + lookup_hash = { + "query" => "SELECT * FROM table WHERE ip=?", + "parameters" => {"ip" => "%%{[ip]}"}, + "prepared_parameters" => ["%%{[ip]}"], + "target" => "server" + } + result = described_class.find_validation_errors([lookup_hash]) + expect(result).to eq("Can't specify 'parameters' and 'prepared_parameters' in the same lookup") + end + + it "prepared_parameters count doesn't match the number of '?' in the query" do + lookup_hash = { + "query" => "SELECT * FROM table WHERE ip=? AND host=?", + "prepared_parameters" => ["%%{[ip]}"], + "target" => "server" + } + result = described_class.find_validation_errors([lookup_hash]) + expect(result).to eq("The 'prepared_parameters' option for 'lookup-1' doesn't match count with query's placeholder") + end end context "when supplied with a valid arg" do @@ -124,6 +145,109 @@ module LogStash module Filters module Jdbc expect(event.get("server")).to eq(records) end end + + describe "lookup operations with prepared statement" do + let(:local_db) { double("local_db") } + let(:lookup_hash) do + { + "query" => "select * from servers WHERE ip LIKE ?", + "prepared_parameters" => ["%%{[ip]}"], + "target" => "server", + "tag_on_failure" => ["_jdbcstaticfailure_server"] + } + end + let(:event) { LogStash::Event.new()} + let(:records) { [{"name" => "ldn-1-23", "rack" => "2:1:6"}] } + let(:prepared_statement) { double("prepared_statement")} + + subject(:lookup) { described_class.new(lookup_hash, {}, "lookup-1") } + + before(:each) do + allow(local_db).to receive(:prepare).once.and_return(prepared_statement) + allow(prepared_statement).to receive(:call).once.and_return(records) + end + + it "should be valid" do + expect(subject.valid?).to be_truthy + end + + it "should have no formatted_errors" do + expect(subject.formatted_errors).to eq("") + end + + it "should enhance an event" do + event.set("ip", "20.20") + subject.prepare(local_db) + subject.enhance(local_db, event) + expect(event.get("tags")).to be_nil + expect(event.get("server")).to eq(records) + end + end + + describe "lookup operations with prepared statement multiple parameters" do + let(:local_db) { double("local_db") } + let(:lookup_hash) do + { + "query" => "select * from servers WHERE ip LIKE ? AND os LIKE ?", + "prepared_parameters" => ["%%{[ip]}", "os"], + "target" => "server", + "tag_on_failure" => ["_jdbcstaticfailure_server"] + } + end + let(:event) { LogStash::Event.new()} + let(:records) { [{"name" => "ldn-1-23", "rack" => "2:1:6"}] } + let(:prepared_statement) { double("prepared_statement")} + + subject(:lookup) { described_class.new(lookup_hash, {}, "lookup-1") } + + before(:each) do + allow(local_db).to receive(:prepare).once.and_return(prepared_statement) + allow(prepared_statement).to receive(:call).once.and_return(records) + end + + it "should be valid" do + expect(subject.valid?).to be_truthy + end + + it "should have no formatted_errors" do + expect(subject.formatted_errors).to eq("") + end + + it "should enhance an event" do + event.set("ip", "20.20") + event.set("os", "MacOS") + subject.prepare(local_db) + subject.enhance(local_db, event) + expect(event.get("tags")).to be_nil + expect(event.get("server")).to eq(records) + end + end + + describe "lookup operations with badly configured prepared statement" do + let(:local_db) { double("local_db") } + let(:lookup_hash) do + { + "query" => "select * from servers WHERE ip LIKE ? AND os LIKE ?", + "prepared_parameters" => ["%%{[ip]}"], + "target" => "server", + "tag_on_failure" => ["_jdbcstaticfailure_server"] + } + end + let(:event) { LogStash::Event.new()} + let(:records) { [{"name" => "ldn-1-23", "rack" => "2:1:6"}] } + let(:prepared_statement) { double("prepared_statement")} + + subject(:lookup) { described_class.new(lookup_hash, {}, "lookup-1") } + + before(:each) do + allow(local_db).to receive(:prepare).once.and_return(prepared_statement) + allow(prepared_statement).to receive(:call).once.and_return(records) + end + + it "must not be valid" do + expect(subject.valid?).to be_falsey + end + end end end end end