lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-2.1.1 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-3.0.0
- old
+ new
@@ -33,16 +33,16 @@
#
# Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here].
#
# ==== State
#
-# The plugin will persist the `sql_last_start` parameter in the form of a
-# metadata file stored in the configured `last_run_metadata_path`. Upon shutting down,
-# this file will be updated with the current value of `sql_last_start`. Next time
+# The plugin will persist the `sql_last_value` parameter in the form of a
+# metadata file stored in the configured `last_run_metadata_path`. Upon query execution,
+# this file will be updated with the current value of `sql_last_value`. Next time
# the pipeline starts up, this value will be updated by reading from the file. If
-# `clean_run` is set to true, this value will be ignored and `sql_last_start` will be
-# set to Jan 1, 1970, as if no query has ever been executed.
+# `clean_run` is set to true, this value will be ignored and `sql_last_value` will be
+# set to Jan 1, 1970, or 0 if `use_column_value` is true, as if no query has ever been executed.
#
# ==== Dealing With Large Result-sets
#
# Many JDBC drivers use the `fetch_size` parameter to limit how many
# results are pre-fetched at a time from the cursor into the client's cache
@@ -88,12 +88,13 @@
#
# Some parameters are built-in and can be used from within your queries.
# Here is the list:
#
# |==========================================================
-# |sql_last_start | The last time a statement was executed. This is set to Thursday, 1 January 1970
-# before any query is run, and updated accordingly after first query is run.
+# |sql_last_value | The value used to calculate which rows to query. Before any query is run,
+# this is set to Thursday, 1 January 1970, or 0 if `use_column_value` is true and
+# `tracking_column` is set. It is updated accordingly after subsequent queries are run.
# |==========================================================
#
class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
include LogStash::PluginMixins::Jdbc
config_name "jdbc"
@@ -129,10 +130,16 @@
config :schedule, :validate => :string
# Path to the file in which the last-run state is persisted between pipeline runs
config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"
+ # Use an incremental column value rather than a timestamp
+ config :use_column_value, :validate => :boolean, :default => false
+
+ # The column whose value is tracked when use_column_value is true (required in that case)
+ config :tracking_column, :validate => :string
+
# Whether to start from a clean state: when true, any existing last-run metadata file is deleted instead of being loaded
config :clean_run, :validate => :boolean, :default => false
# Whether to save state or not in last_run_metadata_path
config :record_last_run, :validate => :boolean, :default => true
@@ -144,15 +151,22 @@
def register
require "rufus/scheduler"
prepare_jdbc_connection
- # load sql_last_start from file if exists
+ # Raise an error if @use_column_value is true, but no @tracking_column is set
+ if @use_column_value
+ if @tracking_column.nil?
+ raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
+ end
+ end
+
+ # load sql_last_value from the metadata file if it exists; when clean_run is set the file is deleted instead
if @clean_run && File.exist?(@last_run_metadata_path)
File.delete(@last_run_metadata_path)
elsif File.exist?(@last_run_metadata_path)
- @sql_last_start = YAML.load(File.read(@last_run_metadata_path))
+ @sql_last_value = YAML.load(File.read(@last_run_metadata_path))
end
unless @statement.nil? ^ @statement_filepath.nil?
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
end
@@ -163,36 +177,40 @@
def run(queue)
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@scheduler.cron @schedule do
execute_query(queue)
+ update_state_file
end
@scheduler.join
else
execute_query(queue)
+ update_state_file
end
end # def run
def stop
@scheduler.stop if @scheduler
- # update state file for next run
- if @record_last_run
- File.write(@last_run_metadata_path, YAML.dump(@sql_last_start))
- end
-
close_jdbc_connection
end
private
def execute_query(queue)
# update default parameters
- @parameters['sql_last_start'] = @sql_last_start
+ @parameters['sql_last_value'] = @sql_last_value
execute_statement(@statement, @parameters) do |row|
event = LogStash::Event.new(row)
decorate(event)
queue << event
end
end
+
+ def update_state_file
+ if @record_last_run
+ File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
+ end
+ end
+
end # class LogStash::Inputs::Jdbc