lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.4 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.5

- old
+ new

@@ -1,11 +1,11 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" require "logstash/plugin_mixins/jdbc" -require "yaml" # persistence + # This plugin was created as a way to ingest data from any database # with a JDBC interface into Logstash. You can periodically schedule ingestion # using a cron syntax (see `schedule` setting) or run the query one time to load # data into Logstash. Each row in the resultset becomes a single event. # Columns in the resultset are converted into fields in the event. @@ -204,26 +204,21 @@ def register @logger = self.logger require "rufus/scheduler" prepare_jdbc_connection - # Raise an error if @use_column_value is true, but no @tracking_column is set if @use_column_value + # Raise an error if @use_column_value is true, but no @tracking_column is set if @tracking_column.nil? raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") end end + @value_tracker = LogStash::PluginMixins::ValueTracking.build_last_value_tracker(self) + @enable_encoding = !@charset.nil? || !@columns_charset.empty? - # load sql_last_value from file if exists - if @clean_run && File.exist?(@last_run_metadata_path) - File.delete(@last_run_metadata_path) - elsif File.exist?(@last_run_metadata_path) - @sql_last_value = YAML.load(File.read(@last_run_metadata_path)) - end - unless @statement.nil? ^ @statement_filepath.nil? raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.") end @statement = File.read(@statement_filepath) if @statement_filepath @@ -246,17 +241,15 @@ def run(queue) if @schedule @scheduler = Rufus::Scheduler.new(:max_work_threads => 1) @scheduler.cron @schedule do execute_query(queue) - update_state_file end @scheduler.join else execute_query(queue) - update_state_file end end # def run def stop close_jdbc_connection @@ -265,25 +258,20 @@ private def execute_query(queue) # update default parameters - @parameters['sql_last_value'] = @sql_last_value + @parameters['sql_last_value'] = @value_tracker.value execute_statement(@statement, @parameters) do |row| if enable_encoding? ## do the necessary conversions to string elements row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }] end event = LogStash::Event.new(row) decorate(event) queue << event end - end - - def update_state_file - if @record_last_run - File.write(@last_run_metadata_path, YAML.dump(@sql_last_value)) - end + @value_tracker.write end private def enable_encoding?