lib/rocket_job/job.rb in rocketjob-0.7.0 vs lib/rocket_job/job.rb in rocketjob-0.8.0
- old
+ new
@@ -21,33 +21,33 @@
# Method that must be invoked to complete this job
key :perform_method, Symbol, default: :perform
# Priority of this job as it relates to other jobs [1..100]
- # 1: Lowest Priority
- # 100: Highest Priority
+ # 1: Highest Priority
# 50: Default Priority
+ # 100: Lowest Priority
+ #
+ # Example:
+ # A job with a priority of 40 will execute before a job with priority 50
+ #
+ # In RocketJob Pro, if a SlicedJob is running and a higher priority job
+ # arrives, then the current job will complete the current slices and process
+ # the new higher priority job
key :priority, Integer, default: 50
- # Support running this job in the future
- # Also set when a job fails and needs to be re-tried in the future
+ # Run this job no earlier than this time
key :run_at, Time
# If a job has not started by this time, destroy it
key :expires_at, Time
# When specified a job will be re-scheduled to run at it's next scheduled interval
- # Format is the same as cron
- key :schedule, String
+ # Format is the same as cron.
+ # #TODO Future capability.
+ #key :schedule, String
- # Job should be marked as repeatable when it can be run multiple times
- # without changing the system state or modifying database contents.
- # Setting to false will result in an additional lookup on the results collection
- # before processing the record to ensure it was not previously processed.
- # This is necessary for retrying a job.
- key :repeatable, Boolean, default: true
-
# When the job completes destroy it from both the database and the UI
key :destroy_on_complete, Boolean, default: true
# Any user supplied arguments for the method invocation
# All keys must be UTF-8 strings. The values can be any valid BSON type:
@@ -73,13 +73,10 @@
# Can be used to reduce log noise, especially during high volume calls
# For debugging a single job can be logged at a low level such as :trace
# Levels supported: :trace, :debug, :info, :warn, :error, :fatal
key :log_level, Symbol
- # Only give access through the Web UI to this group identifier
- #key :group, String
-
#
# Read-only attributes
#
# Current state, as set by AASM
@@ -119,34 +116,21 @@
# Store all job types in this collection
set_collection_name 'rocket_job.jobs'
validates_presence_of :state, :failure_count, :created_at, :perform_method
- # :repeatable, :destroy_on_complete, :collect_output, :arguments
validates :priority, inclusion: 1..100
# State Machine events and transitions
#
- # For Job Record jobs, usual processing:
# :queued -> :running -> :completed
- # -> :paused -> :running ( manual )
- # -> :failed -> :running ( manual )
- # -> :retry -> :running ( future date )
- #
- # Any state other than :completed can transition manually to :aborted
- #
- # Work queue is priority based and then FIFO thereafter
- # means that records from existing multi-record jobs will be completed before
- # new jobs are started with the same priority.
- # Unless, the loader is not fast enough and the
- # records queue is empty. In this case the next multi-record job will
- # start loading too.
- #
- # Where: state: [:queued, :running], run_at: $lte: Time.now
- # Sort: priority, created_at
- #
- # Index: state, run_at
+ # -> :paused -> :running
+ # -> :aborted
+ # -> :failed -> :running
+ # -> :aborted
+ # -> :aborted
+ # -> :aborted
aasm column: :state do
# Job has been created and is queued for processing ( Initial state )
state :queued, initial: true
# Job is running
@@ -160,14 +144,10 @@
state :paused
# Job failed to process and needs to be manually re-tried or aborted
state :failed
- # Job failed to process previously and is scheduled to be retried at a
- # future date
- state :retry
-
# Job was aborted and cannot be resumed ( End state )
state :aborted
event :start, before: :before_start do
transitions from: :queued, to: :running
@@ -251,10 +231,15 @@
Time.now - created_at
end
Time.at(seconds)
end
+ # A job has expired if the expiry time has passed before it is started
+ def expired?
+ started_at.nil? && expires_at && (expires_at < Time.now)
+ end
+
# Returns [Hash] status of this job
def status(time_zone='Eastern Time (US & Canada)')
h = {
state: state,
description: description
@@ -277,17 +262,19 @@
end
h[:duration] = duration.strftime('%H:%M:%S')
h
end
- # Same basic formula for calculating retry interval as delayed_job and Sidekiq
- # TODO Consider lowering the priority automatically after every retry?
+ # TODO Jobs are not currently automatically retried. Is there a need?
def seconds_to_delay(count)
+ # TODO Consider lowering the priority automatically after every retry?
+ # Same basic formula for calculating retry interval as delayed_job and Sidekiq
(count ** 4) + 15 + (rand(30)*(count+1))
end
# Patch the way MongoMapper reloads a model
+ # Only reload MongoMapper attributes, leaving other instance variables untouched
def reload
if doc = collection.find_one(:_id => id)
load_from_database(doc)
self
else
@@ -343,11 +330,11 @@
# Parameters
# server_name [String]
# Name of the server that will be processing this job
#
# skip_job_ids [Array<BSON::ObjectId>]
- # Job ids to exclude when looking for 3the next job
+ # Job ids to exclude when looking for the next job
#
# Note:
# If a job is in queued state it will be started
def self.next_job(server_name, skip_job_ids = nil)
query = {
@@ -366,21 +353,28 @@
},
]
}
query['_id'] = { '$nin' => skip_job_ids } if skip_job_ids && skip_job_ids.size > 0
- if doc = find_and_modify(
+ while doc = find_and_modify(
query: query,
sort: [['priority', 'asc'], ['created_at', 'asc']],
update: { '$set' => { 'server_name' => server_name, 'state' => 'running' } }
)
job = load(doc)
- unless job.running?
- # Also update in-memory state and run call-backs
- job.start
- job.set(started_at: job.started_at)
+ if job.running?
+ return job
+ else
+ if job.expired?
+ job.destroy
+ logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
+ else
+ # Also update in-memory state and run call-backs
+ job.start
+ job.set(started_at: job.started_at)
+ return job
+ end
end
- job
end
end
############################################################################
private