lib/rocket_job/job.rb in rocketjob-1.0.0 vs lib/rocket_job/job.rb in rocketjob-1.1.0
- old
+ new
@@ -13,10 +13,11 @@
#
# User definable attributes
#
# The following attributes are set when the job is created
+ # @formatter:off
# Description for this job instance
key :description, String
# Method that must be invoked to complete this job
@@ -126,10 +127,11 @@
# -> :paused -> :running
# -> :aborted
# -> :failed -> :running
# -> :aborted
# -> :aborted
+ # -> :queued (when a worker dies)
# -> :aborted
aasm column: :state do
# Job has been created and is queued for processing ( Initial state )
state :queued, initial: true
@@ -165,11 +167,11 @@
transitions from: :running, to: :failed
transitions from: :paused, to: :failed
end
event :retry, before: :before_retry do
- transitions from: :failed, to: :running
+ transitions from: :failed, to: :queued
end
event :pause, before: :before_pause do
transitions from: :running, to: :paused
end
@@ -182,43 +184,44 @@
transitions from: :running, to: :aborted
transitions from: :queued, to: :aborted
transitions from: :failed, to: :aborted
transitions from: :paused, to: :aborted
end
+
+ event :requeue, before: :before_requeue do
+ transitions from: :running, to: :queued
+ end
end
+ # @formatter:on
# Create the indexes this collection relies on.
# Builds the compound index backing the next-job lookup in the background,
# drops an outdated index, and adds an index on created_at.
def self.create_indexes
# Used by find_and_modify in .next_job
- ensure_index({ state:1, run_at: 1, priority: 1, created_at: 1, sub_state: 1}, background: true)
+ ensure_index({state: 1, run_at: 1, priority: 1, created_at: 1, sub_state: 1}, background: true)
# Remove outdated index if present; `rescue nil` deliberately ignores any error raised by the drop
- drop_index("state_1_priority_1_created_at_1_sub_state_1") rescue nil
+ drop_index('state_1_priority_1_created_at_1_sub_state_1') rescue nil
# Used by Mission Control
ensure_index [[:created_at, 1]]
end
- # Requeue all jobs for the specified dead worker
+ # Requeues all jobs that were running on a worker that died
#
# Parameters
# worker_name [String]
# Name of the dead worker; requeue! returns false for jobs assigned to any other worker
#
# NOTE(review): the new version iterates every job returned by `running` and lets requeue!
# filter by worker name, where the old version filtered by worker_name in the update query.
def self.requeue_dead_worker(worker_name)
- collection.update(
- { 'worker_name' => worker_name, 'state' => :running },
- { '$unset' => { 'worker_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
- multi: true
- )
+ running.each { |job| job.requeue!(worker_name) }
end
# Pause all running jobs
# Calls pause! on each running job individually rather than issuing a bulk update.
# `running` is presumably the scope equivalent to where(state: 'running') - confirm.
def self.pause_all
- where(state: 'running').each { |job| job.pause! }
+ running.each(&:pause!)
end
# Resume all paused jobs
# Calls resume! on each paused job individually rather than issuing a bulk update.
# `paused` is presumably the scope equivalent to where(state: 'paused') - confirm.
def self.resume_all
- where(state: 'paused').each { |job| job.resume! }
+ paused.each(&:resume!)
end
# Returns the number of required arguments for this job
#
# Parameters
# method [Symbol]
# Name of the instance method to inspect. Default: :perform
#
# Note: the value is the method's arity, so it is negative when the method
# accepts optional or splat arguments.
- def self.argument_count(method=:perform)
+ def self.argument_count(method = :perform)
instance_method(method).arity
end
# Returns [true|false] whether to collect the results from running this batch
def collect_output?
@@ -239,11 +242,11 @@
end
end
# Returns a human readable duration the job has taken
# Formats this job's `seconds` value; the formatter is called on RocketJob in the
# new version instead of being a helper on the job itself.
def duration
- seconds_as_duration(seconds)
+ RocketJob.seconds_as_duration(seconds)
end
# A job has expired if the expiry time has passed before it is started
def expired?
started_at.nil? && expires_at && (expires_at < Time.now)
@@ -260,63 +263,100 @@
attrs
when paused?
attrs.delete('completed_at')
attrs.delete('result')
# Ensure 'paused_at' appears first in the hash
- { 'paused_at' => completed_at }.merge(attrs)
+ {'paused_at' => completed_at}.merge(attrs)
when aborted?
attrs.delete('completed_at')
attrs.delete('result')
- { 'aborted_at' => completed_at }.merge(attrs)
+ {'aborted_at' => completed_at}.merge(attrs)
when failed?
attrs.delete('completed_at')
attrs.delete('result')
- { 'failed_at' => completed_at }.merge(attrs)
+ {'failed_at' => completed_at}.merge(attrs)
else
attrs
end
end
# Returns a Hash describing this job in a displayable form.
# Starts from as_json, removes 'seconds' (and 'perform_method' when it is :perform),
# then replaces Time values with strings in the supplied time zone and
# BSON::ObjectId values with their string form.
#
# Parameters
# time_zone [String]
# Time zone passed to in_time_zone. Default: 'Eastern Time (US & Canada)'
- def status(time_zone='Eastern Time (US & Canada)')
+ def status(time_zone = 'Eastern Time (US & Canada)')
h = as_json
h.delete('seconds')
h.delete('perform_method') if h['perform_method'] == :perform
# Iterate over a copy so that h itself can be updated inside the loop
- h.dup.each_pair do |k,v|
+ h.dup.each_pair do |k, v|
case
- when v.kind_of?(Time)
+ when v.is_a?(Time)
h[k] = v.in_time_zone(time_zone).to_s
- when v.kind_of?(BSON::ObjectId)
+ when v.is_a?(BSON::ObjectId)
h[k] = v.to_s
end
end
h
end
- # TODO Jobs are not currently automatically retried. Is there a need?
- def seconds_to_delay(count)
- # TODO Consider lowering the priority automatically after every retry?
- # Same basic formula for calculating retry interval as delayed_job and Sidekiq
- (count ** 4) + 15 + (rand(30)*(count+1))
- end
-
# Patch the way MongoMapper reloads a model
# Only reload MongoMapper attributes, leaving other instance variables untouched
#
# Returns self when the document is found.
# When the document no longer exists (new version):
# - if destroy_on_complete is set, the job is marked :completed in memory and
#   before_complete is called (presumably because the job completed and was destroyed - confirm)
# - otherwise MongoMapper::DocumentNotFound is raised
# NOTE(review): the destroy_on_complete branch returns the result of before_complete, not self.
def reload
- if doc = collection.find_one(:_id => id)
+ if (doc = collection.find_one(_id: id))
+ # Clear out keys that are not returned during the reload from MongoDB
+ (keys.keys - doc.keys).each { |key| send("#{key}=", nil) }
+ initialize_default_values
load_from_database(doc)
self
else
- raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection"
+ if destroy_on_complete
+ self.state = :completed
+ before_complete
+ else
+ raise(MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection")
+ end
end
end
# After this model is read, convert any hashes in the arguments list to HashWithIndifferentAccess
# Only elements that are a BSON::OrderedHash are converted; all other elements are kept as they are.
# Nothing is changed when arguments is blank.
def load_from_database(*args)
super
- self.arguments = arguments.collect {|i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } if arguments.present?
+ if arguments.present?
+ self.arguments = arguments.collect { |i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i }
+ end
end
+ # Set exception information for this job (from an Exception, or from a message with an empty backtrace) and fail it
+ def fail!(worker_name='user', exc_or_message='Job failed through user action')
+ if exc_or_message.is_a?(Exception) # an actual exception: convert it with JobException.from_exception
+ self.exception = JobException.from_exception(exc_or_message)
+ exception.worker_name = worker_name
+ else # anything else is used as the failure message
+ build_exception(
+ class_name: 'RocketJob::JobException',
+ message: exc_or_message,
+ backtrace: [],
+ worker_name: worker_name
+ )
+ end
+ # The event method is not available as #super, so the :fail event is fired directly with persist: true
+ aasm.current_event = :fail!
+ aasm_fire_event(:fail, persist: true)
+ end
+
+ # Requeue this running job since the worker assigned to it has died; fires the :requeue event with persist: true
+ def requeue!(worker_name_=nil)
+ return false if worker_name_ && (worker_name != worker_name_) # a worker name was given and this job belongs to a different worker
+ # The event method is not available as #super, so the :requeue event is fired directly
+ aasm.current_event = :requeue!
+ aasm_fire_event(:requeue, persist: true)
+ end
+
+ # Requeue this running job since the worker assigned to it has died; unlike requeue! the event is fired with persist: false
+ def requeue(worker_name_=nil)
+ return false if worker_name_ && (worker_name != worker_name_) # a worker name was given and this job belongs to a different worker
+ # The event method is not available as #super, so the :requeue event is fired directly
+ aasm.current_event = :requeue
+ aasm_fire_event(:requeue, persist: false)
+ end
+
############################################################################
protected
# Before events that can be overridden by child classes
def before_start
@@ -328,152 +368,35 @@
self.completed_at = Time.now
self.worker_name = nil
end
# Before-fail callback: records the failure time in completed_at and clears the worker name.
# The new version also increments failure_count here; the old version did that in set_exception.
def before_fail
- self.completed_at = Time.now
- self.worker_name = nil
+ self.completed_at = Time.now
+ self.worker_name = nil
+ self.failure_count += 1
end
# Callback registered as `before:` on the :retry event.
# A job being retried is no longer finished, so its completion timestamp is wiped.
def before_retry
  self.completed_at = nil
end
# Callback registered as `before:` on the :pause event.
# Records the pause time in completed_at (reported as 'paused_at' by as_json) and clears the worker name.
def before_pause
self.completed_at = Time.now
- self.worker_name = nil
+ self.worker_name = nil
end
# Before-resume callback (presumably registered on the resume event - confirm).
# Wipes the timestamp stored in completed_at.
def before_resume
  self.completed_at = nil
end
# Before-abort callback (presumably registered on the abort event - confirm).
# Records the abort time in completed_at (reported as 'aborted_at' by as_json) and clears the worker name.
def before_abort
self.completed_at = Time.now
- self.worker_name = nil
+ self.worker_name = nil
end
- # Returns a human readable duration from the supplied [Float] number of seconds
- def seconds_as_duration(seconds)
- time = Time.at(seconds)
- if seconds >= 1.day
- "#{(seconds / 1.day).to_i}d #{time.strftime('%-Hh %-Mm %-Ss')}"
- elsif seconds >= 1.hour
- time.strftime('%-Hh %-Mm %-Ss')
- elsif seconds >= 1.minute
- time.strftime('%-Mm %-Ss')
- else
- time.strftime('%-Ss')
- end
- end
-
- # Returns the next job to work on in priority based order
- # Returns nil if there are currently no queued jobs, or processing batch jobs
- # with records that require processing
- #
- # Parameters
- # worker_name [String]
- # Name of the worker that will be processing this job
- #
- # skip_job_ids [Array<BSON::ObjectId>]
- # Job ids to exclude when looking for the next job
- #
- # Note:
- # If a job is in queued state it will be started
- def self.next_job(worker_name, skip_job_ids = nil)
- query = {
- '$and' => [
- {
- '$or' => [
- { 'state' => 'queued' }, # Jobs
- { 'state' => 'running', 'sub_state' => :processing } # Slices
- ]
- },
- {
- '$or' => [
- { run_at: { '$exists' => false } },
- { run_at: { '$lte' => Time.now } }
- ]
- },
- ]
- }
- query['_id'] = { '$nin' => skip_job_ids } if skip_job_ids && skip_job_ids.size > 0
-
- while doc = find_and_modify(
- query: query,
- sort: [['priority', 'asc'], ['created_at', 'asc']],
- update: { '$set' => { 'worker_name' => worker_name, 'state' => 'running' } }
- )
- job = load(doc)
- if job.running?
- return job
- else
- if job.expired?
- job.destroy
- logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
- else
- # Also update in-memory state and run call-backs
- job.start
- job.set(started_at: job.started_at)
- return job
- end
- end
- end
- end
-
- ############################################################################
- private
-
- # Set exception information for this job
- def set_exception(worker_name, exc)
+ def before_requeue
+ self.started_at = nil
self.worker_name = nil
- self.failure_count += 1
- self.exception = JobException.from_exception(exc)
- exception.worker_name = worker_name
- fail! unless failed?
- logger.error("Exception running #{self.class.name}##{perform_method}", exc)
- end
-
- # Calls a method on this job, if it is defined
- # Adds the event name to the method call if supplied
- #
- # Returns [Object] the result of calling the method
- #
- # Parameters
- # method [Symbol]
- # The method to call on this job
- #
- # arguments [Array]
- # Arguments to pass to the method call
- #
- # Options:
- # event: [Symbol]
- # Any one of: :before, :after
- # Default: None, just calls the method itself
- #
- # log_level: [Symbol]
- # Log level to apply to silence logging during the call
- # Default: nil ( no change )
- #
- def call_method(method, arguments, options={})
- options = options.dup
- event = options.delete(:event)
- log_level = options.delete(:log_level)
- raise(ArgumentError, "Unknown #{self.class.name}#call_method options: #{options.inspect}") if options.size > 0
-
- the_method = event.nil? ? method : "#{event}_#{method}".to_sym
- if respond_to?(the_method)
- method_name = "#{self.class.name}##{the_method}"
- logger.info "Start #{method_name}"
- logger.benchmark_info("Completed #{method_name}",
- metric: "rocketjob/#{self.class.name.underscore}/#{the_method}",
- log_exception: :full,
- on_exception_level: :error,
- silence: log_level
- ) do
- self.send(the_method, *arguments)
- end
- end
end
end
end