lib/rocket_job/sliced/input.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/sliced/input.rb in rocketjob-6.0.0

- old
+ new

@@ -1,101 +1,89 @@
 module RocketJob
   module Sliced
     class Input < Slices
-      def upload(on_first: nil, &block)
+      def upload(**args, &block)
         # Create indexes before uploading
         create_indexes
-        Writer::Input.collect(self, on_first: on_first, &block)
+        Writer::Input.collect(self, **args, &block)
       rescue Exception => e
         drop
         raise(e)
       end

-      def upload_mongo_query(criteria, *column_names, &block)
+      def upload_mongo_query(criteria, columns: [], slice_batch_size: nil, &block)
         options = criteria.options

         # Without a block extract the fields from the supplied criteria
         if block
           # Criteria is returning old school :fields instead of :projections
           options[:projection] = options.delete(:fields) if options.key?(:fields)
         else
-          column_names = column_names.collect(&:to_s)
-          column_names << "_id" if column_names.size.zero?
-
-          fields = options.delete(:fields) || {}
-          column_names.each { |col| fields[col] = 1 }
+          columns = columns.blank? ? ["_id"] : columns.collect(&:to_s)
+          fields  = options.delete(:fields) || {}
+          columns.each { |col| fields[col] = 1 }
           options[:projection] = fields

           block =
-            if column_names.size == 1
-              column = column_names.first
+            if columns.size == 1
+              column = columns.first
               ->(document) { document[column] }
             else
-              ->(document) { column_names.collect { |c| document[c] } }
+              ->(document) { columns.collect { |c| document[c] } }
             end
         end

-        upload do |records|
+        upload(slice_batch_size: slice_batch_size) do |records|
           # Drop down to the mongo driver level to avoid constructing a Model for each document returned
           criteria.klass.collection.find(criteria.selector, options).each do |document|
             records << block.call(document)
           end
         end
       end

-      def upload_arel(arel, *column_names, &block)
+      def upload_arel(arel, columns: nil, slice_batch_size: nil, &block)
         unless block
-          column_names = column_names.empty? ? [:id] : column_names.collect(&:to_sym)
+          columns = columns.blank? ? [:id] : columns.collect(&:to_sym)

           block =
-            if column_names.size == 1
-              column = column_names.first
-              ->(model) { model.send(column) }
+            if columns.size == 1
+              column = columns.first
+              ->(model) { model.public_send(column) }
             else
-              ->(model) { column_names.collect { |c| model.send(c) } }
+              ->(model) { columns.collect { |c| model.public_send(c) } }
             end
           # find_each requires the :id column in the query
-          selection = column_names.include?(:id) ? column_names : column_names + [:id]
+          selection = columns.include?(:id) ? columns : columns + [:id]
           arel      = arel.select(selection)
         end

-        upload { |records| arel.find_each { |model| records << block.call(model) } }
+        upload(slice_batch_size: slice_batch_size) { |records| arel.find_each { |model| records << block.call(model) } }
       end

-      def upload_integer_range(start_id, last_id)
-        # Create indexes before uploading
-        create_indexes
-        count = 0
-        while start_id <= last_id
-          end_id = start_id + slice_size - 1
-          end_id = last_id if end_id > last_id
-          create!(records: [[start_id, end_id]])
-          start_id += slice_size
-          count += 1
+      def upload_integer_range(start_id, last_id, slice_batch_size: 1_000)
+        # Each "record" is actually a range of Integers which makes up each slice
+        upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
+          while start_id <= last_id
+            end_id = start_id + slice_size - 1
+            end_id = last_id if end_id > last_id
+            records << [start_id, end_id]
+            start_id += slice_size
+          end
         end
-        count
-      rescue Exception => e
-        drop
-        raise(e)
       end

-      def upload_integer_range_in_reverse_order(start_id, last_id)
-        # Create indexes before uploading
-        create_indexes
-        end_id = last_id
-        count  = 0
-        while end_id >= start_id
-          first_id = end_id - slice_size + 1
-          first_id = start_id if first_id.negative? || (first_id < start_id)
-          create!(records: [[first_id, end_id]])
-          end_id -= slice_size
-          count += 1
+      def upload_integer_range_in_reverse_order(start_id, last_id, slice_batch_size: 1_000)
+        # Each "record" is actually a range of Integers which makes up each slice
+        upload(slice_size: 1, slice_batch_size: slice_batch_size) do |records|
+          end_id = last_id
+          while end_id >= start_id
+            first_id = end_id - slice_size + 1
+            first_id = start_id if first_id.negative? || (first_id < start_id)
+            records << [first_id, end_id]
+            end_id -= slice_size
+          end
         end
-        count
-      rescue Exception => e
-        drop
-        raise(e)
       end

       # Iterate over each failed record, if any
       # Since each slice can only contain 1 failed record, only the failed
       # record is returned along with the slice containing the exception
@@ -135,14 +123,14 @@
       # If a slice is in queued state it will be started and assigned to this worker
       def next_slice(worker_name)
         # TODO: Will it perform faster without the id sort?
         #   I.e. Just process on a FIFO basis?
         document = all.queued.
-          sort("_id" => 1).
-          find_one_and_update(
-            {"$set" => {worker_name: worker_name, state: "running", started_at: Time.now}},
-            return_document: :after
-          )
+                   sort("_id" => 1).
+                   find_one_and_update(
+                     {"$set" => {worker_name: worker_name, state: "running", started_at: Time.now}},
+                     return_document: :after
+                   )
         document.collection_name = collection_name if document
         document
       end
     end
   end