lib/rocket_job/batch/io.rb in rocketjob-4.3.0.beta vs lib/rocket_job/batch/io.rb in rocketjob-4.3.0.beta2
- old
+ new
@@ -65,44 +65,19 @@
# Parses each line from the file as an Array and uploads each array for processing by workers.
# :record
# Parses each line from the file into a Hash and uploads each hash for processing by workers.
# See IOStreams::Stream#each_line, IOStreams::Stream#each_row, and IOStreams::Stream#each_record.
#
- # encoding: [String|Encoding]
- # Encode returned data with this encoding.
- # 'US-ASCII': Original 7 bit ASCII Format
- # 'ASCII-8BIT': 8-bit ASCII Format
- # 'UTF-8': UTF-8 Format
- # Etc.
- # Default: 'UTF-8'
- # NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set
- # if not already set in the supplied stream.
- #
- # encode_replace: [String]
- # The character to replace with when a character cannot be converted to the target encoding.
- # nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
- # Default: nil
- # NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set
- # if not already set in the supplied stream.
- #
- # encode_cleaner: [nil|symbol|Proc]
- # Cleanse data read from the input stream.
- # nil: No cleansing
- # :printable Cleanse all non-printable characters except \r and \n
- # Proc/lambda Proc to call after every read to cleanse the data
- # Default: :printable
- # NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set
- # if not already set in the supplied stream.
- #
# Example:
# # Load plain text records from a file
# job.upload('hello.csv')
#
# Example:
# # Load plain text records from a file, stripping all non-printable characters,
# # as well as any characters that cannot be converted to UTF-8
- # job.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
+ # path = IOStreams.path('hello.csv').option(:encode, cleaner: :printable, replace: '')
+ # job.upload(path)
#
# Example: Zip
# # Since csv is not known to RocketJob it is ignored
# job.upload('myfile.csv.zip')
#
@@ -138,19 +113,19 @@
# * When zip format, the Zip file/stream must contain only one file, the first file found will be
# loaded into the job
# * If an io stream is supplied, it is read until it returns nil.
# * Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.
# * CSV parsing is slow, so it is usually left for the workers to do.
- def upload(stream = nil, file_name: nil, category: :main, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, stream_mode: :line, on_first: nil, **args, &block)
+ def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block)
raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block
count =
if block
input(category).upload(on_first: on_first, &block)
else
- path = build_path(stream, file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace)
-
+ path = IOStreams.new(stream)
+ path.file_name = file_name if file_name
self.upload_file_name = path.file_name
input(category).upload(on_first: on_first) do |io|
path.public_send("each_#{stream_mode}".to_sym, **args) { |line| io << line }
end
end
@@ -370,20 +345,17 @@
# end
#
# Notes:
# - The records are returned in '_id' order. Usually this is the order in
# which the records were originally loaded.
- def download(stream = nil, category: :main, header_line: nil, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, **args, &block)
+ def download(stream = nil, category: :main, header_line: nil, **args, &block)
raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?
- if block
- output(category).download(header_line: header_line, &block)
- else
- path = build_path(stream, nil, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace)
- path.line_writer(**args) do |io|
- output(category).download(header_line: header_line) { |record| io << record }
- end
+ return output(category).download(header_line: header_line, &block) if block
+
+ IOStreams.new(stream).line_writer(**args) do |io|
+ output(category).download(header_line: header_line) { |record| io << record }
end
end
# Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.
#
@@ -414,20 +386,9 @@
RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block)
else
raise(ArgumentError, 'result parameter is required when no block is supplied') unless result
RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result }
end
- end
-
- private
-
- def build_path(stream, file_name, encoding: nil, encode_cleaner: nil, encode_replace: nil)
- path = IOStreams.new(stream)
- path.file_name = file_name if file_name
- if (encoding || encode_cleaner || encode_replace) && !path.setting(:encode)
- path.option_or_stream(:encode, encoding: encoding, cleaner: encode_cleaner, replace: encode_replace)
- end
- path
end
end
end
end