lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.5.2 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-4.0.0
- old
+ new
@@ -30,13 +30,15 @@
# # Skip any job ids from the job_ids_list
# filter = {:id.nin => job_ids_list}
# job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter)
def rocket_job_retrieve(worker_name, filter)
SemanticLogger.silence(:info) do
- query = queued_now
- query = query.where(filter) unless filter.blank?
- update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}}
+ scheduled = {'$or' => [{run_at: nil}, {:run_at.lte => Time.now}]}
+ working = {'$or' => [{state: :queued}, {state: :running, sub_state: :processing}]}
+ query = self.and(working, scheduled)
+ query = query.where(filter) unless filter.blank?
+ update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}}
query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true)
end
end
# Returns [Hash<String:Integer>] of the number of jobs in each state
@@ -99,10 +101,10 @@
# Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed
def reload
return super unless destroy_on_complete
begin
super
- rescue Mongoid::Errors::DocumentNotFound
+ rescue ::Mongoid::Errors::DocumentNotFound
unless completed?
self.state = :completed
rocket_job_set_completed_at
rocket_job_mark_complete
end