lib/rocket_job/job.rb in rocketjob-1.0.0 vs lib/rocket_job/job.rb in rocketjob-1.1.0

- old
+ new

@@ -13,10 +13,11 @@ # # User definable attributes # # The following attributes are set when the job is created + # @formatter:off # Description for this job instance key :description, String # Method that must be invoked to complete this job @@ -126,10 +127,11 @@ # -> :paused -> :running # -> :aborted # -> :failed -> :running # -> :aborted # -> :aborted + # -> :queued (when a worker dies) # -> :aborted aasm column: :state do # Job has been created and is queued for processing ( Initial state ) state :queued, initial: true @@ -165,11 +167,11 @@ transitions from: :running, to: :failed transitions from: :paused, to: :failed end event :retry, before: :before_retry do - transitions from: :failed, to: :running + transitions from: :failed, to: :queued end event :pause, before: :before_pause do transitions from: :running, to: :paused end @@ -182,43 +184,44 @@ transitions from: :running, to: :aborted transitions from: :queued, to: :aborted transitions from: :failed, to: :aborted transitions from: :paused, to: :aborted end + + event :requeue, before: :before_requeue do + transitions from: :running, to: :queued + end end + # @formatter:on # Create indexes def self.create_indexes # Used by find_and_modify in .next_job - ensure_index({ state:1, run_at: 1, priority: 1, created_at: 1, sub_state: 1}, background: true) + ensure_index({state: 1, run_at: 1, priority: 1, created_at: 1, sub_state: 1}, background: true) # Remove outdated index if present - drop_index("state_1_priority_1_created_at_1_sub_state_1") rescue nil + drop_index('state_1_priority_1_created_at_1_sub_state_1') rescue nil # Used by Mission Control ensure_index [[:created_at, 1]] end - # Requeue all jobs for the specified dead worker + # Requeues all jobs that were running on worker that died def self.requeue_dead_worker(worker_name) - collection.update( - { 'worker_name' => worker_name, 'state' => :running }, - { '$unset' => { 'worker_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } }, - multi: true - ) + running.each { |job| job.requeue!(worker_name) } end # Pause all running jobs def self.pause_all - where(state: 'running').each { |job| job.pause! } + running.each(&:pause!) end # Resume all paused jobs def self.resume_all - where(state: 'paused').each { |job| job.resume! } + paused.each(&:resume!) end # Returns the number of required arguments for this job - def self.argument_count(method=:perform) + def self.argument_count(method = :perform) instance_method(method).arity end # Returns [true|false] whether to collect the results from running this batch def collect_output? @@ -239,11 +242,11 @@ end end # Returns a human readable duration the job has taken def duration - seconds_as_duration(seconds) + RocketJob.seconds_as_duration(seconds) end # A job has expired if the expiry time has passed before it is started def expired? started_at.nil? && expires_at && (expires_at < Time.now) @@ -260,63 +263,100 @@ attrs when paused? attrs.delete('completed_at') attrs.delete('result') # Ensure 'paused_at' appears first in the hash - { 'paused_at' => completed_at }.merge(attrs) + {'paused_at' => completed_at}.merge(attrs) when aborted? attrs.delete('completed_at') attrs.delete('result') - { 'aborted_at' => completed_at }.merge(attrs) + {'aborted_at' => completed_at}.merge(attrs) when failed? attrs.delete('completed_at') attrs.delete('result') - { 'failed_at' => completed_at }.merge(attrs) + {'failed_at' => completed_at}.merge(attrs) else attrs end end - def status(time_zone='Eastern Time (US & Canada)') + def status(time_zone = 'Eastern Time (US & Canada)') h = as_json h.delete('seconds') h.delete('perform_method') if h['perform_method'] == :perform - h.dup.each_pair do |k,v| + h.dup.each_pair do |k, v| case - when v.kind_of?(Time) + when v.is_a?(Time) h[k] = v.in_time_zone(time_zone).to_s - when v.kind_of?(BSON::ObjectId) + when v.is_a?(BSON::ObjectId) h[k] = v.to_s end end h end - # TODO Jobs are not currently automatically retried. Is there a need? - def seconds_to_delay(count) - # TODO Consider lowering the priority automatically after every retry? - # Same basic formula for calculating retry interval as delayed_job and Sidekiq - (count ** 4) + 15 + (rand(30)*(count+1)) - end - # Patch the way MongoMapper reloads a model # Only reload MongoMapper attributes, leaving other instance variables untouched def reload - if doc = collection.find_one(:_id => id) + if (doc = collection.find_one(_id: id)) + # Clear out keys that are not returned during the reload from MongoDB + (keys.keys - doc.keys).each { |key| send("#{key}=", nil) } + initialize_default_values load_from_database(doc) self else - raise MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection" + if destroy_on_complete + self.state = :completed + before_complete + else + raise(MongoMapper::DocumentNotFound, "Document match #{_id.inspect} does not exist in #{collection.name} collection") + end end end # After this model is read, convert any hashes in the arguments list to HashWithIndifferentAccess def load_from_database(*args) super - self.arguments = arguments.collect {|i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } if arguments.present? + if arguments.present? + self.arguments = arguments.collect { |i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } + end end + # Set exception information for this job and fail it + def fail!(worker_name='user', exc_or_message='Job failed through user action') + if exc_or_message.is_a?(Exception) + self.exception = JobException.from_exception(exc_or_message) + exception.worker_name = worker_name + else + build_exception( + class_name: 'RocketJob::JobException', + message: exc_or_message, + backtrace: [], + worker_name: worker_name + ) + end + # not available as #super + aasm.current_event = :fail! + aasm_fire_event(:fail, persist: true) + end + + # Requeue this running job since the worker assigned to it has died + def requeue!(worker_name_=nil) + return false if worker_name_ && (worker_name != worker_name_) + # not available as #super + aasm.current_event = :requeue! + aasm_fire_event(:requeue, persist: true) + end + + # Requeue this running job since the worker assigned to it has died + def requeue(worker_name_=nil) + return false if worker_name_ && (worker_name != worker_name_) + # not available as #super + aasm.current_event = :requeue + aasm_fire_event(:requeue, persist: false) + end + ############################################################################ protected # Before events that can be overridden by child classes def before_start @@ -328,152 +368,35 @@ self.completed_at = Time.now self.worker_name = nil end def before_fail - self.completed_at = Time.now - self.worker_name = nil + self.completed_at = Time.now + self.worker_name = nil + self.failure_count += 1 end def before_retry self.completed_at = nil end def before_pause self.completed_at = Time.now - self.worker_name = nil + self.worker_name = nil end def before_resume self.completed_at = nil end def before_abort self.completed_at = Time.now - self.worker_name = nil + self.worker_name = nil end - # Returns a human readable duration from the supplied [Float] number of seconds - def seconds_as_duration(seconds) - time = Time.at(seconds) - if seconds >= 1.day - "#{(seconds / 1.day).to_i}d #{time.strftime('%-Hh %-Mm %-Ss')}" - elsif seconds >= 1.hour - time.strftime('%-Hh %-Mm %-Ss') - elsif seconds >= 1.minute - time.strftime('%-Mm %-Ss') - else - time.strftime('%-Ss') - end - end - - # Returns the next job to work on in priority based order - # Returns nil if there are currently no queued jobs, or processing batch jobs - # with records that require processing - # - # Parameters - # worker_name [String] - # Name of the worker that will be processing this job - # - # skip_job_ids [Array<BSON::ObjectId>] - # Job ids to exclude when looking for the next job - # - # Note: - # If a job is in queued state it will be started - def self.next_job(worker_name, skip_job_ids = nil) - query = { - '$and' => [ - { - '$or' => [ - { 'state' => 'queued' }, # Jobs - { 'state' => 'running', 'sub_state' => :processing } # Slices - ] - }, - { - '$or' => [ - { run_at: { '$exists' => false } }, - { run_at: { '$lte' => Time.now } } - ] - }, - ] - } - query['_id'] = { '$nin' => skip_job_ids } if skip_job_ids && skip_job_ids.size > 0 - - while doc = find_and_modify( - query: query, - sort: [['priority', 'asc'], ['created_at', 'asc']], - update: { '$set' => { 'worker_name' => worker_name, 'state' => 'running' } } - ) - job = load(doc) - if job.running? - return job - else - if job.expired? - job.destroy - logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" - else - # Also update in-memory state and run call-backs - job.start - job.set(started_at: job.started_at) - return job - end - end - end - end - - ############################################################################ - private - - # Set exception information for this job - def set_exception(worker_name, exc) + def before_requeue + self.started_at = nil self.worker_name = nil - self.failure_count += 1 - self.exception = JobException.from_exception(exc) - exception.worker_name = worker_name - fail! unless failed? - logger.error("Exception running #{self.class.name}##{perform_method}", exc) - end - - # Calls a method on this job, if it is defined - # Adds the event name to the method call if supplied - # - # Returns [Object] the result of calling the method - # - # Parameters - # method [Symbol] - # The method to call on this job - # - # arguments [Array] - # Arguments to pass to the method call - # - # Options: - # event: [Symbol] - # Any one of: :before, :after - # Default: None, just calls the method itself - # - # log_level: [Symbol] - # Log level to apply to silence logging during the call - # Default: nil ( no change ) - # - def call_method(method, arguments, options={}) - options = options.dup - event = options.delete(:event) - log_level = options.delete(:log_level) - raise(ArgumentError, "Unknown #{self.class.name}#call_method options: #{options.inspect}") if options.size > 0 - - the_method = event.nil? ? method : "#{event}_#{method}".to_sym - if respond_to?(the_method) - method_name = "#{self.class.name}##{the_method}" - logger.info "Start #{method_name}" - logger.benchmark_info("Completed #{method_name}", - metric: "rocketjob/#{self.class.name.underscore}/#{the_method}", - log_exception: :full, - on_exception_level: :error, - silence: log_level - ) do - self.send(the_method, *arguments) - end - end end end end