lib/rocket_job/batch/io.rb in rocketjob-5.0.0.rc1 vs lib/rocket_job/batch/io.rb in rocketjob-5.0.0
- old
+ new
@@ -56,18 +56,18 @@
# Default: nil
# Automatically detect line endings and break up by line
# Searches for the first "\r\n" or "\n" and then uses that as the
# delimiter for all subsequent records
#
- # stream_mode: [:line | :row | :record]
+ # stream_mode: [:line | :array | :hash]
# :line
# Uploads the file a line (String) at a time for processing by workers.
- # :row
+ # :array
# Parses each line from the file as an Array and uploads each array for processing by workers.
- # :record
+ # :hash
# Parses each line from the file into a Hash and uploads each hash for processing by workers.
- # See IOStreams::Stream#each_line, IOStreams::Stream#each_row, and IOStreams::Stream#each_record.
+ # See IOStreams::Stream#each.
#
# Example:
# # Load plain text records from a file
# job.upload('hello.csv')
#
@@ -116,19 +116,24 @@
# * Only use this method for UTF-8 data; for binary data use #input_slice or #input_records.
# * CSV parsing is slow, so it is usually left for the workers to do.
def upload(stream = nil, file_name: nil, category: :main, stream_mode: :line, on_first: nil, **args, &block)
# Guard: at least one of stream or block is required, otherwise ArgumentError is raised.
raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block
+ stream_mode = stream_mode.to_sym
+ # Backward compatibility with existing v4 jobs
+ stream_mode = :array if stream_mode == :row
+ stream_mode = :hash if stream_mode == :record
+
# count holds the return value of whichever input(category).upload call runs below.
count =
if block
# A block is checked first: when given, it is handed directly to the input
# collection and stream, file_name, stream_mode and args are not used.
input(category).upload(on_first: on_first, &block)
else
path = IOStreams.new(stream)
# An explicitly supplied file_name replaces the one held by the IOStreams path.
path.file_name = file_name if file_name
self.upload_file_name = path.file_name
# Every element yielded while iterating the path is appended to the input via io <<.
input(category).upload(on_first: on_first) do |io|
- path.public_send("each_#{stream_mode}".to_sym, **args) { |line| io << line }
+ path.each(stream_mode, **args) { |line| io << line }
end
end
# Add this call's count onto any existing record_count (nil is treated as 0),
# then return only the count for this call.
self.record_count = (record_count || 0) + count
count
end
@@ -334,11 +339,11 @@
# end
# writer << "Trailer\n"
# end
#
# Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:
- # IOStreams.path('/tmp/file.txt.gz').line_writer do |writer|
+ # IOStreams.path('/tmp/file.txt.gz').writer(:line) do |writer|
# writer << "Header"
# job.download do |line|
# writer << line
# end
# writer << "Trailer"
@@ -350,10 +355,10 @@
def download(stream = nil, category: :main, header_line: nil, **args, &block)
# Refuse to download while rocket_job_processing? is true; the message reports
# the job id together with its state and sub_state.
raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?
# With a block, the block is passed straight to the output collection's download
# and the method returns here; stream and args are not used on this path.
return output(category).download(header_line: header_line, &block) if block
# Without a block, each downloaded record is appended to a writer opened on the
# supplied stream; args are forwarded to that writer.
- IOStreams.new(stream).line_writer(**args) do |io|
+ IOStreams.new(stream).writer(:line, **args) do |io|
output(category).download(header_line: header_line) { |record| io << record }
end
end
# Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.