lib/rocket_job/sliced/input.rb in rocketjob-4.2.0 vs lib/rocket_job/sliced/input.rb in rocketjob-4.3.0.beta
- old
+ new
@@ -1,17 +1,15 @@
module RocketJob
module Sliced
class Input < Slices
# NOTE(review): annotation on this diff hunk; these comment lines are not part of either gem version.
# Removed (4.2.0): upload accepted an optional file_name_or_io plus encoding:, stream_mode:
# and **args. It raised ArgumentError when neither a file/IO nor a block was supplied, and
# when no block was supplied it built one that iterated the input with
# IOStreams "each_<stream_mode>" and appended each yielded value to the object handed to the block.
# Added (4.3.0.beta): upload takes only on_first: and a block, calls create_indexes first,
# then delegates to Writer::Input.collect; on any StandardError it calls drop and re-raises.
# Callers that still pass a file name, encoding:, stream_mode: or extra keywords will now
# fail Ruby's argument checking with an ArgumentError.
# The explicit "block must be supplied" guard is gone as well; how Writer::Input.collect
# behaves without a block is not visible in this hunk -- TODO confirm.
- def upload(file_name_or_io = nil, encoding: 'UTF-8', stream_mode: :line, on_first: nil, **args, &block)
- raise(ArgumentError, 'Either file_name_or_io, or a block must be supplied') unless file_name_or_io || block
-
- block ||= -> (io) do
- iterator = "each_#{stream_mode}".to_sym
- IOStreams.public_send(iterator, file_name_or_io, encoding: encoding, **args) { |line| io << line }
- end
-
+ def upload(on_first: nil, &block)
+ # Create indexes before uploading
+ create_indexes
Writer::Input.collect(self, on_first: on_first, &block)
# NOTE(review): drop presumably discards whatever was uploaded before the failure so a
# failed upload leaves nothing behind -- confirm against the Slices implementation.
+ rescue StandardError => exc
+ drop
+ raise(exc)
end
def upload_mongo_query(criteria, *column_names, &block)
options = criteria.options
@@ -34,22 +32,21 @@
else
->(document) { column_names.collect { |c| document[c] } }
end
end
- Writer::Input.collect(self) do |records|
+ upload do |records|
# Drop down to the mongo driver level to avoid constructing a Model for each document returned
criteria.klass.collection.find(criteria.selector, options).each do |document|
records << block.call(document)
end
end
end
def upload_arel(arel, *column_names, &block)
unless block
- column_names = column_names.collect(&:to_sym)
- column_names << :id if column_names.size.zero?
+ column_names = column_names.empty? ? [:id] : column_names.collect(&:to_sym)
block =
if column_names.size == 1
column = column_names.first
->(model) { model.send(column) }
@@ -59,29 +56,32 @@
# find_each requires the :id column in the query
selection = column_names.include?(:id) ? column_names : column_names + [:id]
arel = arel.select(selection)
end
- Writer::Input.collect(self) do |records|
- arel.find_each { |model| records << block.call(model) }
- end
+ upload { |records| arel.find_each { |model| records << block.call(model) } }
end
# NOTE(review): annotation on this diff hunk; the un-marked comment lines below are reviewer
# notes, not part of either gem version.
# Splits the inclusive integer range start_id..last_id into consecutive [first, last] pairs
# that each span at most slice_size ids, and creates one slice per pair via create!.
# Returns the number of slices created; when start_id > last_id the loop never runs and 0
# is returned.
# Added in 4.3.0.beta: any StandardError raised here triggers drop before being re-raised.
def upload_integer_range(start_id, last_id)
+ # Create indexes before uploading
create_indexes
count = 0
while start_id <= last_id
end_id = start_id + slice_size - 1
# Clamp the final pair so it never extends past last_id.
end_id = last_id if end_id > last_id
# Each slice holds a single record: the [start_id, end_id] pair itself.
create!(records: [[start_id, end_id]])
start_id += slice_size
count += 1
end
count
# NOTE(review): drop presumably removes the slices created so far so a failed upload
# leaves nothing behind -- confirm against the Slices implementation.
+ rescue StandardError => exc
+ drop
+ raise(exc)
end
def upload_integer_range_in_reverse_order(start_id, last_id)
+ # Create indexes before uploading
create_indexes
end_id = last_id
count = 0
while end_id >= start_id
first_id = end_id - slice_size + 1
@@ -89,9 +89,12 @@
create!(records: [[first_id, end_id]])
end_id -= slice_size
count += 1
end
count
+ rescue StandardError => exc
+ drop
+ raise(exc)
end
# Iterate over each failed record, if any
# Since each slice can only contain 1 failed record, only the failed
# record is returned along with the slice containing the exception