lib/rocket_job/dirmon_entry.rb in rocketjob-1.3.0 vs lib/rocket_job/dirmon_entry.rb in rocketjob-2.0.0.rc1
- old
+ new
@@ -1,13 +1,12 @@
-require 'thread_safe'
+require 'concurrent'
require 'pathname'
require 'fileutils'
-require 'aasm'
module RocketJob
class DirmonEntry
- include MongoMapper::Document
- include AASM
+ include Plugins::Document
+ include Plugins::StateMachine
# @formatter:off
# User defined name used to identify this DirmonEntry in Mission Control
key :name, String
@@ -65,13 +64,10 @@
# If supplied, the file will be moved to this directory before the job is started
# If the file was in a sub-directory, the corresponding sub-directory will
# be created in the archive directory.
key :archive_directory, String
- # Method to perform on the job, usually :perform
- key :perform_method, Symbol, default: :perform
-
# If this DirmonEntry is in the failed state, exception contains the cause
one :exception, class_name: 'RocketJob::JobException'
# The maximum number of files that should ever match during a single poll of the pattern.
#
@@ -83,11 +79,11 @@
#
# Read-only attributes
#
- # Current state, as set by AASM
+ # Current state, as set by the state machine. Do not modify directly.
key :state, Symbol, default: :pending
# State Machine events and transitions
#
# :pending -> :enabled -> :disabled
@@ -118,24 +114,18 @@
event :disable do
transitions from: :enabled, to: :disabled
transitions from: :failed, to: :disabled
end
- event :fail do
+ event :fail, before: :set_exception do
transitions from: :enabled, to: :failed
end
end
# @formatter:on
- validates_presence_of :pattern, :job_class_name, :perform_method
+ validates_presence_of :pattern, :job_class_name
- validates_each :perform_method do |record, attr, value|
- if (klass = record.job_class) && !klass.instance_methods.include?(value)
- record.errors.add(attr, "Method not implemented by #{record.job_class_name}")
- end
- end
-
validates_each :job_class_name do |record, attr, value|
exists =
begin
value.nil? ? false : record.job_class.ancestors.include?(RocketJob::Job)
rescue NameError
@@ -143,12 +133,12 @@
end
record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists
end
validates_each :arguments do |record, attr, value|
- if (klass = record.job_class) && klass.instance_methods.include?(record.perform_method)
- count = klass.argument_count(record.perform_method)
+ if klass = record.job_class
+ count = klass.rocket_job_argument_count
record.errors.add(attr, "There must be #{count} argument(s)") if value.size != count
end
end
validates_each :properties do |record, attr, value|
@@ -204,31 +194,66 @@
@@whitelist_paths.delete(path)
@@whitelist_paths.uniq!
path
end
+ # Returns [Hash<String:Integer>] of the number of dirmon entries in each state.
+ # Note: If there are no workers in that particular state then the hash will not have a value for it.
+ #
+ # Example dirmon entries in every state:
+ # RocketJob::DirmonEntry.counts_by_state
+ # # => {
+ # :pending => 1,
+ # :enabled => 37,
+ # :failed => 1,
+ # :disabled => 3
+ # }
+ #
+ # Example no dirmon entries:
+ # RocketJob::Job.counts_by_state
+ # # => {}
+ def self.counts_by_state
+ counts = {}
+ collection.aggregate([
+ {
+ '$group' => {
+ _id: '$state',
+ count: {'$sum' => 1}
+ }
+ }
+ ]
+ ).each do |result|
+ counts[result['_id']] = result['count']
+ end
+ counts
+ end
+
# The default archive directory that is used when the job being queued does not respond
- # to #file_store_upload or #upload, and do not have an `archive_directory` specified in this entry
+ # to #upload, and does not have an `archive_directory` specified in this entry
cattr_accessor :default_archive_directory
@@default_archive_directory = '_archive'.freeze
# Returns [Pathname] the archive_directory if set, otherwise the default_archive_directory
# Creates the archive directory if one is set
def archive_pathname(file_pathname)
if archive_directory
path = Pathname.new(archive_directory)
- path.mkpath unless path.exist?
+ begin
+ path.mkpath unless path.exist?
+ rescue Errno::ENOENT => exc
+ raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}")
+ end
path.realpath
else
file_pathname.dirname.join(self.class.default_archive_directory).realdirpath
end
end
# Passes each filename [Pathname] found that matches the pattern into the supplied block
def each(&block)
- logger.tagged("DirmonEntry:#{id}") do
+ logger.fast_tag("DirmonEntry:#{id}") do
# Case insensitive filename matching
Pathname.glob(pattern, File::FNM_CASEFOLD).each do |pathname|
next if pathname.directory?
pathname = pathname.realpath
file_name = pathname.to_s
@@ -251,11 +276,11 @@
end
end
end
# Set exception information for this DirmonEntry and fail it
- def fail_with_exception!(worker_name, exc_or_message)
+ def set_exception(worker_name, exc_or_message)
if exc_or_message.is_a?(Exception)
self.exception = JobException.from_exception(exc_or_message)
exception.worker_name = worker_name
else
build_exception(
@@ -263,14 +288,13 @@
message: exc_or_message,
backtrace: [],
worker_name: worker_name
)
end
- fail!
end
- @@whitelist_paths = ThreadSafe::Array.new
+ @@whitelist_paths = Concurrent::Array.new
# Returns the Job to be queued
def job_class
return if job_class_name.nil?
job_class_name.constantize
@@ -278,54 +302,48 @@
nil
end
# Queues the job for the supplied pathname
def later(pathname)
- job = job_class.new(
- properties.merge(
- arguments: arguments,
- properties: properties,
- perform_method: perform_method
- )
- )
- upload_file(job, pathname)
- job.save!
- job
+ if klass = job_class
+ job = klass.new(properties.merge(arguments: arguments))
+ upload_file(job, pathname)
+ job.save!
+ job
+ else
+ raise(ArgumentError, "Cannot instantiate a class for: #{job_class_name.inspect}")
+ end
end
- protected
+ private
# Instance method to return whitelist paths
def whitelist_paths
@@whitelist_paths
end
# Upload the file to the job
def upload_file(job, pathname)
- if job.respond_to?(:file_store_upload)
- # Allow the job to determine what to do with the file
- # Pass the pathname as a string, not a Pathname (IO) instance
- # so that it can read the file directly
- job.file_store_upload(pathname.to_s)
- archive_directory ? archive_file(job, pathname) : pathname.unlink
- elsif job.respond_to?(:upload)
+ if job.respond_to?(:upload)
# With RocketJob Pro the file can be uploaded directly into the Job itself
job.upload(pathname.to_s)
archive_directory ? archive_file(job, pathname) : pathname.unlink
else
upload_default(job, pathname)
end
end
- # Archives the file for a job where there was no #file_store_upload or #upload method
+ # Archives the file for a job where there was no #upload method
def upload_default(job, pathname)
full_file_name = archive_file(job, pathname)
- if job.respond_to?(:full_file_name=)
+ if job.respond_to?(:upload_file_name=)
+ job.upload_file_name = full_file_name
+ elsif job.respond_to?(:full_file_name=)
job.full_file_name = full_file_name
elsif job.arguments.first.is_a?(Hash)
job.arguments.first[:full_file_name] = full_file_name
else
- raise(ArgumentError, "#{job_class_name} must either have attribute 'full_file_name' or the first argument must be a Hash")
+ raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or the first argument must be a Hash")
end
end
# Move the file to the archive directory
#