lib/rocket_job/plugins/job/persistence.rb in rocketjob-5.1.1 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-5.2.0.beta1

- old
+ new

@@ -1,48 +1,20 @@ -require 'active_support/concern' +require "active_support/concern" module RocketJob module Plugins module Job # Prevent more than one instance of this job class from running at a time module Persistence extend ActiveSupport::Concern included do # Store all job types in this collection - store_in collection: 'rocket_job.jobs' + store_in collection: "rocket_job.jobs" end module ClassMethods - # Retrieves the next job to work on in priority based order - # and assigns it to this worker - # - # Returns nil if no jobs are available for processing - # - # Parameters - # worker_name: [String] - # Name of the worker that will be processing this job - # - # filter: [Hash] - # Filter to apply to the query. - # For example: to exclude jobs from being returned. - # - # Example: - # # Skip any job ids from the job_ids_list - # filter = {:id.nin => job_ids_list} - # job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter) - def rocket_job_retrieve(worker_name, filter) - SemanticLogger.silence(:info) do - scheduled = {'$or' => [{run_at: nil}, {:run_at.lte => Time.now}]} - working = {'$or' => [{state: :queued}, {state: :running, sub_state: :processing}]} - query = self.and(working, scheduled) - query = query.where(filter) unless filter.blank? - update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}} - query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) - end - end - # Returns [Hash<String:Integer>] of the number of jobs in each state # Queued jobs are separated into :queued_now and :scheduled # :queued_now are jobs that are awaiting processing and can be processed now. # :scheduled are jobs scheduled to run the future. # @@ -71,18 +43,18 @@ def counts_by_state counts = {} collection.aggregate( [ { - '$group' => { - _id: '$state', - count: {'$sum' => 1} + "$group" => { + _id: "$state", + count: {"$sum" => 1} } } ] ).each do |result| - counts[result['_id'].to_sym] = result['count'] + counts[result["_id"].to_sym] = result["count"] end # Calculate :queued_now and :scheduled if there are queued jobs if (queued_count = counts[:queued]) scheduled_count = RocketJob::Job.scheduled.count @@ -99,9 +71,10 @@ end # Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed def reload return super unless destroy_on_complete + begin super rescue ::Mongoid::Errors::DocumentNotFound unless completed? self.state = :completed