lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.4 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-4.3.5
- old
+ new
@@ -1,11 +1,11 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc"
-require "yaml" # persistence
+
# This plugin was created as a way to ingest data from any database
# with a JDBC interface into Logstash. You can periodically schedule ingestion
# using a cron syntax (see `schedule` setting) or run the query one time to load
# data into Logstash. Each row in the resultset becomes a single event.
# Columns in the resultset are converted into fields in the event.
@@ -204,26 +204,21 @@
def register
@logger = self.logger
require "rufus/scheduler"
prepare_jdbc_connection
- # Raise an error if @use_column_value is true, but no @tracking_column is set
if @use_column_value
+ # Raise an error if @use_column_value is true, but no @tracking_column is set
if @tracking_column.nil?
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
end
end
+ @value_tracker = LogStash::PluginMixins::ValueTracking.build_last_value_tracker(self)
+
@enable_encoding = !@charset.nil? || !@columns_charset.empty?
- # load sql_last_value from file if exists
- if @clean_run && File.exist?(@last_run_metadata_path)
- File.delete(@last_run_metadata_path)
- elsif File.exist?(@last_run_metadata_path)
- @sql_last_value = YAML.load(File.read(@last_run_metadata_path))
- end
-
unless @statement.nil? ^ @statement_filepath.nil?
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
end
@statement = File.read(@statement_filepath) if @statement_filepath
@@ -246,17 +241,15 @@
def run(queue)
if @schedule
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
@scheduler.cron @schedule do
execute_query(queue)
- update_state_file
end
@scheduler.join
else
execute_query(queue)
- update_state_file
end
end # def run
def stop
close_jdbc_connection
@@ -265,25 +258,20 @@
private
def execute_query(queue)
# update default parameters
- @parameters['sql_last_value'] = @sql_last_value
+ @parameters['sql_last_value'] = @value_tracker.value
execute_statement(@statement, @parameters) do |row|
if enable_encoding?
## do the necessary conversions to string elements
row = Hash[row.map { |k, v| [k.to_s, convert(k, v)] }]
end
event = LogStash::Event.new(row)
decorate(event)
queue << event
end
- end
-
- def update_state_file
- if @record_last_run
- File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
- end
+ @value_tracker.write
end
private
def enable_encoding?