lib/logstash/inputs/jdbc.rb in logstash-integration-jdbc-5.2.5 vs lib/logstash/inputs/jdbc.rb in logstash-integration-jdbc-5.2.6

- old
+ new

@@ -7,10 +7,11 @@ require "logstash/plugin_mixins/ecs_compatibility_support" require "logstash/plugin_mixins/ecs_compatibility_support/target_check" require "logstash/plugin_mixins/validator_support/field_reference_validation_adapter" require "logstash/plugin_mixins/event_support/event_factory_adapter" +require "fileutils" # this require_relative returns early unless the JRuby version is between 9.2.0.0 and 9.2.8.0 require_relative "tzinfo_jruby_patch" # This plugin was created as a way to ingest data from any database @@ -176,12 +177,14 @@ # # There is no schedule by default. If no schedule is given, then the statement is run # exactly once. config :schedule, :validate => :string - # Path to file with last run time - config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run" + # Path to file with last run time. + # The default will write file to `<path.data>/plugins/inputs/jdbc/logstash_jdbc_last_run` + # NOTE: it must be a file path and not a directory path + config :last_run_metadata_path, :validate => :string # Use an incremental column value rather than a timestamp config :use_column_value, :validate => :boolean, :default => false # If tracking column value rather than timestamp, the column whose value is to be tracked @@ -228,15 +231,37 @@ # Define the target field to store the loaded columns config :target, :validate => :field_reference, :required => false attr_reader :database # for test mocking/stubbing + attr_reader :last_run_metadata_file_path # path to the file used as last run storage public def register @logger = self.logger + + if @record_last_run + if @last_run_metadata_path.nil? + logstash_data_path = LogStash::SETTINGS.get_value("path.data") + logstash_data_path = Pathname.new(logstash_data_path).join("plugins", "inputs", "jdbc") + # Ensure that the filepath exists before writing, since it's deeply nested. + logstash_data_path.mkpath + logstash_data_file_path = logstash_data_path.join("logstash_jdbc_last_run") + + ensure_default_metadatafile_location(logstash_data_file_path) + + @last_run_metadata_file_path = logstash_data_file_path.to_path + else + # validate the path is a file and not a directory + if Pathname.new(@last_run_metadata_path).directory? + raise LogStash::ConfigurationError.new("The \"last_run_metadata_path\" argument must point to a file, received a directory: \"#{last_run_metadata_path}\"") + end + @last_run_metadata_file_path = @last_run_metadata_path + end + end + require "rufus/scheduler" prepare_jdbc_connection if @use_column_value # Raise an error if @use_column_value is true, but no @tracking_column is set @@ -361,6 +386,25 @@ converter.convert(value) else value end end + + def ensure_default_metadatafile_location(metadata_new_path) + old_default_path = Pathname.new("#{ENV['HOME']}/.logstash_jdbc_last_run") + + if old_default_path.exist? && !metadata_new_path.exist? + # Previous versions of the plugin hosted the state file into $HOME/.logstash_jdbc_last_run. + # Copy in the new location + FileUtils.cp(old_default_path.to_path, metadata_new_path.to_path) + begin + # If there is a permission error in the delete of the old file inform the user to give + # the correct access rights + ::File.delete(old_default_path.to_path) + @logger.info("Successfully moved the #{old_default_path.to_path} into #{metadata_new_path.to_path}") + rescue e + @logger.warn("Using new metadata file at #{metadata_new_path.to_path} but #{old_default_path} can't be removed.") + end + end + end + end end end # class LogStash::Inputs::Jdbc