lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-2.1.1 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-3.0.0

- old
+ new

@@ -33,16 +33,16 @@
 #
 # Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
 #
 # ==== State
 #
-# The plugin will persist the `sql_last_start` parameter in the form of a
-# metadata file stored in the configured `last_run_metadata_path`. Upon shutting down,
-# this file will be updated with the current value of `sql_last_start`. Next time
+# The plugin will persist the `sql_last_value` parameter in the form of a
+# metadata file stored in the configured `last_run_metadata_path`. Upon query execution,
+# this file will be updated with the current value of `sql_last_value`. Next time
 # the pipeline starts up, this value will be updated by reading from the file. If
-# `clean_run` is set to true, this value will be ignored and `sql_last_start` will be
-# set to Jan 1, 1970, as if no query has ever been executed.
+# `clean_run` is set to true, this value will be ignored and `sql_last_value` will be
+# set to Jan 1, 1970, or 0 if `use_column_value` is true, as if no query has ever been executed.
 #
 # ==== Dealing With Large Result-sets
 #
 # Many JDBC drivers use the `fetch_size` parameter to limit how many
 # results are pre-fetched at a time from the cursor into the client's cache
@@ -88,12 +88,13 @@
 #
 # Some parameters are built-in and can be used from within your queries.
 # Here is the list:
 #
 # |==========================================================
-# |sql_last_start | The last time a statement was executed. This is set to Thursday, 1 January 1970
-# before any query is run, and updated accordingly after first query is run.
+# |sql_last_value | The value used to calculate which rows to query. Before any query is run,
+# this is set to Thursday, 1 January 1970, or 0 if `use_column_value` is true and
+# `tracking_column` is set. It is updated accordingly after subsequent queries are run.
 # |==========================================================
 #
 class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
   include LogStash::PluginMixins::Jdbc
   config_name "jdbc"
@@ -129,10 +130,16 @@
   config :schedule, :validate => :string

   # Path to file with last run time
   config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"

+  # Use an incremental column value rather than a timestamp
+  config :use_column_value, :validate => :boolean, :default => false
+
+  # If tracking column value rather than timestamp, the column whose value is to be tracked
+  config :tracking_column, :validate => :string
+
   # Whether the previous run state should be preserved
   config :clean_run, :validate => :boolean, :default => false

   # Whether to save state or not in last_run_metadata_path
   config :record_last_run, :validate => :boolean, :default => true
@@ -144,15 +151,22 @@
   def register
     require "rufus/scheduler"
     prepare_jdbc_connection

-    # load sql_last_start from file if exists
+    # Raise an error if @use_column_value is true, but no @tracking_column is set
+    if @use_column_value
+      if @tracking_column.nil?
+        raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
+      end
+    end
+
+    # load sql_last_value from file if exists
     if @clean_run && File.exist?(@last_run_metadata_path)
       File.delete(@last_run_metadata_path)
     elsif File.exist?(@last_run_metadata_path)
-      @sql_last_start = YAML.load(File.read(@last_run_metadata_path))
+      @sql_last_value = YAML.load(File.read(@last_run_metadata_path))
     end

     unless @statement.nil? ^ @statement_filepath.nil?
       raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
     end
@@ -163,36 +177,40 @@
   def run(queue)
     if @schedule
       @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
       @scheduler.cron @schedule do
         execute_query(queue)
+        update_state_file
       end
       @scheduler.join
     else
       execute_query(queue)
+      update_state_file
     end
   end # def run

   def stop
     @scheduler.stop if @scheduler

-    # update state file for next run
-    if @record_last_run
-      File.write(@last_run_metadata_path, YAML.dump(@sql_last_start))
-    end
-
     close_jdbc_connection
   end

   private

   def execute_query(queue)
     # update default parameters
-    @parameters['sql_last_start'] = @sql_last_start
+    @parameters['sql_last_value'] = @sql_last_value
     execute_statement(@statement, @parameters) do |row|
       event = LogStash::Event.new(row)
       decorate(event)
       queue << event
     end
   end
+
+  def update_state_file
+    if @record_last_run
+      File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
+    end
+  end
+
 end # class LogStash::Inputs::Jdbc
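
A minimal usage sketch of the options introduced in 3.0.0; the driver path, connection string, and the mydb, mytable, and id names below are hypothetical placeholders, not taken from the plugin's documentation:

input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java.jar"    # hypothetical driver path
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"  # hypothetical database
    jdbc_user => "logstash"
    schedule => "* * * * *"
    statement => "SELECT * FROM mytable WHERE id > :sql_last_value"
    use_column_value => true    # track a column value rather than the last run timestamp
    tracking_column => "id"     # required whenever use_column_value is true
  }
}

With a configuration along these lines, `sql_last_value` starts at 0 because `use_column_value` is true; after each query it is advanced and, since `record_last_run` defaults to true, written to `last_run_metadata_path` by the new `update_state_file` method shown in the diff.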