lib/rocket_job/batch/tabular/input.rb in rocketjob-4.3.0.beta vs lib/rocket_job/batch/tabular/input.rb in rocketjob-4.3.0.beta2

- old
+ new

@@ -47,42 +47,42 @@ # # Overrides: RocketJob::Batch::IO#upload # # Notes: # - When supplying a block the header must be set manually - def upload(file_name_or_io = nil, **args, &block) - if tabular_input_type == :text - args[:encoding] = 'UTF-8' - args[:encode_cleaner] = :printable - args[:encode_replace] = '' + def upload(stream = nil, **args, &block) + input_stream = stream.nil? ? nil : IOStreams.new(stream) + + if stream && (tabular_input_type == :text) + input_stream.option_or_stream(:encode, encoding: 'UTF-8', cleaner: :printable, replace: '') end # If an input header is not required, then we don't extract it' - return super(file_name_or_io, stream_mode: tabular_input_mode, **args, &block) unless tabular_input.header? + return super(input_stream, stream_mode: tabular_input_mode, **args, &block) unless tabular_input.header? # If the header is already set then it is not expected in the file if tabular_input_header.present? tabular_input_cleanse_header - return super(file_name_or_io, stream_mode: tabular_input_mode, **args, &block) + return super(input_stream, stream_mode: tabular_input_mode, **args, &block) end case tabular_input_mode when :line parse_header = -> (line) do tabular_input.parse_header(line) tabular_input_cleanse_header self.tabular_input_header = tabular_input.header.columns end - super(file_name_or_io, on_first: parse_header, stream_mode: tabular_input_mode, **args, &block) + super(input_stream, on_first: parse_header, stream_mode: tabular_input_mode, **args, &block) when :row set_header = -> (row) do tabular_input.header.columns = row tabular_input_cleanse_header self.tabular_input_header = tabular_input.header.columns end - super(file_name_or_io, on_first: set_header, stream_mode: tabular_input_mode, **args, &block) + super(input_stream, on_first: set_header, stream_mode: tabular_input_mode, **args, &block) when :record - super(file_name_or_io, stream_mode: tabular_input_mode, **args, &block) + super(input_stream, stream_mode: tabular_input_mode, **args, &block) else raise(ArgumentError, "Invalid tabular_input_mode: #{stream_mode.inspect}") end end