lib/rocket_job/plugins/job/persistence.rb in rocketjob-5.1.1 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -1,48 +1,20 @@
-require 'active_support/concern'
+require "active_support/concern"
module RocketJob
module Plugins
module Job
# Prevent more than one instance of this job class from running at a time
module Persistence
extend ActiveSupport::Concern
included do
# Store all job types in this collection
- store_in collection: 'rocket_job.jobs'
+ store_in collection: "rocket_job.jobs"
end
module ClassMethods
- # Retrieves the next job to work on in priority based order
- # and assigns it to this worker
- #
- # Returns nil if no jobs are available for processing
- #
- # Parameters
- # worker_name: [String]
- # Name of the worker that will be processing this job
- #
- # filter: [Hash]
- # Filter to apply to the query.
- # For example: to exclude jobs from being returned.
- #
- # Example:
- # # Skip any job ids from the job_ids_list
- # filter = {:id.nin => job_ids_list}
- # job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter)
- def rocket_job_retrieve(worker_name, filter)
- SemanticLogger.silence(:info) do
- scheduled = {'$or' => [{run_at: nil}, {:run_at.lte => Time.now}]}
- working = {'$or' => [{state: :queued}, {state: :running, sub_state: :processing}]}
- query = self.and(working, scheduled)
- query = query.where(filter) unless filter.blank?
- update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}}
- query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
- end
- end
-
# Returns [Hash<String:Integer>] of the number of jobs in each state
# Queued jobs are separated into :queued_now and :scheduled
# :queued_now are jobs that are awaiting processing and can be processed now.
# :scheduled are jobs scheduled to run the future.
#
@@ -71,18 +43,18 @@
def counts_by_state
counts = {}
collection.aggregate(
[
{
- '$group' => {
- _id: '$state',
- count: {'$sum' => 1}
+ "$group" => {
+ _id: "$state",
+ count: {"$sum" => 1}
}
}
]
).each do |result|
- counts[result['_id'].to_sym] = result['count']
+ counts[result["_id"].to_sym] = result["count"]
end
# Calculate :queued_now and :scheduled if there are queued jobs
if (queued_count = counts[:queued])
scheduled_count = RocketJob::Job.scheduled.count
@@ -99,9 +71,10 @@
end
# Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed
def reload
return super unless destroy_on_complete
+
begin
super
rescue ::Mongoid::Errors::DocumentNotFound
unless completed?
self.state = :completed