lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.14 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.16

- old
+ new

@@ -199,10 +199,16 @@ # } # ------------------------------------------------------- # this will only convert column0 that has ISO-8859-1 as an original encoding. config :columns_charset, :validate => :hash, :default => {} + config :use_prepared_statements, :validate => :boolean, :default => false + + config :prepared_statement_name, :validate => :string, :default => "" + + config :prepared_statement_bind_values, :validate => :array, :default => [] + attr_reader :database # for test mocking/stubbing public def register @@ -215,21 +221,29 @@ if @tracking_column.nil? raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") end end - set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) - set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) - - @enable_encoding = !@charset.nil? || !@columns_charset.empty? - unless @statement.nil? ^ @statement_filepath.nil? raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.") end @statement = ::File.read(@statement_filepath) if @statement_filepath + # must validate prepared statement mode after trying to read in from @statement_filepath + if @use_prepared_statements + validation_errors = validate_prepared_statement_mode + unless validation_errors.empty? + raise(LogStash::ConfigurationError, "Prepared Statement Mode validation errors: " + validation_errors.join(", ")) + end + end + + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) + + @enable_encoding = !@charset.nil? || !@columns_charset.empty? + if (@jdbc_password_filepath and @jdbc_password) raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.") end @jdbc_password = LogStash::Util::Password.new(::File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath @@ -246,11 +260,11 @@ end end # def register # test injection points def set_statement_logger(instance) - @statement_logger = instance + @statement_handler = LogStash::PluginMixins::Jdbc::StatementHandler.build_statement_handler(self, instance) end def set_value_tracker(instance) @value_tracker = instance end @@ -273,13 +287,27 @@ @scheduler.shutdown(:wait) if @scheduler end private + def validate_prepared_statement_mode + error_messages = [] + if @prepared_statement_name.empty? + error_messages << "must provide a name for the Prepared Statement, it must be unique for the db session" + end + if @statement.count("?") != @prepared_statement_bind_values.size + # mismatch in number of bind value elements to placeholder characters + error_messages << "there is a mismatch between the number of statement `?` placeholders and :prepared_statement_bind_values array setting elements" + end + if @jdbc_paging_enabled + # Pagination is not supported when using prepared statements + error_messages << "JDBC pagination cannot be used at this time" + end + error_messages + end + def execute_query(queue) - # update default parameters - @parameters['sql_last_value'] = @value_tracker.value - execute_statement(@statement, @parameters) do |row| + execute_statement do |row| if enable_encoding? ## do the necessary conversions to string elements row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }] end event = LogStash::Event.new(row)