lib/rocket_job/plugins/job/persistence.rb in rocketjob-2.1.3 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.0.0.alpha
- old
+ new
@@ -8,23 +8,12 @@
module Persistence
extend ActiveSupport::Concern
included do
# Store all job types in this collection
- set_collection_name 'rocket_job.jobs'
+ store_in collection: 'rocket_job.jobs'
- # Create indexes
- def self.create_indexes
- # Used by find_and_modify in .rocket_job_retrieve
- ensure_index({state: 1, priority: 1, _id: 1}, background: true)
- # Remove outdated indexes if present
- drop_index('state_1_run_at_1_priority_1_created_at_1_sub_state_1') rescue nil
- drop_index('state_1_priority_1_created_at_1_sub_state_1') rescue nil
- drop_index('state_1_priority_1_created_at_1') rescue nil
- drop_index('created_at_1') rescue nil
- end
-
# Retrieves the next job to work on in priority based order
# and assigns it to this worker
#
# Returns nil if no jobs are available for processing
#
@@ -33,44 +22,16 @@
# Name of the worker that will be processing this job
#
# skip_job_ids [Array<BSON::ObjectId>]
# Job ids to exclude when looking for the next job
def self.rocket_job_retrieve(worker_name, skip_job_ids = nil)
- run_at = [
- {run_at: {'$exists' => false}},
- {run_at: {'$lte' => Time.now}}
- ]
- update = query = nil
- if defined?(RocketJobPro)
- query = {
- '$and' => [
- {
- '$or' => [
- {'state' => 'queued'}, # Jobs
- {'state' => 'running', 'sub_state' => :processing} # Slices
- ]
- },
- {
- '$or' => run_at
- }
- ]
- }
- update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}}
- else
- query = {'state' => 'queued', '$or' => run_at}
- update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}}
- end
+ query = queued_now
+ update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}}
- query['_id'] = {'$nin' => skip_job_ids} if skip_job_ids && skip_job_ids.size > 0
+ query = query.where(:id.nin => skip_job_ids) if skip_job_ids && skip_job_ids.size > 0
- if doc = find_and_modify(
- query: query,
- sort: {priority: 1, _id: 1},
- update: update
- )
- load(doc)
- end
+ query.sort(priority: 1, _id: 1).find_one_and_update(update)
end
# Returns [Hash<String:Integer>] of the number of jobs in each state
# Queued jobs are separated into :queued_now and :scheduled
# :queued_now are jobs that are awaiting processing and can be processed now.
@@ -112,11 +73,11 @@
counts[result['_id'].to_sym] = result['count']
end
# Calculate :queued_now and :scheduled if there are queued jobs
if queued_count = counts[:queued]
- scheduled_count = RocketJob::Job.where(state: :queued, run_at: {'$gt' => Time.now}).count
+ scheduled_count = RocketJob::Job.scheduled.count
if scheduled_count > 0
queued_now_count = queued_count - scheduled_count
counts[:queued_now] = queued_count - scheduled_count if queued_now_count > 0
counts[:scheduled] = scheduled_count
else
@@ -131,34 +92,17 @@
# Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed
def reload
return super unless destroy_on_complete
begin
super
- rescue MongoMapper::DocumentNotFound
+ rescue Mongoid::Errors::DocumentNotFound
unless completed?
self.state = :completed
rocket_job_set_completed_at
rocket_job_mark_complete
end
self
end
- end
-
- private
-
- # After this model is loaded, convert any hashes in the arguments list to HashWithIndifferentAccess
- def load_from_database(*args)
- super
- if arguments.present?
- self.arguments = arguments.collect { |i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i }
- end
- end
-
- # Apply RocketJob defaults after initializing default values
- # but before setting attributes. after_initialize is too late
- def initialize_default_values(except = {})
- super
- rocket_job_set_defaults
end
end
end
end