lib/rocket_job/job.rb in rocketjob-0.8.0 vs lib/rocket_job/job.rb in rocketjob-0.9.0

- old
+ new

@@ -92,12 +92,12 @@
     key :completed_at, Time

     # Number of times that this job has failed to process
     key :failure_count, Integer, default: 0

-    # This name of the server that this job is being processed by, or was processed by
-    key :server_name, String
+    # This name of the worker that this job is being processed by, or was processed by
+    key :worker_name, String

     #
     # Values that jobs can update during processing
     #
@@ -194,15 +194,15 @@
       drop_index("state_1_priority_1_created_at_1_sub_state_1") rescue nil
       # Used by Mission Control
       ensure_index [[:created_at, 1]]
     end

-    # Requeue all jobs for the specified dead server
-    def self.requeue_dead_server(server_name)
+    # Requeue all jobs for the specified dead worker
+    def self.requeue_dead_worker(worker_name)
       collection.update(
-        { 'server_name' => server_name, 'state' => :running },
-        { '$unset' => { 'server_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
+        { 'worker_name' => worker_name, 'state' => :running },
+        { '$unset' => { 'worker_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
         multi: true
       )
     end

     # Pause all running jobs
@@ -218,51 +218,70 @@
     # Returns [true|false] whether to collect the results from running this batch
     def collect_output?
       collect_output == true
     end

-    # Returns [Time] how long the job has taken to complete
-    # If not started then it is the time spent waiting in the queue
-    def duration
-      seconds = if completed_at
+    # Returns [Float] the number of seconds the job has taken
+    # - Elapsed seconds to process the job from when a worker first started working on it
+    #   until now if still running, or until it was completed
+    # - Seconds in the queue if queued
+    def seconds
+      if completed_at
         completed_at - (started_at || created_at)
       elsif started_at
         Time.now - started_at
       else
         Time.now - created_at
       end
-      Time.at(seconds)
     end

     # A job has expired if the expiry time has passed before it is started
     def expired?
       started_at.nil? && expires_at && (expires_at < Time.now)
     end

     # Returns [Hash] status of this job
-    def status(time_zone='Eastern Time (US & Canada)')
-      h = {
-        state: state,
-        description: description
-      }
-      h[:started_at] = started_at.in_time_zone(time_zone) if started_at
-
+    def as_json
+      attrs = serializable_hash(methods: :seconds)
+      attrs.delete('result') unless collect_output?
       case
-      when running? || paused?
-        h[:paused_at] = completed_at.in_time_zone(time_zone) if paused?
-        h[:percent_complete] = percent_complete if percent_complete
-      when completed?
-        h[:completed_at] = completed_at.in_time_zone(time_zone)
+      when running?
+        attrs.delete('completed_at')
+        attrs.delete('result')
+        attrs
+      when paused?
+        attrs.delete('completed_at')
+        attrs.delete('result')
+        # Ensure 'paused_at' appears first in the hash
+        { 'paused_at' => completed_at }.merge(attrs)
       when aborted?
-        h[:aborted_at] = completed_at.in_time_zone(time_zone)
-        h[:percent_complete] = percent_complete if percent_complete
+        attrs.delete('completed_at')
+        attrs.delete('result')
+        { 'aborted_at' => completed_at }.merge(attrs)
       when failed?
-        h[:failed_at] = completed_at.in_time_zone(time_zone)
-        h[:percent_complete] = percent_complete if percent_complete
-        h[:exception] = exception.nil? ? {} : exception.attributes
+        attrs.delete('completed_at')
+        attrs.delete('result')
+        { 'failed_at' => completed_at }.merge(attrs)
+      else
+        attrs
       end
-      h[:duration] = duration.strftime('%H:%M:%S')
+    end
+
+    def status(time_zone='Eastern Time (US & Canada)')
+      h = as_json
+      if seconds = h.delete('seconds')
+        h['duration'] = seconds_as_duration(seconds)
+      end
+      h.delete('perform_method') if h['perform_method'] == :perform
+      h.dup.each_pair do |k,v|
+        case
+        when v.kind_of?(Time)
+          h[k] = v.in_time_zone(time_zone).to_s
+        when v.kind_of?(BSON::ObjectId)
+          h[k] = v.to_s
+        end
+      end
       h
     end

     # TODO Jobs are not currently automatically retried. Is there a need?
     def seconds_to_delay(count)
@@ -286,10 +305,11 @@
     def load_from_database(*args)
       super
       self.arguments = arguments.collect {|i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } if arguments.present?
     end

+    ############################################################################
     protected

     # Before events that can be overridden by child classes
     def before_start
       self.started_at = Time.now
@@ -318,27 +338,38 @@
     def before_abort
       self.completed_at = Time.now
     end

-    ############################################################################
-    protected
+    # Returns a human readable duration from the supplied [Float] number of seconds
+    def seconds_as_duration(seconds)
+      time = Time.at(seconds)
+      if seconds >= 1.day
+        "#{seconds / 1.day}d #{time.strftime('%-Hh %-Mm %-Ss')}"
+      elsif seconds >= 1.hour
+        time.strftime('%-Hh %-Mm %-Ss')
+      elsif seconds >= 1.minute
+        time.strftime('%-Mm %-Ss')
+      else
+        time.strftime('%-Ss')
+      end
+    end

     # Returns the next job to work on in priority based order
     # Returns nil if there are currently no queued jobs, or processing batch jobs
     # with records that require processing
     #
     # Parameters
-    #   server_name [String]
-    #     Name of the server that will be processing this job
+    #   worker_name [String]
+    #     Name of the worker that will be processing this job
     #
     #   skip_job_ids [Array<BSON::ObjectId>]
     #     Job ids to exclude when looking for the next job
     #
     # Note:
     #   If a job is in queued state it will be started
-    def self.next_job(server_name, skip_job_ids = nil)
+    def self.next_job(worker_name, skip_job_ids = nil)
       query = {
         '$and' => [
           {
             '$or' => [
               { 'state' => 'queued' }, # Jobs
@@ -356,11 +387,11 @@
       query['_id'] = { '$nin' => skip_job_ids } if skip_job_ids && skip_job_ids.size > 0

       while doc = find_and_modify(
           query: query,
           sort: [['priority', 'asc'], ['created_at', 'asc']],
-          update: { '$set' => { 'server_name' => server_name, 'state' => 'running' } }
+          update: { '$set' => { 'worker_name' => worker_name, 'state' => 'running' } }
         )
         job = load(doc)
         if job.running?
           return job
         else
@@ -379,14 +410,14 @@
     ############################################################################
     private

     # Set exception information for this job
-    def set_exception(server_name, exc)
-      self.server_name = nil
+    def set_exception(worker_name, exc)
+      self.worker_name = nil
       self.failure_count += 1
       self.exception = JobException.from_exception(exc)
-      exception.server_name = server_name
+      exception.worker_name = worker_name
       fail! unless failed?
       logger.error("Exception running #{self.class.name}##{perform_method}", exc)
     end

     # Calls a method on this job, if it is defined
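
Note: the server_name to worker_name rename also renames the public class method
requeue_dead_server to requeue_dead_worker. A hypothetical caller-side sketch of the
new names (the worker name string is made up for illustration, and it assumes the gem
is already loaded and connected to MongoDB):

    # Requeue any jobs still marked as running on a worker that has died:
    # worker_name and started_at are unset and the state is set back to :queued.
    RocketJob::Job.requeue_dead_worker('host.example.com:12345')

    # status now builds on as_json: it serializes the job's attributes, turns the raw
    # 'seconds' value into a human readable 'duration', and renders Time and
    # BSON::ObjectId values as strings in the requested time zone.
    # job.status('UTC')  # e.g. includes 'duration' => '5m 3s' alongside the other attributes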
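Note: the duration method that returned a Time is replaced by seconds (a raw Float),
plus the protected helper seconds_as_duration that formats those seconds for status.
Below is a minimal standalone sketch of the same formatting idea, re-implemented
without ActiveSupport or Time.at purely to show the output shape; the method name is
illustrative and not part of the gem:

    # Format elapsed seconds as "1d 2h 3m 4s", dropping leading zero units.
    def format_duration(seconds)
      secs  = seconds.to_i
      days  = secs / 86_400
      hours = (secs % 86_400) / 3_600
      mins  = (secs % 3_600) / 60
      s     = secs % 60
      if days > 0
        "#{days}d #{hours}h #{mins}m #{s}s"
      elsif hours > 0
        "#{hours}h #{mins}m #{s}s"
      elsif mins > 0
        "#{mins}m #{s}s"
      else
        "#{s}s"
      end
    end

    format_duration(42.7)    # => "42s"
    format_duration(3_661)   # => "1h 1m 1s"
    format_duration(90_061)  # => "1d 1h 1m 1s"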