lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.0.0.beta2 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.0.0.beta3

- old
+ new

@@ -10,28 +10,38 @@ included do # Store all job types in this collection store_in collection: 'rocket_job.jobs' + after_initialize :remove_arguments + end + + module ClassMethods # Retrieves the next job to work on in priority based order # and assigns it to this worker # # Returns nil if no jobs are available for processing # # Parameters - # worker_name [String] + # worker_name: [String] # Name of the worker that will be processing this job # - # skip_job_ids [Array<BSON::ObjectId>] - # Job ids to exclude when looking for the next job - def self.rocket_job_retrieve(worker_name, skip_job_ids = nil) - query = queued_now - update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} - - query = query.where(:id.nin => skip_job_ids) if skip_job_ids && skip_job_ids.size > 0 - - query.sort(priority: 1, _id: 1).find_one_and_update(update) + # filter: [Hash] + # Filter to apply to the query. + # For example: to exclude jobs from being returned. + # + # Example: + # # Skip any job ids from the job_ids_list + # filter = {:id.nin => job_ids_list} + # job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter) + def rocket_job_retrieve(worker_name, filter) + SemanticLogger.silence(:info) do + query = queued_now + query = query.where(filter) unless filter.blank? + update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} + query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) + end end # Returns [Hash<String:Integer>] of the number of jobs in each state # Queued jobs are separated into :queued_now and :scheduled # :queued_now are jobs that are awaiting processing and can be processed now. @@ -57,11 +67,11 @@ # # => { # :failed => 1, # :running => 25, # :completed => 1237 # } - def self.counts_by_state + def counts_by_state counts = {} collection.aggregate([ { '$group' => { _id: '$state', @@ -84,11 +94,10 @@ counts[:queued_now] = queued_count end end counts end - end # Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed def reload return super unless destroy_on_complete @@ -100,9 +109,16 @@ rocket_job_set_completed_at rocket_job_mark_complete end self end + end + + private + + # Remove old style arguments that were stored as an array + def remove_arguments + attributes.delete('arguments') unless respond_to?('arguments='.to_sym) end end end end