lib/logstash/inputs/jdbc.rb in logstash-integration-jdbc-5.2.5 vs lib/logstash/inputs/jdbc.rb in logstash-integration-jdbc-5.2.6
- old
+ new
@@ -7,10 +7,11 @@
require "logstash/plugin_mixins/ecs_compatibility_support"
require "logstash/plugin_mixins/ecs_compatibility_support/target_check"
require "logstash/plugin_mixins/validator_support/field_reference_validation_adapter"
require "logstash/plugin_mixins/event_support/event_factory_adapter"
+require "fileutils"
# this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0
require_relative "tzinfo_jruby_patch"
# This plugin was created as a way to ingest data from any database
@@ -176,12 +177,14 @@
#
# There is no schedule by default. If no schedule is given, then the statement is run
# exactly once.
config :schedule, :validate => :string
- # Path to file with last run time
- config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"
+ # Path to file with last run time.
+ # By default the file is written to `<path.data>/plugins/inputs/jdbc/logstash_jdbc_last_run`
+ # NOTE: it must be a file path and not a directory path
+ config :last_run_metadata_path, :validate => :string
# Use an incremental column value rather than a timestamp
config :use_column_value, :validate => :boolean, :default => false
# If tracking column value rather than timestamp, the column whose value is to be tracked
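For reference, the default location of the last-run state file changes with this release. A minimal sketch of the old and new defaults in plain Ruby, with "/var/lib/logstash" standing in for whatever `path.data` resolves to on a given installation:

# Illustrative only: old vs. new default location of the last-run state file.
old_default = File.join(ENV.fetch("HOME", "/root"), ".logstash_jdbc_last_run")
# e.g. "/root/.logstash_jdbc_last_run"
new_default = File.join("/var/lib/logstash", "plugins", "inputs", "jdbc", "logstash_jdbc_last_run")
# e.g. "/var/lib/logstash/plugins/inputs/jdbc/logstash_jdbc_last_run"

Setting `last_run_metadata_path` explicitly overrides both defaults; the value must name a file, not a directory.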
@@ -228,15 +231,37 @@
# Define the target field to store the loaded columns
config :target, :validate => :field_reference, :required => false
attr_reader :database # for test mocking/stubbing
+ attr_reader :last_run_metadata_file_path # path to the file used as last run storage
public
def register
@logger = self.logger
+
+ if @record_last_run
+ if @last_run_metadata_path.nil?
+ logstash_data_path = LogStash::SETTINGS.get_value("path.data")
+ logstash_data_path = Pathname.new(logstash_data_path).join("plugins", "inputs", "jdbc")
+ # Ensure that the directory exists before writing the state file, since it's deeply nested.
+ logstash_data_path.mkpath
+ logstash_data_file_path = logstash_data_path.join("logstash_jdbc_last_run")
+
+ ensure_default_metadatafile_location(logstash_data_file_path)
+
+ @last_run_metadata_file_path = logstash_data_file_path.to_path
+ else
+ # validate the path is a file and not a directory
+ if Pathname.new(@last_run_metadata_path).directory?
+ raise LogStash::ConfigurationError.new("The \"last_run_metadata_path\" argument must point to a file, received a directory: \"#{last_run_metadata_path}\"")
+ end
+ @last_run_metadata_file_path = @last_run_metadata_path
+ end
+ end
+
require "rufus/scheduler"
prepare_jdbc_connection
if @use_column_value
# Raise an error if @use_column_value is true, but no @tracking_column is set
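The only validation applied to a user-supplied `last_run_metadata_path` is the directory check above. A standalone sketch of that rule in plain Ruby, with `ArgumentError` standing in for `LogStash::ConfigurationError`:

# Sketch of the new validation: a configured path that points to an existing
# directory is rejected; a file path (whether or not it exists yet) is accepted.
require "pathname"

def validate_metadata_path(path)
  if Pathname.new(path).directory?
    raise ArgumentError, "last_run_metadata_path must point to a file, received a directory: #{path}"
  end
  path
end

validate_metadata_path("/tmp/jdbc_last_run")   # => "/tmp/jdbc_last_run"
validate_metadata_path("/tmp")                 # raises ArgumentError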
@@ -361,6 +386,25 @@
converter.convert(value)
else
value
end
end
+
+ def ensure_default_metadatafile_location(metadata_new_path)
+ old_default_path = Pathname.new("#{ENV['HOME']}/.logstash_jdbc_last_run")
+
+ if old_default_path.exist? && !metadata_new_path.exist?
+ # Previous versions of the plugin stored the state file at $HOME/.logstash_jdbc_last_run.
+ # Copy it to the new location.
+ FileUtils.cp(old_default_path.to_path, metadata_new_path.to_path)
+ begin
+ # If deleting the old file fails (e.g. due to a permission error), inform the user
+ # so they can grant the correct access rights.
+ ::File.delete(old_default_path.to_path)
+ @logger.info("Successfully moved the #{old_default_path.to_path} into #{metadata_new_path.to_path}")
+ rescue => e
+ @logger.warn("Using new metadata file at #{metadata_new_path.to_path} but #{old_default_path} can't be removed.")
+ end
+ end
+ end
+
end end end # class LogStash::Inputs::Jdbc
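The copy-then-delete order in `ensure_default_metadatafile_location` means the tracked state survives even when the old file cannot be removed. A trimmed-down, standalone sketch of that migration step (paths are illustrative; the plugin derives them from `ENV['HOME']` and `path.data`):

# Copy the old state file to the new location first, then try to delete it;
# a failed delete is only warned about, and the copied state is still used.
require "fileutils"
require "pathname"

old_path = Pathname.new(File.join(Dir.home, ".logstash_jdbc_last_run"))
new_path = Pathname.new("/var/lib/logstash/plugins/inputs/jdbc/logstash_jdbc_last_run")

if old_path.exist? && !new_path.exist?
  FileUtils.cp(old_path.to_path, new_path.to_path)
  begin
    File.delete(old_path.to_path)
  rescue SystemCallError => e
    warn "could not remove #{old_path}: #{e.message}; the copy at #{new_path} will be used"
  end
end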