lib/rocket_job/plugins/job/persistence.rb in rocketjob-2.0.0.rc1 vs lib/rocket_job/plugins/job/persistence.rb in rocketjob-2.0.0.rc2
- old
+ new
@@ -33,41 +33,40 @@
# Name of the worker that will be processing this job
#
# skip_job_ids [Array<BSON::ObjectId>]
# Job ids to exclude when looking for the next job
def self.rocket_job_retrieve(worker_name, skip_job_ids = nil)
- run_at = [
+ run_at = [
{run_at: {'$exists' => false}},
{run_at: {'$lte' => Time.now}}
]
- query =
- if defined?(RocketJobPro)
- {
- '$and' => [
- {
- '$or' => [
- {'state' => 'queued'}, # Jobs
- {'state' => 'running', 'sub_state' => :processing} # Slices
- ]
- },
- {
- '$or' => run_at
- }
- ]
- }
- else
- {
- 'state' => 'queued',
- '$or' => run_at
- }
- end
+ update = query = nil
+ if defined?(RocketJobPro)
+ query = {
+ '$and' => [
+ {
+ '$or' => [
+ {'state' => 'queued'}, # Jobs
+ {'state' => 'running', 'sub_state' => :processing} # Slices
+ ]
+ },
+ {
+ '$or' => run_at
+ }
+ ]
+ }
+ update = {'$set' => {'worker_name' => worker_name, 'state' => 'running'}}
+ else
+ query = {'state' => 'queued', '$or' => run_at}
+ update = {'$set' => {'worker_name' => worker_name, 'state' => 'running', 'started_at' => Time.now}}
+ end
query['_id'] = {'$nin' => skip_job_ids} if skip_job_ids && skip_job_ids.size > 0
if doc = find_and_modify(
query: query,
sort: {priority: 1, _id: 1},
- update: {'$set' => {'worker_name' => worker_name, 'state' => 'running'}}
+ update: update
)
load(doc)
end
end