lib/rocket_job/sliced/input.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/sliced/input.rb in rocketjob-6.0.0
- old
+ new
@@ -1,101 +1,89 @@
module RocketJob
module Sliced
class Input < Slices
- def upload(on_first: nil, &block)
+ def upload(**args, &block)
# Create indexes before uploading
create_indexes
- Writer::Input.collect(self, on_first: on_first, &block)
+ Writer::Input.collect(self, **args, &block)
rescue Exception => e
drop
raise(e)
end
- def upload_mongo_query(criteria, *column_names, &block)
+ def upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block)
options = criteria.options
# Without a block extract the fields from the supplied criteria
if block
# Criteria is returning old school :fields instead of :projections
options[:projection] = options.delete(:fields) if options.key?(:fields)
else
- column_names = column_names.collect(&:to_s)
- column_names << "_id" if column_names.size.zero?
-
- fields = options.delete(:fields) || {}
- column_names.each { |col| fields[col] = 1 }
+ columns = columns.blank? ? ["_id"] : columns.collect(&:to_s)
+ fields = options.delete(:fields) || {}
+ columns.each { |col| fields[col] = 1 }
options[:projection] = fields
block =
- if column_names.size == 1
- column = column_names.first
+ if columns.size == 1
+ column = columns.first
->(document) { document[column] }
else
- ->(document) { column_names.collect { |c| document[c] } }
+ ->(document) { columns.collect { |c| document[c] } }
end
end
- upload do |records|
+ upload(slice_batch_size: slice_batch_size) do |records|
# Drop down to the mongo driver level to avoid constructing a Model for each document returned
criteria.klass.collection.find(criteria.selector, options).each do |document|
records << block.call(document)
end
end
end
- def upload_arel(arel, *column_names, &block)
+ def upload_arel(arel, columns: nil, slice_batch_size: nil, &block)
unless block
- column_names = column_names.empty? ? [:id] : column_names.collect(&:to_sym)
+ columns = columns.blank? ? [:id] : columns.collect(&:to_sym)
block =
- if column_names.size == 1
- column = column_names.first
- ->(model) { model.send(column) }
+ if columns.size == 1
+ column = columns.first
+ ->(model) { model.public_send(column) }
else
- ->(model) { column_names.collect { |c| model.send(c) } }
+ ->(model) { columns.collect { |c| model.public_send(c) } }
end
# find_each requires the :id column in the query
- selection = column_names.include?(:id) ? column_names : column_names + [:id]
+ selection = columns.include?(:id) ? columns : columns + [:id]
arel = arel.select(selection)
end
- upload { |records| arel.find_each { |model| records << block.call(model) } }
+ upload(slice_batch_size: slice_batch_size) { |records| arel.find_each { |model| records << block.call(model) } }
end
- def upload_integer_range(start_id, last_id)
- # Create indexes before uploading
- create_indexes
- count = 0
- while start_id <= last_id
- end_id = start_id + slice_size - 1
- end_id = last_id if end_id > last_id
- create!(records: [[start_id, end_id]])
- start_id += slice_size
- count += 1
+ def upload_integer_range(start_id, last_id, slice_batch_size: 1_000)
+ # Each "record" is actually a range of Integers which makes up each slice
+ upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
+ while start_id <= last_id
+ end_id = start_id + slice_size - 1
+ end_id = last_id if end_id > last_id
+ records << [start_id, end_id]
+ start_id += slice_size
+ end
end
- count
- rescue Exception => e
- drop
- raise(e)
end
- def upload_integer_range_in_reverse_order(start_id, last_id)
- # Create indexes before uploading
- create_indexes
- end_id = last_id
- count = 0
- while end_id >= start_id
- first_id = end_id - slice_size + 1
- first_id = start_id if first_id.negative? || (first_id < start_id)
- create!(records: [[first_id, end_id]])
- end_id -= slice_size
- count += 1
+ def upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000)
+ # Each "record" is actually a range of Integers which makes up each slice
+ upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
+ end_id = last_id
+ while end_id >= start_id
+ first_id = end_id - slice_size + 1
+ first_id = start_id if first_id.negative? || (first_id < start_id)
+ records << [first_id, end_id]
+ end_id -= slice_size
+ end
end
- count
- rescue Exception => e
- drop
- raise(e)
end
# Iterate over each failed record, if any
# Since each slice can only contain 1 failed record, only the failed
# record is returned along with the slice containing the exception
@@ -135,14 +123,14 @@
# If a slice is in queued state it will be started and assigned to this worker
def next_slice(worker_name)
# TODO: Will it perform faster without the id sort?
# I.e. Just process on a FIFO basis?
document = all.queued.
- sort("_id" => 1).
- find_one_and_update(
- {"$set" => {worker_name: worker_name, state: "running", started_at: Time.now}},
- return_document: :after
- )
+ sort("_id" => 1).
+ find_one_and_update(
+ {"$set" => {worker_name: worker_name, state: "running", started_at: Time.now}},
+ return_document: :after
+ )
document.collection_name = collection_name if document
document
end
end
end