lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.0.0.beta2 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.0.0.beta3
- old
+ new
@@ -10,28 +10,38 @@
included do
# Store all job types in this collection
store_in collection: 'rocket_job.jobs'
+ after_initialize :remove_arguments
+ end
+
+ module ClassMethods
# Retrieves the next job to work on in priority based order
# and assigns it to this worker
#
# Returns nil if no jobs are available for processing
#
# Parameters
- # worker_name [String]
+ # worker_name: [String]
# Name of the worker that will be processing this job
#
- # skip_job_ids [Array<BSON::ObjectId>]
- # Job ids to exclude when looking for the next job
- def self.rocket_job_retrieve(worker_name, skip_job_ids = nil)
- query = queued_now
- update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}}
-
- query = query.where(:id.nin => skip_job_ids) if skip_job_ids && skip_job_ids.size > 0
-
- query.sort(priority: 1, _id: 1).find_one_and_update(update)
+ # filter: [Hash]
+ # Filter to apply to the query.
+ # For example: to exclude jobs from being returned.
+ #
+ # Example:
+ # # Skip any job ids from the job_ids_list
+ # filter = {:id.nin => job_ids_list}
+ # job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter)
+ def rocket_job_retrieve(worker_name, filter)
+ SemanticLogger.silence(:info) do
+ query = queued_now
+ query = query.where(filter) unless filter.blank?
+ update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}}
+ query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
+ end
end
# Returns [Hash<String:Integer>] of the number of jobs in each state
# Queued jobs are separated into :queued_now and :scheduled
# :queued_now are jobs that are awaiting processing and can be processed now.
@@ -57,11 +67,11 @@
# # => {
# :failed => 1,
# :running => 25,
# :completed => 1237
# }
- def self.counts_by_state
+ def counts_by_state
counts = {}
collection.aggregate([
{
'$group' => {
_id: '$state',
@@ -84,11 +94,10 @@
counts[:queued_now] = queued_count
end
end
counts
end
-
end
# Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed
def reload
return super unless destroy_on_complete
@@ -100,9 +109,16 @@
rocket_job_set_completed_at
rocket_job_mark_complete
end
self
end
+ end
+
+ private
+
+ # Remove old style arguments that were stored as an array
+ def remove_arguments
+ attributes.delete('arguments') unless respond_to?('arguments='.to_sym)
end
end
end
end