lib/rocket_job/plugins/job/persistence.rb in rocketjob-2.0.0.rc1 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-2.0.0.rc2

- old
+ new

@@ -33,41 +33,40 @@ # Name of the worker that will be processing this job # # skip_job_ids [Array<BSON::ObjectId>] # Job ids to exclude when looking for the next job def self.rocket_job_retrieve(worker_name, skip_job_ids = nil) - run_at = [ + run_at = [ {run_at: {'$exists' => false}}, {run_at: {'$lte' => Time.now}} ] - query = - if defined?(RocketJobPro) - { - '$and' => [ - { - '$or' => [ - {'state' => 'queued'}, # Jobs - {'state' => 'running', 'sub_state' => :processing} # Slices - ] - }, - { - '$or' => run_at - } - ] - } - else - { - 'state' => 'queued', - '$or' => run_at - } - end + update = query = nil + if defined?(RocketJobPro) + query = { + '$and' => [ + { + '$or' => [ + {'state' => 'queued'}, # Jobs + {'state' => 'running', 'sub_state' => :processing} # Slices + ] + }, + { + '$or' => run_at + } + ] + } + update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}} + else + query = {'state' => 'queued', '$or' => run_at} + update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}} + end query['_id'] = {'$nin' => skip_job_ids} if skip_job_ids && skip_job_ids.size > 0 if doc = find_and_modify( query: query, sort: {priority: 1, _id: 1}, - update: {'$set' => {'worker_name' => worker_name, 'state' => 'running'}} + update: update ) load(doc) end end