lib/rocket_job/job.rb in rocketjob-1.3.0 vs lib/rocket_job/job.rb in rocketjob-2.0.0.rc1
- old
+ new
@@ -1,440 +1,16 @@
# encoding: UTF-8
-require 'aasm'
module RocketJob
# The base job from which all jobs are created
class Job
- include MongoMapper::Document
- include AASM
include SemanticLogger::Loggable
- include Concerns::Worker
-
- # Prevent data in MongoDB from re-defining the model behavior
- #self.static_keys = true
-
- #
- # User definable attributes
- #
- # The following attributes are set when the job is created
- # @formatter:off
-
- # Description for this job instance
- key :description, String
-
- # Method that must be invoked to complete this job
- key :perform_method, Symbol, default: :perform
-
- # Priority of this job as it relates to other jobs [1..100]
- # 1: Highest Priority
- # 50: Default Priority
- # 100: Lowest Priority
- #
- # Example:
- # A job with a priority of 40 will execute before a job with priority 50
- #
- # In RocketJob Pro, if a SlicedJob is running and a higher priority job
- # arrives, then the current job will complete the current slices and process
- # the new higher priority job
- key :priority, Integer, default: 50
-
- # Run this job no earlier than this time
- key :run_at, Time
-
- # If a job has not started by this time, destroy it
- key :expires_at, Time
-
- # When specified a job will be re-scheduled to run at it's next scheduled interval
- # Format is the same as cron.
- # #TODO Future capability.
- #key :schedule, String
-
- # When the job completes destroy it from both the database and the UI
- key :destroy_on_complete, Boolean, default: true
-
- # Any user supplied arguments for the method invocation
- # All keys must be UTF-8 strings. The values can be any valid BSON type:
- # Integer
- # Float
- # Time (UTC)
- # String (UTF-8)
- # Array
- # Hash
- # True
- # False
- # Symbol
- # nil
- # Regular Expression
- #
- # Note: Date is not supported, convert it to a UTC time
- key :arguments, Array
-
- # Whether to store the results from this job
- key :collect_output, Boolean, default: false
-
- # Raise or lower the log level when calling the job
- # Can be used to reduce log noise, especially during high volume calls
- # For debugging a single job can be logged at a low level such as :trace
- # Levels supported: :trace, :debug, :info, :warn, :error, :fatal
- key :log_level, Symbol
-
- #
- # Read-only attributes
- #
-
- # Current state, as set by AASM
- key :state, Symbol, default: :queued
-
- # When the job was created
- key :created_at, Time, default: -> { Time.now }
-
- # When processing started on this job
- key :started_at, Time
-
- # When the job completed processing
- key :completed_at, Time
-
- # Number of times that this job has failed to process
- key :failure_count, Integer, default: 0
-
- # This name of the worker that this job is being processed by, or was processed by
- key :worker_name, String
-
- #
- # Values that jobs can update during processing
- #
-
- # Allow a job to updates its estimated progress
- # Any integer from 0 to 100
- key :percent_complete, Integer, default: 0
-
- # Store the last exception for this job
- one :exception, class_name: 'RocketJob::JobException'
-
- # Store the Hash result from this job if collect_output is true,
- # and the job returned actually returned a Hash, otherwise nil
- # Not applicable to SlicedJob jobs, since its output is stored in a
- # separate collection
- key :result, Hash
-
- # Store all job types in this collection
- set_collection_name 'rocket_job.jobs'
-
- validates_presence_of :state, :failure_count, :created_at, :perform_method
- validates :priority, inclusion: 1..100
- validates :log_level, inclusion: SemanticLogger::LEVELS + [nil]
-
- # User definable properties in Dirmon Entry
- def self.rocket_job_properties
- @rocket_job_properties ||= (self == RocketJob::Job ? [] : superclass.rocket_job_properties)
- end
-
- # Add to user definable properties in Dirmon Entry
- def self.public_rocket_job_properties(*properties)
- rocket_job_properties.concat(properties).uniq!
- end
-
- # User definable properties in Dirmon Entry
- public_rocket_job_properties :description, :priority, :perform_method, :log_level, :arguments
-
- # State Machine events and transitions
- #
- # :queued -> :running -> :completed
- # -> :paused -> :running
- # -> :aborted
- # -> :failed -> :running
- # -> :aborted
- # -> :aborted
- # -> :queued (when a worker dies)
- # -> :aborted
- aasm column: :state do
- # Job has been created and is queued for processing ( Initial state )
- state :queued, initial: true
-
- # Job is running
- state :running
-
- # Job has completed processing ( End state )
- state :completed
-
- # Job is temporarily paused and no further processing will be completed
- # until this job has been resumed
- state :paused
-
- # Job failed to process and needs to be manually re-tried or aborted
- state :failed
-
- # Job was aborted and cannot be resumed ( End state )
- state :aborted
-
- event :start, before: :before_start do
- transitions from: :queued, to: :running
- end
-
- event :complete, before: :before_complete do
- after do
- destroy if destroy_on_complete
- end
- transitions from: :running, to: :completed
- end
-
- event :fail, before: :before_fail do
- transitions from: :queued, to: :failed
- transitions from: :running, to: :failed
- transitions from: :paused, to: :failed
- end
-
- event :retry, before: :before_retry do
- transitions from: :failed, to: :queued
- end
-
- event :pause, before: :before_pause do
- transitions from: :running, to: :paused
- end
-
- event :resume, before: :before_resume do
- transitions from: :paused, to: :running
- end
-
- event :abort, before: :before_abort do
- transitions from: :running, to: :aborted
- transitions from: :queued, to: :aborted
- transitions from: :failed, to: :aborted
- transitions from: :paused, to: :aborted
- end
-
- event :requeue, before: :before_requeue do
- transitions from: :running, to: :queued
- end
- end
- # @formatter:on
-
- # Create indexes
- def self.create_indexes
- # Used by find_and_modify in .next_job
- ensure_index({state: 1, run_at: 1, priority: 1, created_at: 1, sub_state: 1}, background: true)
- # Remove outdated index if present
- drop_index('state_1_priority_1_created_at_1_sub_state_1') rescue nil
- # Used by Mission Control
- ensure_index [[:created_at, 1]]
- end
-
- # Requeues all jobs that were running on worker that died
- def self.requeue_dead_worker(worker_name)
- running.each { |job| job.requeue!(worker_name) }
- end
-
- # Pause all running jobs
- def self.pause_all
- running.each(&:pause!)
- end
-
- # Resume all paused jobs
- def self.resume_all
- paused.each(&:resume!)
- end
-
- # Returns the number of required arguments for this job
- def self.argument_count(method = :perform)
- instance_method(method).arity
- end
-
- # Override parent defaults
- def self.rocket_job(&block)
- @rocket_job_defaults = block
- self
- end
-
- # Returns [true|false] whether to collect the results from running this batch
- def collect_output?
- collect_output == true
- end
-
- # Returns [Float] the number of seconds the job has taken
- # - Elapsed seconds to process the job from when a worker first started working on it
- # until now if still running, or until it was completed
- # - Seconds in the queue if queued
- def seconds
- if completed_at
- completed_at - (started_at || created_at)
- elsif started_at
- Time.now - started_at
- else
- Time.now - created_at
- end
- end
-
- # Returns a human readable duration the job has taken
- def duration
- RocketJob.seconds_as_duration(seconds)
- end
-
- # A job has expired if the expiry time has passed before it is started
- def expired?
- started_at.nil? && expires_at && (expires_at < Time.now)
- end
-
- # Returns [Hash] status of this job
- def as_json
- attrs = serializable_hash(methods: [:seconds, :duration])
- attrs.delete('result') unless collect_output?
- case
- when running?
- attrs.delete('completed_at')
- attrs.delete('result')
- attrs
- when paused?
- attrs.delete('completed_at')
- attrs.delete('result')
- # Ensure 'paused_at' appears first in the hash
- {'paused_at' => completed_at}.merge(attrs)
- when aborted?
- attrs.delete('completed_at')
- attrs.delete('result')
- {'aborted_at' => completed_at}.merge(attrs)
- when failed?
- attrs.delete('completed_at')
- attrs.delete('result')
- {'failed_at' => completed_at}.merge(attrs)
- else
- attrs
- end
- end
-
- def status(time_zone = 'Eastern Time (US & Canada)')
- h = as_json
- h.delete('seconds')
- h.delete('perform_method') if h['perform_method'] == :perform
- h.dup.each_pair do |k, v|
- case
- when v.is_a?(Time)
- h[k] = v.in_time_zone(time_zone).to_s
- when v.is_a?(BSON::ObjectId)
- h[k] = v.to_s
- end
- end
- h
- end
-
- # Patch the way MongoMapper reloads a model
- # Only reload MongoMapper attributes, leaving other instance variables untouched
- def reload
- if (doc = collection.find_one(_id: id))
- # Clear out keys that are not returned during the reload from MongoDB
- (keys.keys - doc.keys).each { |key| send("#{key}=", nil) }
- initialize_default_values
- load_from_database(doc)
- self
- else
- if destroy_on_complete
- self.state = :completed
- before_complete
- else
- raise(MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection")
- end
- end
- end
-
- # Set exception information for this job and fail it
- def fail(worker_name='user', exc_or_message='Job failed through user action')
- if exc_or_message.is_a?(Exception)
- self.exception = JobException.from_exception(exc_or_message)
- exception.worker_name = worker_name
- else
- build_exception(
- class_name: 'RocketJob::JobException',
- message: exc_or_message,
- backtrace: [],
- worker_name: worker_name
- )
- end
- # not available as #super
- aasm.current_event = :fail
- aasm_fire_event(:fail, persist: false)
- end
-
- def fail!(worker_name='user', exc_or_message='Job failed through user action')
- self.fail(worker_name, exc_or_message)
- save!
- end
-
- # Requeue this running job since the worker assigned to it has died
- def requeue!(worker_name_=nil)
- return false if worker_name_ && (worker_name != worker_name_)
- # not available as #super
- aasm.current_event = :requeue!
- aasm_fire_event(:requeue, persist: true)
- end
-
- # Requeue this running job since the worker assigned to it has died
- def requeue(worker_name_=nil)
- return false if worker_name_ && (worker_name != worker_name_)
- # not available as #super
- aasm.current_event = :requeue
- aasm_fire_event(:requeue, persist: false)
- end
-
- protected
-
- # Before events that can be overridden by child classes
- def before_start
- self.started_at = Time.now
- end
-
- def before_complete
- self.percent_complete = 100
- self.completed_at = Time.now
- self.worker_name = nil
- end
-
- def before_fail
- self.completed_at = Time.now
- self.worker_name = nil
- self.failure_count += 1
- end
-
- def before_retry
- self.completed_at = nil
- self.exception = nil
- end
-
- def before_pause
- self.completed_at = Time.now
- self.worker_name = nil
- end
-
- def before_resume
- self.completed_at = nil
- end
-
- def before_abort
- self.completed_at = Time.now
- self.worker_name = nil
- end
-
- def before_requeue
- self.started_at = nil
- self.worker_name = nil
- end
-
- private
-
- # After this model is loaded, convert any hashes in the arguments list to HashWithIndifferentAccess
- def load_from_database(*args)
- super
- if arguments.present?
- self.arguments = arguments.collect { |i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i }
- end
- end
-
- def self.apply_defaults(job)
- @rocket_job_defaults.call(job) if @rocket_job_defaults
- end
-
- # Apply RocketJob defaults after initializing default values
- # but before setting attributes
- def initialize_default_values(except = {})
- super
- self.class.apply_defaults(self)
- end
-
+ include Plugins::Document
+ include Plugins::Job::Model
+ include Plugins::Job::Persistence
+ include Plugins::Job::Callbacks
+ include Plugins::Job::Logger
+ include Plugins::StateMachine
+ include Plugins::Job::StateMachine
+ include Plugins::Job::Worker
+ include Plugins::Job::Defaults
end
end