lib/rocket_job/dirmon_entry.rb in rocketjob-2.1.3 vs lib/rocket_job/dirmon_entry.rb in rocketjob-3.0.0.alpha
- old
+ new
@@ -4,13 +4,12 @@
module RocketJob
class DirmonEntry
include Plugins::Document
include Plugins::StateMachine
- # @formatter:off
# User defined name used to identify this DirmonEntry in Mission Control
- key :name, String
+ field :name, type: String
# Pattern for finding files
#
# Example: All files ending in '.csv' in the input_files/process1 directory
# input_files/process1/*.csv
@@ -25,67 +24,53 @@
#
# Note
# - If there is no '*' in the pattern then an exact filename match is expected
# - The pattern is not validated to ensure the path exists, it will be validated against the
# `whitelist_paths` when processed by DirmonJob
- key :pattern, String
+ field :pattern, type: String
# Job to enqueue for processing for every file that matches the pattern
#
# Example:
# "ProcessItJob"
- key :job_class_name, String
+ field :job_class_name, type: String
- # Any user supplied arguments for the method invocation
- # All keys must be UTF-8 strings. The values can be any valid BSON type:
- # Integer
- # Float
- # Time (UTC)
- # String (UTF-8)
- # Array
- # Hash
- # True
- # False
- # Symbol
- # nil
- # Regular Expression
- #
- # Note: Date is not supported, convert it to a UTC time
- key :arguments, Array
-
# Any job properties to set
#
# Example, override the default job priority:
# { priority: 45 }
- key :properties, Hash
+ field :properties, type: Hash, default: {}
# Archive directory to move files to when processed to prevent processing the
# file again.
#
# If supplied, the file will be moved to this directory before the job is started
# If the file was in a sub-directory, the corresponding sub-directory will
# be created in the archive directory.
- key :archive_directory, String
+ field :archive_directory, type: String
# If this DirmonEntry is in the failed state, exception contains the cause
- one :exception, class_name: 'RocketJob::JobException'
+ embeds_one :exception, class_name: 'RocketJob::JobException'
# The maximum number of files that should ever match during a single poll of the pattern.
#
# Too many files could be as a result of an invalid pattern specification.
# Exceeding this number will result in an exception being logged in a failed Dirmon instance.
# Dirmon processing will continue with new instances.
# TODO: Implement max_hits
- #key :max_hits, Integer, default: 100
+ #field :max_hits, type: Integer, default: 100
#
# Read-only attributes
#
# Current state, as set by the state machine. Do not modify directly.
- key :state, Symbol, default: :pending
+ field :state, type: Symbol, default: :pending
+ # Unique index on pattern to help prevent two entries from scanning the same files
+ index({pattern: 1}, background: true, unique: true, drop_dups: true)
+
# State Machine events and transitions
#
# :pending -> :enabled -> :disabled
# -> :failed
# -> :failed -> :active
@@ -105,17 +90,17 @@
# DirmonEntry has been manually disabled
state :disabled
event :enable do
- transitions from: :pending, to: :enabled
+ transitions from: :pending, to: :enabled
transitions from: :disabled, to: :enabled
end
event :disable do
transitions from: :enabled, to: :disabled
- transitions from: :failed, to: :disabled
+ transitions from: :failed, to: :disabled
end
event :fail, before: :set_exception do
transitions from: :enabled, to: :failed
end
@@ -132,31 +117,18 @@
false
end
record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists
end
- validates_each :arguments do |record, attr, value|
- if klass = record.job_class
- count = klass.rocket_job_argument_count
- record.errors.add(attr, "There must be #{count} argument(s)") if value.size != count
- end
- end
-
validates_each :properties do |record, attr, value|
if record.job_class && (methods = record.job_class.instance_methods)
value.each_pair do |k, v|
record.errors.add(attr, "Unknown property: #{k.inspect} with value: #{v}") unless methods.include?("#{k}=".to_sym)
end
end
end
- # Create indexes
- def self.create_indexes
- # Unique index on pattern to help prevent two entries from scanning the same files
- ensure_index({pattern: 1}, background: true, unique: true)
- end
-
# Security Settings
#
# A whitelist of paths from which to process files.
# This prevents accidental or malicious `pattern`s from processing files from anywhere
# in the system that the user under which Dirmon is running can access.
@@ -304,11 +276,11 @@
# Queues the job for the supplied pathname
def later(pathname)
if klass = job_class
logger.measure_info "Enqueued: #{name}, Job class: #{job_class_name}" do
- job = klass.new(properties.merge(arguments: arguments))
+ job = klass.new(properties)
upload_file(job, pathname)
job.save!
job
end
else
@@ -339,13 +311,11 @@
full_file_name = archive_file(job, pathname)
if job.respond_to?(:upload_file_name=)
job.upload_file_name = full_file_name
elsif job.respond_to?(:full_file_name=)
job.full_file_name = full_file_name
- elsif job.arguments.first.is_a?(Hash)
- job.arguments.first[:full_file_name] = full_file_name
else
- raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or the first argument must be a Hash")
+ raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or 'full_file_name'")
end
end
# Move the file to the archive directory
#