lib/rocket_job/sliced/writer/input.rb in rocketjob-6.0.0.rc3 vs lib/rocket_job/sliced/writer/input.rb in rocketjob-6.0.0

- old
+ new

@@ -10,46 +10,74 @@
         # Parameters
         #   on_first: [Proc]
         #     Block to call on the first line only, instead of storing in the slice.
         #     Useful for extracting the header row
         #     Default: nil
-        def self.collect(input, **args)
-          writer = new(input, **args)
+        #
+        #   slice_size: [Integer]
+        #     Override the slice size when uploading for example ranges, where slice is the size
+        #     of the range itself.
+        #
+        #   slice_batch_size: [Integer]
+        #     The number of slices to batch up and to bulk load.
+        #     For smaller slices this significantly improves upload performance.
+        #     Note: If `slice_batch_size` is too high, it can exceed the maximum BSON block size.
+        def self.collect(data_store, **args)
+          writer = new(data_store, **args)
           yield(writer)
           writer.record_count
         ensure
-          writer&.close
+          writer&.flush
         end
-        def initialize(input, on_first: nil)
-          @on_first = on_first
-          @batch_count = 0
-          @record_count = 0
-          @input = input
-          @record_number = 1
-          @slice = @input.new(first_record_number: @record_number)
+        def initialize(data_store, on_first: nil, slice_size: nil, slice_batch_size: nil)
+          @on_first = on_first
+          @record_count = 0
+          @data_store = data_store
+          @slice_size = slice_size || @data_store.slice_size
+          @slice_batch_size = slice_batch_size || 20
+          @batch = []
+          @batch_count = 0
+          new_slice
         end
         def <<(line)
-          @record_number += 1
           if @on_first
             @on_first.call(line)
             @on_first = nil
             return self
           end
           @slice << line
-          @batch_count += 1
           @record_count += 1
-          if @batch_count >= @input.slice_size
-            @input.insert(@slice)
-            @batch_count = 0
-            @slice = @input.new(first_record_number: @record_number)
+          if @slice.size >= @slice_size
+            save_slice
+            new_slice
           end
           self
         end
-        def close
-          @input.insert(@slice) if @slice.size.positive?
+        def flush
+          if @slice_batch_size
+            @batch << @slice if @slice.size.positive?
+            @data_store.insert_many(@batch)
+            @batch = []
+            @batch_count = 0
+          elsif @slice.size.positive?
+            @data_store.insert(@slice)
+          end
+        end
+
+        def new_slice
+          @slice = @data_store.new(first_record_number: @record_count + 1)
+        end
+
+        def save_slice
+          return flush unless @slice_batch_size
+
+          @batch_count += 1
+          return flush if @batch_count >= @slice_batch_size
+
+          @batch << @slice
         end
       end
     end
   end
 end