lib/rocket_job/dirmon_entry.rb in rocketjob-1.3.0 vs lib/rocket_job/dirmon_entry.rb in rocketjob-2.0.0.rc1

- old
+ new

@@ -1,13 +1,12 @@ -require 'thread_safe' +require 'concurrent' require 'pathname' require 'fileutils' -require 'aasm' module RocketJob class DirmonEntry - include MongoMapper::Document - include AASM + include Plugins::Document + include Plugins::StateMachine # @formatter:off # User defined name used to identify this DirmonEntry in Mission Control key :name, String @@ -65,13 +64,10 @@ # If supplied, the file will be moved to this directory before the job is started # If the file was in a sub-directory, the corresponding sub-directory will # be created in the archive directory. key :archive_directory, String - # Method to perform on the job, usually :perform - key :perform_method, Symbol, default: :perform - # If this DirmonEntry is in the failed state, exception contains the cause one :exception, class_name: 'RocketJob::JobException' # The maximum number of files that should ever match during a single poll of the pattern. # @@ -83,11 +79,11 @@ # # Read-only attributes # - # Current state, as set by AASM + # Current state, as set by the state machine. Do not modify directly. key :state, Symbol, default: :pending # State Machine events and transitions # # :pending -> :enabled -> :disabled @@ -118,24 +114,18 @@ event :disable do transitions from: :enabled, to: :disabled transitions from: :failed, to: :disabled end - event :fail do + event :fail, before: :set_exception do transitions from: :enabled, to: :failed end end # @formatter:on - validates_presence_of :pattern, :job_class_name, :perform_method + validates_presence_of :pattern, :job_class_name - validates_each :perform_method do |record, attr, value| - if (klass = record.job_class) && !klass.instance_methods.include?(value) - record.errors.add(attr, "Method not implemented by #{record.job_class_name}") - end - end - validates_each :job_class_name do |record, attr, value| exists = begin value.nil? ? false : record.job_class.ancestors.include?(RocketJob::Job) rescue NameError @@ -143,12 +133,12 @@ end record.errors.add(attr, 'job_class_name must be defined and must be derived from RocketJob::Job') unless exists end validates_each :arguments do |record, attr, value| - if (klass = record.job_class) && klass.instance_methods.include?(record.perform_method) - count = klass.argument_count(record.perform_method) + if klass = record.job_class + count = klass.rocket_job_argument_count record.errors.add(attr, "There must be #{count} argument(s)") if value.size != count end end validates_each :properties do |record, attr, value| @@ -204,31 +194,66 @@ @@whitelist_paths.delete(path) @@whitelist_paths.uniq! path end + # Returns [Hash<String:Integer>] of the number of dirmon entries in each state. + # Note: If there are no workers in that particular state then the hash will not have a value for it. + # + # Example dirmon entries in every state: + # RocketJob::DirmonEntry.counts_by_state + # # => { + # :pending => 1, + # :enabled => 37, + # :failed => 1, + # :disabled => 3 + # } + # + # Example no dirmon entries: + # RocketJob::Job.counts_by_state + # # => {} + def self.counts_by_state + counts = {} + collection.aggregate([ + { + '$group' => { + _id: '$state', + count: {'$sum' => 1} + } + } + ] + ).each do |result| + counts[result['_id']] = result['count'] + end + counts + end + # The default archive directory that is used when the job being queued does not respond - # to #file_store_upload or #upload, and do not have an `archive_directory` specified in this entry + # to #upload, and does not have an `archive_directory` specified in this entry cattr_accessor :default_archive_directory @@default_archive_directory = '_archive'.freeze # Returns [Pathname] the archive_directory if set, otherwise the default_archive_directory # Creates the archive directory if one is set def archive_pathname(file_pathname) if archive_directory path = Pathname.new(archive_directory) - path.mkpath unless path.exist? + begin + path.mkpath unless path.exist? + rescue Errno::ENOENT => exc + raise(Errno::ENOENT, "DirmonJob failed to create archive directory: #{path}, #{exc.message}") + end path.realpath else file_pathname.dirname.join(self.class.default_archive_directory).realdirpath end end # Passes each filename [Pathname] found that matches the pattern into the supplied block def each(&block) - logger.tagged("DirmonEntry:#{id}") do + logger.fast_tag("DirmonEntry:#{id}") do # Case insensitive filename matching Pathname.glob(pattern, File::FNM_CASEFOLD).each do |pathname| next if pathname.directory? pathname = pathname.realpath file_name = pathname.to_s @@ -251,11 +276,11 @@ end end end # Set exception information for this DirmonEntry and fail it - def fail_with_exception!(worker_name, exc_or_message) + def set_exception(worker_name, exc_or_message) if exc_or_message.is_a?(Exception) self.exception = JobException.from_exception(exc_or_message) exception.worker_name = worker_name else build_exception( @@ -263,14 +288,13 @@ message: exc_or_message, backtrace: [], worker_name: worker_name ) end - fail! end - @@whitelist_paths = ThreadSafe::Array.new + @@whitelist_paths = Concurrent::Array.new # Returns the Job to be queued def job_class return if job_class_name.nil? job_class_name.constantize @@ -278,54 +302,48 @@ nil end # Queues the job for the supplied pathname def later(pathname) - job = job_class.new( - properties.merge( - arguments: arguments, - properties: properties, - perform_method: perform_method - ) - ) - upload_file(job, pathname) - job.save! - job + if klass = job_class + job = klass.new(properties.merge(arguments: arguments)) + upload_file(job, pathname) + job.save! + job + else + raise(ArgumentError, "Cannot instantiate a class for: #{job_class_name.inspect}") + end end - protected + private # Instance method to return whitelist paths def whitelist_paths @@whitelist_paths end # Upload the file to the job def upload_file(job, pathname) - if job.respond_to?(:file_store_upload) - # Allow the job to determine what to do with the file - # Pass the pathname as a string, not a Pathname (IO) instance - # so that it can read the file directly - job.file_store_upload(pathname.to_s) - archive_directory ? archive_file(job, pathname) : pathname.unlink - elsif job.respond_to?(:upload) + if job.respond_to?(:upload) # With RocketJob Pro the file can be uploaded directly into the Job itself job.upload(pathname.to_s) archive_directory ? archive_file(job, pathname) : pathname.unlink else upload_default(job, pathname) end end - # Archives the file for a job where there was no #file_store_upload or #upload method + # Archives the file for a job where there was no #upload method def upload_default(job, pathname) full_file_name = archive_file(job, pathname) - if job.respond_to?(:full_file_name=) + if job.respond_to?(:upload_file_name=) + job.upload_file_name = full_file_name + elsif job.respond_to?(:full_file_name=) job.full_file_name = full_file_name elsif job.arguments.first.is_a?(Hash) job.arguments.first[:full_file_name] = full_file_name else - raise(ArgumentError, "#{job_class_name} must either have attribute 'full_file_name' or the first argument must be a Hash") + raise(ArgumentError, "#{job_class_name} must either have attribute 'upload_file_name' or the first argument must be a Hash") end end # Move the file to the archive directory #