lib/rocket_job/job.rb in rocketjob-0.7.0 vs lib/rocket_job/job.rb in rocketjob-0.8.0

- old
+ new

@@ -21,33 +21,33 @@ # Method that must be invoked to complete this job key :perform_method, Symbol, default: :perform # Priority of this job as it relates to other jobs [1..100] - # 1: Lowest Priority - # 100: Highest Priority + # 1: Highest Priority # 50: Default Priority + # 100: Lowest Priority + # + # Example: + # A job with a priority of 40 will execute before a job with priority 50 + # + # In RocketJob Pro, if a SlicedJob is running and a higher priority job + # arrives, then the current job will complete the current slices and process + # the new higher priority job key :priority, Integer, default: 50 - # Support running this job in the future - # Also set when a job fails and needs to be re-tried in the future + # Run this job no earlier than this time key :run_at, Time # If a job has not started by this time, destroy it key :expires_at, Time # When specified a job will be re-scheduled to run at it's next scheduled interval - # Format is the same as cron - key :schedule, String + # Format is the same as cron. + # #TODO Future capability. + #key :schedule, String - # Job should be marked as repeatable when it can be run multiple times - # without changing the system state or modifying database contents. - # Setting to false will result in an additional lookup on the results collection - # before processing the record to ensure it was not previously processed. - # This is necessary for retrying a job. - key :repeatable, Boolean, default: true - # When the job completes destroy it from both the database and the UI key :destroy_on_complete, Boolean, default: true # Any user supplied arguments for the method invocation # All keys must be UTF-8 strings. The values can be any valid BSON type: @@ -73,13 +73,10 @@ # Can be used to reduce log noise, especially during high volume calls # For debugging a single job can be logged at a low level such as :trace # Levels supported: :trace, :debug, :info, :warn, :error, :fatal key :log_level, Symbol - # Only give access through the Web UI to this group identifier - #key :group, String - # # Read-only attributes # # Current state, as set by AASM @@ -119,34 +116,21 @@ # Store all job types in this collection set_collection_name 'rocket_job.jobs' validates_presence_of :state, :failure_count, :created_at, :perform_method - # :repeatable, :destroy_on_complete, :collect_output, :arguments validates :priority, inclusion: 1..100 # State Machine events and transitions # - # For Job Record jobs, usual processing: # :queued -> :running -> :completed - # -> :paused -> :running ( manual ) - # -> :failed -> :running ( manual ) - # -> :retry -> :running ( future date ) - # - # Any state other than :completed can transition manually to :aborted - # - # Work queue is priority based and then FIFO thereafter - # means that records from existing multi-record jobs will be completed before - # new jobs are started with the same priority. - # Unless, the loader is not fast enough and the - # records queue is empty. In this case the next multi-record job will - # start loading too. - # - # Where: state: [:queued, :running], run_at: $lte: Time.now - # Sort: priority, created_at - # - # Index: state, run_at + # -> :paused -> :running + # -> :aborted + # -> :failed -> :running + # -> :aborted + # -> :aborted + # -> :aborted aasm column: :state do # Job has been created and is queued for processing ( Initial state ) state :queued, initial: true # Job is running @@ -160,14 +144,10 @@ state :paused # Job failed to process and needs to be manually re-tried or aborted state :failed - # Job failed to process previously and is scheduled to be retried at a - # future date - state :retry - # Job was aborted and cannot be resumed ( End state ) state :aborted event :start, before: :before_start do transitions from: :queued, to: :running @@ -251,10 +231,15 @@ Time.now - created_at end Time.at(seconds) end + # A job has expired if the expiry time has passed before it is started + def expired? + started_at.nil? && expires_at && (expires_at < Time.now) + end + # Returns [Hash] status of this job def status(time_zone='Eastern Time (US & Canada)') h = { state: state, description: description @@ -277,17 +262,19 @@ end h[:duration] = duration.strftime('%H:%M:%S') h end - # Same basic formula for calculating retry interval as delayed_job and Sidekiq - # TODO Consider lowering the priority automatically after every retry? + # TODO Jobs are not currently automatically retried. Is there a need? def seconds_to_delay(count) + # TODO Consider lowering the priority automatically after every retry? + # Same basic formula for calculating retry interval as delayed_job and Sidekiq (count ** 4) + 15 + (rand(30)*(count+1)) end # Patch the way MongoMapper reloads a model + # Only reload MongoMapper attributes, leaving other instance variables untouched def reload if doc = collection.find_one(:_id => id) load_from_database(doc) self else @@ -343,11 +330,11 @@ # Parameters # server_name [String] # Name of the server that will be processing this job # # skip_job_ids [Array<BSON::ObjectId>] - # Job ids to exclude when looking for 3the next job + # Job ids to exclude when looking for the next job # # Note: # If a job is in queued state it will be started def self.next_job(server_name, skip_job_ids = nil) query = { @@ -366,21 +353,28 @@ }, ] } query['_id'] = { '$nin' => skip_job_ids } if skip_job_ids && skip_job_ids.size > 0 - if doc = find_and_modify( + while doc = find_and_modify( query: query, sort: [['priority', 'asc'], ['created_at', 'asc']], update: { '$set' => { 'server_name' => server_name, 'state' => 'running' } } ) job = load(doc) - unless job.running? - # Also update in-memory state and run call-backs - job.start - job.set(started_at: job.started_at) + if job.running? + return job + else + if job.expired? + job.destroy + logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" + else + # Also update in-memory state and run call-backs + job.start + job.set(started_at: job.started_at) + return job + end end - job end end ############################################################################ private