lib/rocket_job/plugins/job/persistence.rb in rocketjob-3.5.2 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-4.0.0

- old
+ new

@@ -30,13 +30,15 @@ # # Skip any job ids from the job_ids_list # filter = {:id.nin => job_ids_list} # job = RocketJob::Job.rocket_job_retrieve('host:pid:worker', filter) def rocket_job_retrieve(worker_name, filter) SemanticLogger.silence(:info) do - query = queued_now - query = query.where(filter) unless filter.blank? - update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} + scheduled = {'$or' => [{run_at: nil}, {:run_at.lte => Time.now}]} + working = {'$or' => [{state: :queued}, {state: :running, sub_state: :processing}]} + query = self.and(working, scheduled) + query = query.where(filter) unless filter.blank? + update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}} query.sort(priority: 1, _id: 1).find_one_and_update(update, bypass_document_validation: true) end end # Returns [Hash<String:Integer>] of the number of jobs in each state @@ -99,10 +101,10 @@ # Set in-memory job to complete if `destroy_on_complete` and the job has been destroyed def reload return super unless destroy_on_complete begin super - rescue Mongoid::Errors::DocumentNotFound + rescue ::Mongoid::Errors::DocumentNotFound unless completed? self.state = :completed rocket_job_set_completed_at rocket_job_mark_complete end