lib/rocket_job/dirmon_entry.rb in rocketjob-3.4.3 vs lib/rocket_job/dirmon_entry.rb in rocketjob-3.5.0
- old
+ new
@@ -4,13 +4,18 @@
module RocketJob
class DirmonEntry
include Plugins::Document
include Plugins::StateMachine
+ # The default archive directory that is used when the job being queued does not respond
+ # to #upload, and does not have an `archive_directory` specified in this entry
+ class_attribute :default_archive_directory
+ self.default_archive_directory = 'archive'.freeze
+
store_in collection: 'rocket_job.dirmon_entries'
- # User defined name used to identify this DirmonEntry in Mission Control
+ # User defined name used to identify this DirmonEntry in the Web Interface.
field :name, type: String
# Pattern for finding files
#
# Example: All files ending in '.csv' in the input_files/process1 directory
@@ -46,22 +51,22 @@
# file again.
#
# If supplied, the file will be moved to this directory before the job is started
# If the file was in a sub-directory, the corresponding sub-directory will
# be created in the archive directory.
- field :archive_directory, type: String
+ field :archive_directory, type: String, default: default_archive_directory
# If this DirmonEntry is in the failed state, exception contains the cause
embeds_one :exception, class_name: 'RocketJob::JobException'
# The maximum number of files that should ever match during a single poll of the pattern.
#
# Too many files could be as a result of an invalid pattern specification.
# Exceeding this number will result in an exception being logged in a failed Dirmon instance.
# Dirmon processing will continue with new instances.
# TODO: Implement max_hits
- #field :max_hits, type: Integer, default: 100
+ # field :max_hits, type: Integer, default: 100
#
# Read-only attributes
#
@@ -69,10 +74,15 @@
field :state, type: Symbol, default: :pending
# Unique index on pattern to help prevent two entries from scanning the same files
index({pattern: 1}, background: true, unique: true, drop_dups: true)
+ before_validation :strip_whitespace
+ validates_presence_of :pattern, :job_class_name, :archive_directory
+ validate :job_is_a_rocket_job
+ validate :job_has_properties
+
# State Machine events and transitions
#
# :pending -> :enabled -> :disabled
# -> :failed
# -> :failed -> :active
@@ -106,31 +116,10 @@
event :fail, before: :set_exception do
transitions from: :enabled, to: :failed
end
end
- # @formatter:on
- validates_presence_of :pattern, :job_class_name
-
- validates_each :job_class_name do |record, attr, value|
- exists =
- begin
- value.nil? ? false : record.job_class.ancestors.include?(RocketJob::Job)
- rescue NameError
- false
- end
- record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists
- end
-
- validates_each :properties do |record, attr, value|
- if record.job_class && (methods = record.job_class.instance_methods)
- value.each_pair do |k, v|
- record.errors.add(attr, "Unknown property: #{k.inspect} with value: #{v}") unless methods.include?("#{k}=".to_sym)
- end
- end
- end
-
# Security Settings
#
# A whitelist of paths from which to process files.
# This prevents accidental or malicious `pattern`s from processing files from anywhere
# in the system that the user under which Dirmon is running can access.
@@ -145,30 +134,30 @@
#
# Default: [] ==> Do not enforce whitelists
#
# Returns [Array<String>] a copy of the whitelisted paths
def self.get_whitelist_paths
- self.whitelist_paths.dup
+ whitelist_paths.dup
end
# Add a path to the whitelist
# Raises: Errno::ENOENT: No such file or directory
def self.add_whitelist_path(path)
# Confirms that path exists
path = Pathname.new(path).realpath.to_s
- self.whitelist_paths << path
- self.whitelist_paths.uniq!
+ whitelist_paths << path
+ whitelist_paths.uniq!
path
end
# Deletes a path from the whitelist paths
# Raises: Errno::ENOENT: No such file or directory
def self.delete_whitelist_path(path)
# Confirms that path exists
path = Pathname.new(path).realpath.to_s
- self.whitelist_paths.delete(path)
- self.whitelist_paths.uniq!
+ whitelist_paths.delete(path)
+ whitelist_paths.uniq!
path
end
# Returns [Hash<String:Integer>] of the number of dirmon entries in each state.
# Note: If there are no workers in that particular state then the hash will not have a value for it.
@@ -191,34 +180,12 @@
counts[result['_id'].to_sym] = result['count']
end
counts
end
- # The default archive directory that is used when the job being queued does not respond
- # to #upload, and does not have an `archive_directory` specified in this entry
- class_attribute :default_archive_directory
-
- self.default_archive_directory = '_archive'.freeze
-
- # Returns [Pathname] the archive_directory if set, otherwise the default_archive_directory
- # Creates the archive directory if one is set
- def archive_pathname(file_pathname)
- if archive_directory
- path = Pathname.new(archive_directory)
- begin
- path.mkpath unless path.exist?
- rescue Errno::ENOENT => exc
- raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}")
- end
- path.realpath
- else
- file_pathname.dirname.join(self.class.default_archive_directory).realdirpath
- end
- end
-
# Passes each filename [Pathname] found that matches the pattern into the supplied block
- def each(&block)
+ def each
SemanticLogger.named_tagged(dirmon_entry: id.to_s) do
# Case insensitive filename matching
Pathname.glob(pattern, File::FNM_CASEFOLD).each do |pathname|
next if pathname.directory?
pathname = begin
@@ -232,21 +199,21 @@
# Skip archive directories
next if file_name.include?(self.class.default_archive_directory)
# Security check?
- if (whitelist_paths.size > 0) && whitelist_paths.none? { |whitepath| file_name.to_s.start_with?(whitepath) }
+ if whitelist_paths.size.positive? && whitelist_paths.none? { |whitepath| file_name.to_s.start_with?(whitepath) }
logger.error "Skipping file: #{file_name} since it is not in any of the whitelisted paths: #{whitelist_paths.join(', ')}"
next
end
# File must be writable so it can be removed after processing
unless pathname.writable?
logger.error "Skipping file: #{file_name} since it is not writable by the current user. Must be able to delete/move the file after queueing the job"
next
end
- block.call(pathname)
+ yield(pathname)
end
end
end
# Set exception information for this DirmonEntry and fail it
@@ -262,74 +229,104 @@
worker_name: worker_name
)
end
end
- # Returns the Job to be queued
+ # Returns the Job to be created.
def job_class
return if job_class_name.nil?
job_class_name.constantize
rescue NameError
nil
end
- # Queues the job for the supplied pathname
+ # Archives the file and kicks off a proxy job to upload the file.
def later(pathname)
- if klass = job_class
- logger.measure_info "Enqueued: #{name}, Job class: #{job_class_name}" do
- job = klass.new(properties)
- upload_file(job, pathname)
- job.save!
- job
- end
- else
- raise(ArgumentError, "Cannot instantiate a class for: #{job_class_name.inspect}")
- end
+ job_id = BSON::ObjectId.new
+ archived_file_name = archive_file(job_id, pathname)
+
+ job = RocketJob::Jobs::UploadFileJob.create!(
+ job_class_name: job_class_name,
+ properties: properties,
+ description: "#{name}: #{pathname.basename}",
+ upload_file_name: archived_file_name.to_s,
+ original_file_name: pathname.to_s,
+ job_id: job_id
+ )
+
+ logger.info(
+ message: 'Created RocketJob::Jobs::UploadFileJob',
+ payload: {
+ dirmon_entry_name: name,
+ upload_file_name: archived_file_name.to_s,
+ original_file_name: pathname.to_s,
+ job_class_name: job_class_name,
+ job_id: job_id.to_s,
+ upload_job_id: job.id.to_s
+ }
+ )
+ job
end
private
+ # strip whitespaces from all variables that reference paths or patterns
+ def strip_whitespace
+ self.pattern = pattern.strip unless pattern.nil?
+ self.archive_directory = archive_directory.strip unless archive_directory.nil?
+ end
+
class_attribute :whitelist_paths
self.whitelist_paths = Concurrent::Array.new
- # Upload the file to the job
- def upload_file(job, pathname)
- if job.respond_to?(:upload)
- # With RocketJob Pro the file can be uploaded directly into the Job itself
- job.upload(pathname.to_s)
- archive_directory ? archive_file(job, pathname) : pathname.unlink
- else
- upload_default(job, pathname)
- end
- end
-
- # Archives the file for a job where there was no #upload method
- def upload_default(job, pathname)
- full_file_name = archive_file(job, pathname)
- if job.respond_to?(:upload_file_name=)
- job.upload_file_name = full_file_name
- elsif job.respond_to?(:full_file_name=)
- job.full_file_name = full_file_name
- else
- raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or 'full_file_name'")
- end
- end
-
# Move the file to the archive directory
#
# The archived file name is prefixed with the job id
#
# Returns [String] the fully qualified archived file name
#
# Note:
# - Works across partitions when the file and the archive are on different partitions
- def archive_file(job, pathname)
+ def archive_file(job_id, pathname)
target_path = archive_pathname(pathname)
target_path.mkpath
- target_file_name = target_path.join("#{job.id}_#{pathname.basename}")
+ target_file_name = target_path.join("#{job_id}_#{pathname.basename}")
# In case the file is being moved across partitions
FileUtils.move(pathname.to_s, target_file_name.to_s)
target_file_name.to_s
end
+ # Returns [Pathname] to the archive directory, and creates it if it does not exist.
+ #
+ # If `archive_directory` is a relative path, it is appended to the `file_pathname`.
+ # If `archive_directory` is an absolute path, it is returned as-is.
+ def archive_pathname(file_pathname)
+ path = Pathname.new(archive_directory)
+ path = file_pathname.dirname.join(archive_directory) if path.relative?
+
+ begin
+ path.mkpath unless path.exist?
+ rescue Errno::ENOENT => exc
+ raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}")
+ end
+ path.realpath
+ end
+
+ # Validates job_class is a Rocket Job
+ def job_is_a_rocket_job
+ klass = job_class
+ return if job_class_name.nil? || klass&.ancestors&.include?(RocketJob::Job)
+ errors.add(:job_class_name, "Job #{job_class_name} must be defined and inherit from RocketJob::Job")
+ end
+
+ # Does the job have all the supplied properties
+ def job_has_properties
+ klass = job_class
+ return unless klass
+
+ properties.each_pair do |k, _v|
+ next if klass.public_method_defined?("#{k}=".to_sym)
+ errors.add(:properties, "Unknown Property: Attempted to set a value for #{k.inspect} which is not allowed on the job #{job_class_name}")
+ end
+ end
end
end