lib/rocket_job/sliced/input.rb in rocketjob-4.2.0 vs lib/rocket_job/sliced/input.rb in rocketjob-4.3.0.beta

- old
+ new

@@ -1,17 +1,15 @@ module RocketJob module Sliced class Input < Slices - def upload(file_name_or_io = nil, encoding: 'UTF-8', stream_mode: :line, on_first: nil, **args, &block) - raise(ArgumentError, 'Either file_name_or_io, or a block must be supplied') unless file_name_or_io || block - - block ||= -> (io) do - iterator = "each_#{stream_mode}".to_sym - IOStreams.public_send(iterator, file_name_or_io, encoding: encoding, **args) { |line| io << line } - end - + def upload(on_first: nil, &block) + # Create indexes before uploading + create_indexes Writer::Input.collect(self, on_first: on_first, &block) + rescue StandardError => exc + drop + raise(exc) end def upload_mongo_query(criteria, *column_names, &block) options = criteria.options @@ -34,22 +32,21 @@ else ->(document) { column_names.collect { |c| document[c] } } end end - Writer::Input.collect(self) do |records| + upload do |records| # Drop down to the mongo driver level to avoid constructing a Model for each document returned criteria.klass.collection.find(criteria.selector, options).each do |document| records << block.call(document) end end end def upload_arel(arel, *column_names, &block) unless block - column_names = column_names.collect(&:to_sym) - column_names << :id if column_names.size.zero? + column_names = column_names.empty? ? [:id] : column_names.collect(&:to_sym) block = if column_names.size == 1 column = column_names.first ->(model) { model.send(column) } @@ -59,29 +56,32 @@ # find_each requires the :id column in the query selection = column_names.include?(:id) ? column_names : column_names + [:id] arel = arel.select(selection) end - Writer::Input.collect(self) do |records| - arel.find_each { |model| records << block.call(model) } - end + upload { |records| arel.find_each { |model| records << block.call(model) } } end def upload_integer_range(start_id, last_id) + # Create indexes before uploading create_indexes count = 0 while start_id <= last_id end_id = start_id + slice_size - 1 end_id = last_id if end_id > last_id create!(records: [[start_id, end_id]]) start_id += slice_size count += 1 end count + rescue StandardError => exc + drop + raise(exc) end def upload_integer_range_in_reverse_order(start_id, last_id) + # Create indexes before uploading create_indexes end_id = last_id count = 0 while end_id >= start_id first_id = end_id - slice_size + 1 @@ -89,9 +89,12 @@ create!(records: [[first_id, end_id]]) end_id -= slice_size count += 1 end count + rescue StandardError => exc + drop + raise(exc) end # Iterate over each failed record, if any # Since each slice can only contain 1 failed record, only the failed # record is returned along with the slice containing the exception