lib/rocket_job/sliced/input.rb in rocketjob-5.1.1 vs lib/rocket_job/sliced/input.rb in rocketjob-5.2.0.beta1
- old
+ new
@@ -3,13 +3,13 @@
class Input < Slices
def upload(on_first: nil, &block)
# Create indexes before uploading
create_indexes
Writer::Input.collect(self, on_first: on_first, &block)
- rescue StandardError => exc
+ rescue StandardError => e
drop
- raise(exc)
+ raise(e)
end
def upload_mongo_query(criteria, *column_names, &block)
options = criteria.options
@@ -17,11 +17,11 @@
if block
# Criteria is returning old school :fields instead of :projections
options[:projection] = options.delete(:fields) if options.key?(:fields)
else
column_names = column_names.collect(&:to_s)
- column_names << '_id' if column_names.size.zero?
+ column_names << "_id" if column_names.size.zero?
fields = options.delete(:fields) || {}
column_names.each { |col| fields[col] = 1 }
options[:projection] = fields
@@ -71,13 +71,13 @@
create!(records: [[start_id, end_id]])
start_id += slice_size
count += 1
end
count
- rescue StandardError => exc
+ rescue StandardError => e
drop
- raise(exc)
+ raise(e)
end
def upload_integer_range_in_reverse_order(start_id, last_id)
# Create indexes before uploading
create_indexes
@@ -89,13 +89,13 @@
create!(records: [[first_id, end_id]])
end_id -= slice_size
count += 1
end
count
- rescue StandardError => exc
+ rescue StandardError => e
drop
- raise(exc)
+ raise(e)
end
# Iterate over each failed record, if any
# Since each slice can only contain 1 failed record, only the failed
# record is returned along with the slice containing the exception
@@ -114,20 +114,20 @@
end
# Requeue all failed slices
def requeue_failed
failed.update_all(
- '$unset' => {worker_name: nil, started_at: nil},
- '$set' => {state: :queued}
+ "$unset" => {worker_name: nil, started_at: nil},
+ "$set" => {state: :queued}
)
end
# Requeue all running slices for a server or worker that is no longer available
def requeue_running(worker_name)
running.where(worker_name: /\A#{worker_name}/).update_all(
- '$unset' => {worker_name: nil, started_at: nil},
- '$set' => {state: :queued}
+ "$unset" => {worker_name: nil, started_at: nil},
+ "$set" => {state: :queued}
)
end
# Returns the next slice to work on in id order
# Returns nil if there are currently no queued slices
@@ -135,14 +135,14 @@
# If a slice is in queued state it will be started and assigned to this worker
def next_slice(worker_name)
# TODO: Will it perform faster without the id sort?
# I.e. Just process on a FIFO basis?
document = all.queued.
- sort('_id' => 1).
- find_one_and_update(
- {'$set' => {worker_name: worker_name, state: :running, started_at: Time.now}},
- return_document: :after
- )
+ sort("_id" => 1).
+ find_one_and_update(
+ {"$set" => {worker_name: worker_name, state: :running, started_at: Time.now}},
+ return_document: :after
+ )
document.collection_name = collection_name if document
document
end
end
end