lib/rocket_job/plugins/job/persistence.rb in rocketjob-2.1.3 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.0.0.alpha

- old
+ new

@@ -8,23 +8,12 @@ module Persistence extend ActiveSupport::Concern included do # Store all job types in this collection - set_collection_name 'rocket_job.jobs' + store_in collection: 'rocket_job.jobs' - # Create indexes - def self.create_indexes - # Used by find_and_modify in .rocket_job_retrieve - ensure_index({state: 1, priority: 1, _id: 1}, background: true) - # Remove outdated indexes if present - drop_index('state_1_run_at_1_priority_1_created_at_1_sub_state_1') rescue nil - drop_index('state_1_priority_1_created_at_1_sub_state_1') rescue nil - drop_index('state_1_priority_1_created_at_1') rescue nil - drop_index('created_at_1') rescue nil - end - # Retrieves the next job to work on in priority based order # and assigns it to this worker # # Returns nil if no jobs are available for processing # @@ -33,44 +22,16 @@ # Name of the worker that will be processing this job # # skip_job_ids [Array<BSON::ObjectId>] # Job ids to exclude when looking for the next job def self.rocket_job_retrieve(worker_name, skip_job_ids = nil) - run_at = [ - {run_at: {'$exists' => false}}, - {run_at: {'$lte' => Time.now}} - ] - update = query = nil - if defined?(RocketJobPro) - query = { - '$and' => [ - { - '$or' => [ - {'state' => 'queued'}, # Jobs - {'state' => 'running', 'sub_state' => :processing} # Slices - ] - }, - { - '$or' => run_at - } - ] - } - update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}} - else - query = {'state' => 'queued', '$or' => run_at} - update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} - end + query = queued_now + update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} - query['_id'] = {'$nin' => skip_job_ids} if skip_job_ids && skip_job_ids.size > 0 + query = query.where(:id.nin => skip_job_ids) if skip_job_ids && skip_job_ids.size > 0 - if doc = find_and_modify( - query: query, - sort: {priority: 1, _id: 1}, - update: update - ) - load(doc) - end + query.sort(priority: 1, _id: 1).find_one_and_update(update) end # Returns [Hash<String:Integer>] of the number of jobs in each state # Queued jobs are separated into :queued_now and :scheduled # :queued_now are jobs that are awaiting processing and can be processed now. @@ -112,11 +73,11 @@ counts[result['_id'].to_sym] = result['count'] end # Calculate :queued_now and :scheduled if there are queued jobs if queued_count = counts[:queued] - scheduled_count = RocketJob::Job.where(state: :queued, run_at: {'$gt' => Time.now}).count + scheduled_count = RocketJob::Job.scheduled.count if scheduled_count > 0 queued_now_count = queued_count - scheduled_count counts[:queued_now] = queued_count - scheduled_count if queued_now_count > 0 counts[:scheduled] = scheduled_count else @@ -131,34 +92,17 @@ # Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed def reload return super unless destroy_on_complete begin super - rescue MongoMapper::DocumentNotFound + rescue Mongoid::Errors::DocumentNotFound unless completed? self.state = :completed rocket_job_set_completed_at rocket_job_mark_complete end self end - end - - private - - # After this model is loaded, convert any hashes in the arguments list to HashWithIndifferentAccess - def load_from_database(*args) - super - if arguments.present? - self.arguments = arguments.collect { |i| i.is_a?(BSON::OrderedHash) ? i.with_indifferent_access : i } - end - end - - # Apply RocketJob defaults after initializing default values - # but before setting attributes. after_initialize is too late - def initialize_default_values(except = {}) - super - rocket_job_set_defaults end end end end