lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.14 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.16
- old
+ new
@@ -199,10 +199,16 @@
# }
# -------------------------------------------------------
# this will only convert column0 that has ISO-8859-1 as an original encoding.
config :columns_charset, :validate => :hash, :default => {}
+ config :use_prepared_statements, :validate => :boolean, :default => false
+
+ config :prepared_statement_name, :validate => :string, :default => ""
+
+ config :prepared_statement_bind_values, :validate => :array, :default => []
+
attr_reader :database # for test mocking/stubbing
public
def register
@@ -215,21 +221,29 @@
if @tracking_column.nil?
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
end
end
- set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
- set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))
-
- @enable_encoding = !@charset.nil? || !@columns_charset.empty?
-
unless @statement.nil? ^ @statement_filepath.nil?
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
end
@statement = ::File.read(@statement_filepath) if @statement_filepath
+ # must validate prepared statement mode after trying to read in from @statement_filepath
+ if @use_prepared_statements
+ validation_errors = validate_prepared_statement_mode
+ unless validation_errors.empty?
+ raise(LogStash::ConfigurationError, "Prepared Statement Mode validation errors: " + validation_errors.join(", "))
+ end
+ end
+
+ set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self))
+ set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger))
+
+ @enable_encoding = !@charset.nil? || !@columns_charset.empty?
+
if (@jdbc_password_filepath and @jdbc_password)
raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.")
end
@jdbc_password = LogStash::Util::Password.new(::File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath
@@ -246,11 +260,11 @@
end
end # def register
# test injection points
def set_statement_logger(instance)
- @statement_logger = instance
+ @statement_handler = LogStash::PluginMixins::Jdbc::StatementHandler.build_statement_handler(self, instance)
end
def set_value_tracker(instance)
@value_tracker = instance
end
@@ -273,13 +287,27 @@
@scheduler.shutdown(:wait) if @scheduler
end
private
+ def validate_prepared_statement_mode
+ error_messages = []
+ if @prepared_statement_name.empty?
+ error_messages << "must provide a name for the Prepared Statement, it must be unique for the db session"
+ end
+ if @statement.count("?") != @prepared_statement_bind_values.size
+ # mismatch in number of bind value elements to placeholder characters
+ error_messages << "there is a mismatch between the number of statement `?` placeholders and :prepared_statement_bind_values array setting elements"
+ end
+ if @jdbc_paging_enabled
+ # Pagination is not supported when using prepared statements
+ error_messages << "JDBC pagination cannot be used at this time"
+ end
+ error_messages
+ end
+
def execute_query(queue)
- # update default parameters
- @parameters['sql_last_value'] = @value_tracker.value
- execute_statement(@statement, @parameters) do |row|
+ execute_statement do |row|
if enable_encoding?
## do the necessary conversions to string elements
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]
end
event = LogStash::Event.new(row)