lib/rocket_job/batch/io.rb in rocketjob-5.0.0.rc1 vs lib/rocket_job/batch/io.rb in rocketjob-5.0.0

- old
+ new

@@ -56,18 +56,18 @@ # Default: nil # Automatically detect line endings and break up by line # Searches for the first "\r\n" or "\n" and then uses that as the # delimiter for all subsequent records # - # stream_mode: [:line | :row | :record] + # stream_mode: [:line | :array | :hash] # :line # Uploads the file a line (String) at a time for processing by workers. - # :row + # :array # Parses each line from the file as an Array and uploads each array for processing by workers. - # :record + # :hash # Parses each line from the file into a Hash and uploads each hash for processing by workers. - # See IOStreams::Stream#each_line, IOStreams::Stream#each_row, and IOStreams::Stream#each_record. + # See IOStreams::Stream#each. # # Example: # # Load plain text records from a file # job.upload('hello.csv') # @@ -116,19 +116,24 @@ # * Only use this method for UTF-8 data, for binary data use #input_slice or #input_records. # * CSV parsing is slow, so it is usually left for the workers to do. def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block) raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block + stream_mode = stream_mode.to_sym + # Backward compatibility with existing v4 jobs + stream_mode = :array if stream_mode == :row + stream_mode = :hash if stream_mode == :record + count = if block input(category).upload(on_first: on_first, &block) else path = IOStreams.new(stream) path.file_name = file_name if file_name self.upload_file_name = path.file_name input(category).upload(on_first: on_first) do |io| - path.public_send("each_#{stream_mode}".to_sym, **args) { |line| io << line } + path.each(stream_mode, **args) { |line| io << line } end end self.record_count = (record_count || 0) + count count end @@ -334,11 +339,11 @@ # end # writer << "Trailer\n" # end # # Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks: - # IOStreams.path('/tmp/file.txt.gz').line_writer do |writer| + # IOStreams.path('/tmp/file.txt.gz').writer(:line) do |writer| # writer << "Header" # job.download do |line| # writer << line # end # writer << "Trailer" @@ -350,10 +355,10 @@ def download(stream = nil, category: :main, header_line: nil, **args, &block) raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing? return output(category).download(header_line: header_line, &block) if block - IOStreams.new(stream).line_writer(**args) do |io| + IOStreams.new(stream).writer(:line, **args) do |io| output(category).download(header_line: header_line) { |record| io << record } end end # Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.