lib/rocket_job/sliced/writer/input.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/sliced/writer/input.rb in rocketjob-6.0.0
- old
+ new
@@ -10,46 +10,74 @@
# Parameters
# on_first: [Proc]
# Block to call on the first line only, instead of storing in the slice.
# Useful for extracting the header row
# Default: nil
- def self.collect(input, **args)
- writer = new(input, **args)
+ #
+ # slice_size: [Integer]
+ # Override the slice size when uploading for example ranges, where slice is the size
+ # of the range itself.
+ #
+ # slice_batch_size: [Integer]
+ # The number of slices to batch up and to bulk load.
+ # For smaller slices this significantly improves upload performance.
+ # Note: If `slice_batch_size` is too high, it can exceed the maximum BSON block size.
+ def self.collect(data_store, **args)
+ writer = new(data_store, **args)
yield(writer)
writer.record_count
ensure
- writer&.close
+ writer&.flush
end
- def initialize(input, on_first: nil)
- @on_first = on_first
- @batch_count = 0
- @record_count = 0
- @input = input
- @record_number = 1
- @slice = @input.new(first_record_number: @record_number)
+ def initialize(data_store, on_first: nil, slice_size: nil, slice_batch_size: nil)
+ @on_first = on_first
+ @record_count = 0
+ @data_store = data_store
+ @slice_size = slice_size || @data_store.slice_size
+ @slice_batch_size = slice_batch_size || 20
+ @batch = []
+ @batch_count = 0
+ new_slice
end
def <<(line)
- @record_number += 1
if @on_first
@on_first.call(line)
@on_first = nil
return self
end
@slice << line
- @batch_count += 1
@record_count += 1
- if @batch_count >= @input.slice_size
- @input.insert(@slice)
- @batch_count = 0
- @slice = @input.new(first_record_number: @record_number)
+ if @slice.size >= @slice_size
+ save_slice
+ new_slice
end
self
end
- def close
- @input.insert(@slice) if @slice.size.positive?
+ def flush
+ if @slice_batch_size
+ @batch << @slice if @slice.size.positive?
+ @data_store.insert_many(@batch)
+ @batch = []
+ @batch_count = 0
+ elsif @slice.size.positive?
+ @data_store.insert(@slice)
+ end
+ end
+
+ def new_slice
+ @slice = @data_store.new(first_record_number: @record_count + 1)
+ end
+
+ def save_slice
+ return flush unless @slice_batch_size
+
+ @batch_count += 1
+ return flush if @batch_count >= @slice_batch_size
+
+ @batch << @slice
end
end
end
end
end