lib/rocket_job/dirmon_entry.rb in rocketjob-2.1.3 vs lib/rocket_job/dirmon_entry.rb in rocketjob-3.0.0.alpha

- old
+ new

@@ -4,13 +4,12 @@ module RocketJob class DirmonEntry include Plugins::Document include Plugins::StateMachine - # @formatter:off # User defined name used to identify this DirmonEntry in Mission Control - key :name, String + field :name, type: String # Pattern for finding files # # Example: All files ending in '.csv' in the input_files/process1 directory # input_files/process1/*.csv @@ -25,67 +24,53 @@ # # Note # - If there is no '*' in the pattern then an exact filename match is expected # - The pattern is not validated to ensure the path exists, it will be validated against the # `whitelist_paths` when processed by DirmonJob - key :pattern, String + field :pattern, type: String # Job to enqueue for processing for every file that matches the pattern # # Example: # "ProcessItJob" - key :job_class_name, String + field :job_class_name, type: String - # Any user supplied arguments for the method invocation - # All keys must be UTF-8 strings. The values can be any valid BSON type: - # Integer - # Float - # Time (UTC) - # String (UTF-8) - # Array - # Hash - # True - # False - # Symbol - # nil - # Regular Expression - # - # Note: Date is not supported, convert it to a UTC time - key :arguments, Array - # Any job properties to set # # Example, override the default job priority: # { priority: 45 } - key :properties, Hash + field :properties, type: Hash, default: {} # Archive directory to move files to when processed to prevent processing the # file again. # # If supplied, the file will be moved to this directory before the job is started # If the file was in a sub-directory, the corresponding sub-directory will # be created in the archive directory. - key :archive_directory, String + field :archive_directory, type: String # If this DirmonEntry is in the failed state, exception contains the cause - one :exception, class_name: 'RocketJob::JobException' + embeds_one :exception, class_name: 'RocketJob::JobException' # The maximum number of files that should ever match during a single poll of the pattern. # # Too many files could be as a result of an invalid pattern specification. # Exceeding this number will result in an exception being logged in a failed Dirmon instance. # Dirmon processing will continue with new instances. # TODO: Implement max_hits - #key :max_hits, Integer, default: 100 + #field :max_hits, type: Integer, default: 100 # # Read-only attributes # # Current state, as set by the state machine. Do not modify directly. - key :state, Symbol, default: :pending + field :state, type: Symbol, default: :pending + # Unique index on pattern to help prevent two entries from scanning the same files + index({pattern: 1}, background: true, unique: true, drop_dups: true) + # State Machine events and transitions # # :pending -> :enabled -> :disabled # -> :failed # -> :failed -> :active @@ -105,17 +90,17 @@ # DirmonEntry has been manually disabled state :disabled event :enable do - transitions from: :pending, to: :enabled + transitions from: :pending, to: :enabled transitions from: :disabled, to: :enabled end event :disable do transitions from: :enabled, to: :disabled - transitions from: :failed, to: :disabled + transitions from: :failed, to: :disabled end event :fail, before: :set_exception do transitions from: :enabled, to: :failed end @@ -132,31 +117,18 @@ false end record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists end - validates_each :arguments do |record, attr, value| - if klass = record.job_class - count = klass.rocket_job_argument_count - record.errors.add(attr, "There must be #{count} argument(s)") if value.size != count - end - end - validates_each :properties do |record, attr, value| if record.job_class && (methods = record.job_class.instance_methods) value.each_pair do |k, v| record.errors.add(attr, "Unknown property: #{k.inspect} with value: #{v}") unless methods.include?("#{k}=".to_sym) end end end - # Create indexes - def self.create_indexes - # Unique index on pattern to help prevent two entries from scanning the same files - ensure_index({pattern: 1}, background: true, unique: true) - end - # Security Settings # # A whitelist of paths from which to process files. # This prevents accidental or malicious `pattern`s from processing files from anywhere # in the system that the user under which Dirmon is running can access. @@ -304,11 +276,11 @@ # Queues the job for the supplied pathname def later(pathname) if klass = job_class logger.measure_info "Enqueued: #{name}, Job class: #{job_class_name}" do - job = klass.new(properties.merge(arguments: arguments)) + job = klass.new(properties) upload_file(job, pathname) job.save! job end else @@ -339,13 +311,11 @@ full_file_name = archive_file(job, pathname) if job.respond_to?(:upload_file_name=) job.upload_file_name = full_file_name elsif job.respond_to?(:full_file_name=) job.full_file_name = full_file_name - elsif job.arguments.first.is_a?(Hash) - job.arguments.first[:full_file_name] = full_file_name else - raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or the first argument must be a Hash") + raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or 'full_file_name'") end end # Move the file to the archive directory #