lib/rocket_job/batch/io.rb in rocketjob-4.3.0.beta vs lib/rocket_job/batch/io.rb in rocketjob-4.3.0.beta2

- old
+ new

@@ -65,44 +65,19 @@ # Parses each line from the file as an Array and uploads each array for processing by workers. # :record # Parses each line from the file into a Hash and uploads each hash for processing by workers. # See IOStreams::Stream#each_line, IOStreams::Stream#each_row, and IOStreams::Stream#each_record. # - # encoding: [String|Encoding] - # Encode returned data with this encoding. - # 'US-ASCII': Original 7 bit ASCII Format - # 'ASCII-8BIT': 8-bit ASCII Format - # 'UTF-8': UTF-8 Format - # Etc. - # Default: 'UTF-8' - # NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set - # if not already set in the supplied stream. - # - # encode_replace: [String] - # The character to replace with when a character cannot be converted to the target encoding. - # nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised. - # Default: nil - # NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set - # if not already set in the supplied stream. - # - # encode_cleaner: [nil|symbol|Proc] - # Cleanse data read from the input stream. - # nil: No cleansing - # :printable Cleanse all non-printable characters except \r and \n - # Proc/lambda Proc to call after every read to cleanse the data - # Default: :printable - # NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set - # if not already set in the supplied stream. - # # Example: # # Load plain text records from a file # job.upload('hello.csv') # # Example: # # Load plain text records from a file, stripping all non-printable characters, # # as well as any characters that cannot be converted to UTF-8 - # job.upload('hello.csv', encode_cleaner: :printable, encode_replace: '') + # path = IOStreams.path('hello.csv').option(:encode, cleaner: :printable, replace: '') + # job.upload(path) # # Example: Zip # # Since csv is not known to RocketJob it is ignored # job.upload('myfile.csv.zip') # @@ -138,19 +113,19 @@ # * When zip format, the Zip file/stream must contain only one file, the first file found will be # loaded into the job # * If an io stream is supplied, it is read until it returns nil. # * Only use this method for UTF-8 data, for binary data use #input_slice or #input_records. # * CSV parsing is slow, so it is usually left for the workers to do. - def upload(stream = nil, file_name: nil, category: :main, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, stream_mode: :line, on_first: nil, **args, &block) + def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block) raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block count = if block input(category).upload(on_first: on_first, &block) else - path = build_path(stream, file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) - + path = IOStreams.new(stream) + path.file_name = file_name if file_name self.upload_file_name = path.file_name input(category).upload(on_first: on_first) do |io| path.public_send("each_#{stream_mode}".to_sym, **args) { |line| io << line } end end @@ -370,20 +345,17 @@ # end # # Notes: # - The records are returned in '_id' order. Usually this is the order in # which the records were originally loaded. - def download(stream = nil, category: :main, header_line: nil, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, **args, &block) + def download(stream = nil, category: :main, header_line: nil, **args, &block) raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing? - if block - output(category).download(header_line: header_line, &block) - else - path = build_path(stream, nil, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace) - path.line_writer(**args) do |io| - output(category).download(header_line: header_line) { |record| io << record } - end + return output(category).download(header_line: header_line, &block) if block + + IOStreams.new(stream).line_writer(**args) do |io| + output(category).download(header_line: header_line) { |record| io << record } end end # Writes the supplied result, Batch::Result or Batch::Results to the relevant collections. # @@ -414,20 +386,9 @@ RocketJob::Sliced::Writer::Output.collect(self, input_slice, &block) else raise(ArgumentError, 'result parameter is required when no block is supplied') unless result RocketJob::Sliced::Writer::Output.collect(self, input_slice) { |writer| writer << result } end - end - - private - - def build_path(stream, file_name, encoding: nil, encode_cleaner: nil, encode_replace: nil) - path = IOStreams.new(stream) - path.file_name = file_name if file_name - if (encoding || encode_cleaner || encode_replace) && !path.setting(:encode) - path.option_or_stream(:encode, encoding: encoding, cleaner: encode_cleaner, replace: encode_replace) - end - path end end end end