lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.11 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.12

- old
+ new

@@ -1,9 +1,9 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" -require "logstash/plugin_mixins/jdbc" +require "logstash/plugin_mixins/jdbc/jdbc" # This plugin was created as a way to ingest data from any database # with a JDBC interface into Logstash. You can periodically schedule ingestion # using a cron syntax (see `schedule` setting) or run the query one time to load @@ -121,12 +121,12 @@ # # ... other configuration bits # } # } # --------------------------------------------------------------------------------------------------- # -class LogStash::Inputs::Jdbc < LogStash::Inputs::Base - include LogStash::PluginMixins::Jdbc +module LogStash module Inputs class Jdbc < LogStash::Inputs::Base + include LogStash::PluginMixins::Jdbc::Jdbc config_name "jdbc" # If undefined, Logstash will complain, even if codec is unused. default :codec, "plain" @@ -211,25 +211,26 @@ if @tracking_column.nil? raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") end end - @value_tracker = LogStash::PluginMixins::ValueTracking.build_last_value_tracker(self) + set_value_tracker(LogStash::PluginMixins::Jdbc::ValueTracking.build_last_value_tracker(self)) + set_statement_logger(LogStash::PluginMixins::Jdbc::CheckedCountLogger.new(@logger)) @enable_encoding = !@charset.nil? || !@columns_charset.empty? unless @statement.nil? ^ @statement_filepath.nil? raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.") end - @statement = File.read(@statement_filepath) if @statement_filepath + @statement = ::File.read(@statement_filepath) if @statement_filepath if (@jdbc_password_filepath and @jdbc_password) raise(LogStash::ConfigurationError, "Only one of :jdbc_password, :jdbc_password_filepath may be set at a time.") end - @jdbc_password = LogStash::Util::Password.new(File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath + @jdbc_password = LogStash::Util::Password.new(::File.read(@jdbc_password_filepath).strip) if @jdbc_password_filepath if enable_encoding? encodings = @columns_charset.values encodings << @charset if @charset @@ -239,10 +240,19 @@ converters[encoding] = converter end end end # def register + # test injection points + def set_statement_logger(instance) + @statement_logger = instance + end + + def set_value_tracker(instance) + @value_tracker = instance + end + def run(queue) if @schedule @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) @scheduler.cron @schedule do execute_query(queue) @@ -294,6 +304,6 @@ converter.convert(value) else value end end -end # class LogStash::Inputs::Jdbc +end end end # class LogStash::Inputs::Jdbc