lib/rocket_job/job.rb in rocketjob-0.8.0 vs lib/rocket_job/job.rb in rocketjob-0.9.0
- old
+ new
@@ -92,12 +92,12 @@
key :completed_at, Time
# Number of times that this job has failed to process
key :failure_count, Integer, default: 0
- # This name of the server that this job is being processed by, or was processed by
- key :server_name, String
+ # This name of the worker that this job is being processed by, or was processed by
+ key :worker_name, String
#
# Values that jobs can update during processing
#
@@ -194,15 +194,15 @@
drop_index("state_1_priority_1_created_at_1_sub_state_1") rescue nil
# Used by Mission Control
ensure_index [[:created_at, 1]]
end
- # Requeue all jobs for the specified dead server
- def self.requeue_dead_server(server_name)
+ # Requeue all jobs for the specified dead worker
+ def self.requeue_dead_worker(worker_name)
collection.update(
- { 'server_name' => server_name, 'state' => :running },
- { '$unset' => { 'server_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
+ { 'worker_name' => worker_name, 'state' => :running },
+ { '$unset' => { 'worker_name' => true, 'started_at' => true }, '$set' => { 'state' => :queued } },
multi: true
)
end
# Pause all running jobs
@@ -218,51 +218,70 @@
# Returns [true|false] whether to collect the results from running this batch
def collect_output?
collect_output == true
end
- # Returns [Time] how long the job has taken to complete
- # If not started then it is the time spent waiting in the queue
- def duration
- seconds = if completed_at
+ # Returns [Float] the number of seconds the job has taken
+ # - Elapsed seconds to process the job from when a worker first started working on it
+ # until now if still running, or until it was completed
+ # - Seconds in the queue if queued
+ def seconds
+ if completed_at
completed_at - (started_at || created_at)
elsif started_at
Time.now - started_at
else
Time.now - created_at
end
- Time.at(seconds)
end
# A job has expired if the expiry time has passed before it is started
def expired?
started_at.nil? && expires_at && (expires_at < Time.now)
end
# Returns [Hash] status of this job
- def status(time_zone='Eastern Time (US & Canada)')
- h = {
- state: state,
- description: description
- }
- h[:started_at] = started_at.in_time_zone(time_zone) if started_at
-
+ def as_json
+ attrs = serializable_hash(methods: :seconds)
+ attrs.delete('result') unless collect_output?
case
- when running? || paused?
- h[:paused_at] = completed_at.in_time_zone(time_zone) if paused?
- h[:percent_complete] = percent_complete if percent_complete
- when completed?
- h[:completed_at] = completed_at.in_time_zone(time_zone)
+ when running?
+ attrs.delete('completed_at')
+ attrs.delete('result')
+ attrs
+ when paused?
+ attrs.delete('completed_at')
+ attrs.delete('result')
+ # Ensure 'paused_at' appears first in the hash
+ { 'paused_at' => completed_at }.merge(attrs)
when aborted?
- h[:aborted_at] = completed_at.in_time_zone(time_zone)
- h[:percent_complete] = percent_complete if percent_complete
+ attrs.delete('completed_at')
+ attrs.delete('result')
+ { 'aborted_at' => completed_at }.merge(attrs)
when failed?
- h[:failed_at] = completed_at.in_time_zone(time_zone)
- h[:percent_complete] = percent_complete if percent_complete
- h[:exception] = exception.nil? ? {} : exception.attributes
+ attrs.delete('completed_at')
+ attrs.delete('result')
+ { 'failed_at' => completed_at }.merge(attrs)
+ else
+ attrs
end
- h[:duration] = duration.strftime('%H:%M:%S')
+ end
+
+ def status(time_zone='Eastern Time (US & Canada)')
+ h = as_json
+ if seconds = h.delete('seconds')
+ h['duration'] = seconds_as_duration(seconds)
+ end
+ h.delete('perform_method') if h['perform_method'] == :perform
+ h.dup.each_pair do |k,v|
+ case
+ when v.kind_of?(Time)
+ h[k] = v.in_time_zone(time_zone).to_s
+ when v.kind_of?(BSON::ObjectId)
+ h[k] = v.to_s
+ end
+ end
h
end
# TODO Jobs are not currently automatically retried. Is there a need?
def seconds_to_delay(count)
@@ -286,10 +305,11 @@
def load_from_database(*args)
super
self.arguments = arguments.collect {|i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } if arguments.present?
end
+ ############################################################################
protected
# Before events that can be overridden by child classes
def before_start
self.started_at = Time.now
@@ -318,27 +338,38 @@
def before_abort
self.completed_at = Time.now
end
- ############################################################################
- protected
+ # Returns a human readable duration from the supplied [Float] number of seconds
+ def seconds_as_duration(seconds)
+ time = Time.at(seconds)
+ if seconds >= 1.day
+ "#{seconds / 1.day}d #{time.strftime('%-Hh %-Mm %-Ss')}"
+ elsif seconds >= 1.hour
+ time.strftime('%-Hh %-Mm %-Ss')
+ elsif seconds >= 1.minute
+ time.strftime('%-Mm %-Ss')
+ else
+ time.strftime('%-Ss')
+ end
+ end
# Returns the next job to work on in priority based order
# Returns nil if there are currently no queued jobs, or processing batch jobs
# with records that require processing
#
# Parameters
- # server_name [String]
- # Name of the server that will be processing this job
+ # worker_name [String]
+ # Name of the worker that will be processing this job
#
# skip_job_ids [Array<BSON::ObjectId>]
# Job ids to exclude when looking for the next job
#
# Note:
# If a job is in queued state it will be started
- def self.next_job(server_name, skip_job_ids = nil)
+ def self.next_job(worker_name, skip_job_ids = nil)
query = {
'$and' => [
{
'$or' => [
{ 'state' => 'queued' }, # Jobs
@@ -356,11 +387,11 @@
query['_id'] = { '$nin' => skip_job_ids } if skip_job_ids && skip_job_ids.size > 0
while doc = find_and_modify(
query: query,
sort: [['priority', 'asc'], ['created_at', 'asc']],
- update: { '$set' => { 'server_name' => server_name, 'state' => 'running' } }
+ update: { '$set' => { 'worker_name' => worker_name, 'state' => 'running' } }
)
job = load(doc)
if job.running?
return job
else
@@ -379,14 +410,14 @@
############################################################################
private
# Set exception information for this job
- def set_exception(server_name, exc)
- self.server_name = nil
+ def set_exception(worker_name, exc)
+ self.worker_name = nil
self.failure_count += 1
self.exception = JobException.from_exception(exc)
- exception.server_name = server_name
+ exception.worker_name = worker_name
fail! unless failed?
logger.error("Exception running #{self.class.name}##{perform_method}", exc)
end
# Calls a method on this job, if it is defined