lib/rocket_job/dirmon_entry.rb in rocketjob-1.0.0 vs lib/rocket_job/dirmon_entry.rb in rocketjob-1.1.0

- old
+ new

@@ -1,30 +1,41 @@ +require 'thread_safe' +require 'pathname' +require 'fileutils' module RocketJob class DirmonEntry include MongoMapper::Document + include AASM - # Name for this path entry used to identify this DirmonEntry - # in the user interface + # @formatter:off + # User defined name used to identify this DirmonEntry in Mission Control key :name, String - # Wildcard path to search for files in + # Pattern for finding files # - # Example: - # input_files/process1/*.csv* + # Example: All files ending in '.csv' in the input_files/process1 directory + # input_files/process1/*.csv + # + # Example: All files in the input_files/process1 directory and all sub-directories # input_files/process2/**/* # - # For details on valid path values, see: http://ruby-doc.org/core-2.2.2/Dir.html#method-c-glob + # Example: All files in the input_files/process2 directory with .csv or .txt extensions + # input_files/process2/*.{csv,txt} # + # For details on valid pattern values, see: http://ruby-doc.org/core-2.2.2/Dir.html#method-c-glob + # # Note - # - If there are no '*' in the path then an exact filename match is expected - key :path, String + # - If there is no '*' in the pattern then an exact filename match is expected + # - The pattern is not validated to ensure the path exists, it will be validated against the + # `whitelist_paths` when processed by DirmonJob + key :pattern, String - # Job to start + # Job to enqueue for processing for every file that matches the pattern # # Example: # "ProcessItJob" - key :job_name, String + key :job_class_name, String # Any user supplied arguments for the method invocation # All keys must be UTF-8 strings. The values can be any valid BSON type: # Integer # Float @@ -50,48 +61,267 @@ # Archive directory to move files to when processed to prevent processing the # file again. # # If supplied, the file will be moved to this directory before the job is started # If the file was in a sub-directory, the corresponding sub-directory will - # be created in the archive directory, if the path being scanned for files - # is a relative path. (I.e. Does not start with '/') . + # be created in the archive directory. key :archive_directory, String - # Allow a monitoring path to be temporarily disabled - key :enabled, Boolean, default: true - # Method to perform on the job, usually :perform key :perform_method, Symbol, default: :perform - # Returns the Job to be queued - def job_class - job_name.nil? ? nil : job_name.constantize + # If this DirmonEntry is in the failed state, exception contains the cause + one :exception, class_name: 'RocketJob::JobException' + + # The maximum number of files that should ever match during a single poll of the pattern. + # + # Too many files could be as a result of an invalid pattern specification. + # Exceeding this number will result in an exception being logged in a failed Dirmon instance. + # Dirmon processing will continue with new instances. + # TODO: Implement max_hits + #key :max_hits, Integer, default: 100 + + # + # Read-only attributes + # + + # Current state, as set by AASM + key :state, Symbol, default: :pending + + # State Machine events and transitions + # + # :pending -> :enabled -> :disabled + # -> :failed + # -> :failed -> :active + # -> :disabled + # -> :disabled -> :active + aasm column: :state do + # DirmonEntry is `pending` until it is approved + state :pending, initial: true + + # DirmonEntry is Enabled and will be included by DirmonJob + state :enabled + + # DirmonEntry failed during processing and requires manual intervention + # See the exception for the reason for failing this entry + # For example: access denied, whitelist_path security violation, etc. + state :failed + + # DirmonEntry has been manually disabled + state :disabled + + event :enable do + transitions from: :pending, to: :enabled + transitions from: :disabled, to: :enabled + end + + event :disable do + transitions from: :enabled, to: :disabled + transitions from: :failed, to: :disabled + end + + event :fail do + transitions from: :enabled, to: :failed + end end - validates_presence_of :path, :job_name + # @formatter:on + validates_presence_of :pattern, :job_class_name, :perform_method - validates_each :job_name do |record, attr, value| - exists = false - begin - exists = value.nil? ? false : value.constantize.ancestors.include?(RocketJob::Job) - rescue NameError => exc + validates_each :perform_method do |record, attr, value| + if (klass = record.job_class) && !klass.instance_method(value) + record.errors.add(attr, "Method not implemented by #{record.job_class_name}") end - record.errors.add(attr, 'job_name must be defined and must be derived from RocketJob::Job') unless exists end + validates_each :job_class_name do |record, attr, value| + exists = + begin + value.nil? ? false : record.job_class.ancestors.include?(RocketJob::Job) + rescue NameError + false + end + record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists + end + validates_each :arguments do |record, attr, value| - if klass = record.job_class + if (klass = record.job_class) count = klass.argument_count(record.perform_method) - record.errors.add(attr, "There must be #{count} argument(s)") if value.size != count + record.errors.add(attr, "There must be #{count} argument(s)") if value.size != count end end validates_each :properties do |record, attr, value| - if record.job_name && (methods = record.job_class.instance_methods) - value.each_pair do |key, value| - record.errors.add(attr, "Unknown property: #{key.inspect} with value: #{value}") unless methods.include?("#{key}=".to_sym) + if record.job_class && (methods = record.job_class.instance_methods) + value.each_pair do |k, v| + record.errors.add(attr, "Unknown property: #{k.inspect} with value: #{v}") unless methods.include?("#{k}=".to_sym) end end + end + + # Create indexes + def self.create_indexes + # Unique index on pattern to help prevent two entries from scanning the same files + ensure_index({pattern: 1}, background: true, unique: true) + end + + # Security Settings + # + # A whitelist of paths from which to process files. + # This prevents accidental or malicious `pattern`s from processing files from anywhere + # in the system that the user under which Dirmon is running can access. + # + # All resolved `pattern`s must start with one of the whitelisted path, otherwise they will be rejected + # + # Note: + # - If no whitelist paths have been added, then a whitelist check is _not_ performed + # - Relative paths can be used, but are not considered safe since they can be manipulated + # - These paths should be assigned in an initializer and not editable via the Web UI to ensure + # that they are not tampered with + # + # Default: [] ==> Do not enforce whitelists + # + # Returns [Array<String>] a copy of the whitelisted paths + def self.whitelist_paths + @@whitelist_paths.dup + end + + # Add a path to the whitelist + # Raises: Errno::ENOENT: No such file or directory + def self.add_whitelist_path(path) + # Confirms that path exists + path = Pathname.new(path).realpath.to_s + @@whitelist_paths << path + @@whitelist_paths.uniq! + path + end + + # Deletes a path from the whitelist paths + # Raises: Errno::ENOENT: No such file or directory + def self.delete_whitelist_path(path) + # Confirms that path exists + path = Pathname.new(path).realpath.to_s + @@whitelist_paths.delete(path) + @@whitelist_paths.uniq! + path + end + + # The default archive directory that is used when the job being queued does not respond + # to #file_store_upload or #upload, and do not have an `archive_directory` specified in this entry + cattr_accessor :default_archive_directory + + @@default_archive_directory = '_archive'.freeze + + # Returns [Pathname] the archive_directory if set, otherwise the default_archive_directory + def archive_pathname + Pathname.new(archive_directory || self.class.default_archive_directory) + end + + # Passes each filename [Pathname] found that matches the pattern into the supplied block + def each(&block) + logger.tagged("DirmonEntry:#{id}") do + Pathname.glob(pattern).each do |pathname| + next if pathname.directory? + pathname = pathname.realpath + file_name = pathname.to_s + + # Skip archive directories + next if file_name.start_with?(archive_pathname.realpath.to_s) + + # Security check? + if (@@whitelist_paths.size > 0) && @@whitelist_paths.none? { |whitepath| file_name.start_with?(whitepath) } + logger.warn "Ignoring file: #{file_name} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}" + next + end + + # File must be writable so it can be removed after processing + unless pathname.writable? + logger.warn "Ignoring file: #{file_name} since it is not writable by the current user. Must be able to delete/move the file after queueing the job" + next + end + block.call(pathname) + end + end + end + + # Set exception information for this DirmonEntry and fail it + def fail_with_exception!(worker_name, exc_or_message) + if exc_or_message.is_a?(Exception) + self.exception = JobException.from_exception(exc_or_message) + exception.worker_name = worker_name + else + build_exception( + class_name: 'RocketJob::DirmonEntryException', + message: exc_or_message, + backtrace: [], + worker_name: worker_name + ) + end + fail! + end + + @@whitelist_paths = ThreadSafe::Array.new + + # Returns the Job to be queued + def job_class + return if job_class_name.nil? + job_class_name.constantize + rescue NameError + nil + end + + # Queues the job for the supplied pathname + def later(pathname) + job_class.perform_later(*arguments) do |job| + job.perform_method = perform_method + # Set properties + properties.each_pair { |k, v| job.send("#{k}=".to_sym, v) } + + upload_file(job, pathname) + end + end + + protected + + # Upload the file to the job + def upload_file(job, pathname) + if job.respond_to?(:file_store_upload) + # Allow the job to determine what to do with the file + # Pass the pathname as a string, not a Pathname (IO) instance + # so that it can read the file directly + job.file_store_upload(pathname.to_s) + archive_directory ? archive_file(job, pathname) : pathname.unlink + elsif job.respond_to?(:upload) + # With RocketJob Pro the file can be uploaded directly into the Job itself + job.upload(pathname.to_s) + archive_directory ? archive_file(job, pathname) : pathname.unlink + else + upload_default(job, pathname) + end + end + + # Archives the file for a job where there was no #file_store_upload or #upload method + def upload_default(job, pathname) + # The first argument must be a hash + job.arguments << {} if job.arguments.size == 0 + job.arguments.first[:full_file_name] = archive_file(job, pathname) + end + + # Move the file to the archive directory + # + # The archived file name is prefixed with the job id + # + # Returns [String] the fully qualified archived file name + # + # Note: + # - Works across partitions when the file and the archive are on different partitions + def archive_file(job, pathname) + target_path = archive_pathname + target_path.mkpath + target_file_name = target_path.join("#{job.id}_#{pathname.basename}") + # In case the file is being moved across partitions + FileUtils.move(pathname.to_s, target_file_name.to_s) + target_file_name.to_s end end end