lib/rocket_job/batch/tabular/input.rb in rocketjob-4.3.0.beta vs lib/rocket_job/batch/tabular/input.rb in rocketjob-4.3.0.beta2
- old
+ new
@@ -47,42 +47,42 @@
#
# Overrides: RocketJob::Batch::IO#upload
#
# Notes:
# - When supplying a block the header must be set manually
- def upload(file_name_or_io = nil, **args, &block)
- if tabular_input_type == :text
- args[:encoding] = 'UTF-8'
- args[:encode_cleaner] = :printable
- args[:encode_replace] = ''
+ def upload(stream = nil, **args, &block)
+ input_stream = stream.nil? ? nil : IOStreams.new(stream)
+
+ if stream && (tabular_input_type == :text)
+ input_stream.option_or_stream(:encode, encoding: 'UTF-8', cleaner: :printable, replace: '')
end
# If an input header is not required, then we don't extract it'
- return super(file_name_or_io, stream_mode: tabular_input_mode, **args, &block) unless tabular_input.header?
+ return super(input_stream, stream_mode: tabular_input_mode, **args, &block) unless tabular_input.header?
# If the header is already set then it is not expected in the file
if tabular_input_header.present?
tabular_input_cleanse_header
- return super(file_name_or_io, stream_mode: tabular_input_mode, **args, &block)
+ return super(input_stream, stream_mode: tabular_input_mode, **args, &block)
end
case tabular_input_mode
when :line
parse_header = -> (line) do
tabular_input.parse_header(line)
tabular_input_cleanse_header
self.tabular_input_header = tabular_input.header.columns
end
- super(file_name_or_io, on_first: parse_header, stream_mode: tabular_input_mode, **args, &block)
+ super(input_stream, on_first: parse_header, stream_mode: tabular_input_mode, **args, &block)
when :row
set_header = -> (row) do
tabular_input.header.columns = row
tabular_input_cleanse_header
self.tabular_input_header = tabular_input.header.columns
end
- super(file_name_or_io, on_first: set_header, stream_mode: tabular_input_mode, **args, &block)
+ super(input_stream, on_first: set_header, stream_mode: tabular_input_mode, **args, &block)
when :record
- super(file_name_or_io, stream_mode: tabular_input_mode, **args, &block)
+ super(input_stream, stream_mode: tabular_input_mode, **args, &block)
else
raise(ArgumentError, "Invalid tabular_input_mode: #{stream_mode.inspect}")
end
end