lib/rocket_job/batch/io.rb in rocketjob-4.2.0 vs lib/rocket_job/batch/io.rb in rocketjob-4.3.0.beta
- old
+ new
@@ -17,11 +17,11 @@
raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" unless input_categories.include?(category) || (category == :main)
collection_name = "rocket_job.inputs.#{id}"
collection_name << ".#{category}" unless category == :main
- (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(slice_arguments(collection_name))
+ (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(collection_name: collection_name, slice_size: slice_size)
end
# Returns [RocketJob::Sliced::Output] output collection for holding output slices
# Returns nil if no output is being collected
#
@@ -34,96 +34,96 @@
raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" unless output_categories.include?(category) || (category == :main)
collection_name = "rocket_job.outputs.#{id}"
collection_name << ".#{category}" unless category == :main
- (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(slice_arguments(collection_name))
+ (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(collection_name: collection_name, slice_size: slice_size)
end
- # Upload the supplied file_name or stream.
+ # Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream.
#
# Returns [Integer] the number of records uploaded.
#
# Parameters
- # file_name_or_io [String | IO]
+ # stream [String | IO | IOStreams::Path | IOStreams::Stream]
# Full path and file name to stream into the job,
# Or, an IO Stream that responds to: :read
+ # Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3
#
- # streams [Symbol|Array]
- # Streams to convert the data whilst it is being read.
- # When nil, the file_name extensions will be inspected to determine what
- # streams should be applied.
- # Default: nil
- #
# delimiter[String]
# Line / Record delimiter to use to break the stream up into records
# Any string to break the stream up by
# The records when saved will not include this delimiter
# Default: nil
# Automatically detect line endings and break up by line
# Searches for the first "\r\n" or "\n" and then uses that as the
# delimiter for all subsequent records
#
- # buffer_size [Integer]
- # Size of the blocks when reading from the input file / stream.
- # Default: 65536 ( 64K )
+ # stream_mode: [:line | :row | :record]
+ # :line
+ # Uploads the file a line (String) at a time for processing by workers.
+ # :row
+ # Parses each line from the file as an Array and uploads each array for processing by workers.
+ # :record
+ # Parses each line from the file into a Hash and uploads each hash for processing by workers.
+ # See IOStreams::Stream#each_line, IOStreams::Stream#each_row, and IOStreams::Stream#each_record, and the stream_mode example below.
#
# encoding: [String|Encoding]
# Encode returned data with this encoding.
# 'US-ASCII': Original 7 bit ASCII Format
# 'ASCII-8BIT': 8-bit ASCII Format
# 'UTF-8': UTF-8 Format
# Etc.
# Default: 'UTF-8'
+ # NOTE: If an IOStreams::Path or IOStreams::Stream was supplied, the encoding is only
+ # applied if one is not already set on the supplied stream.
#
# encode_replace: [String]
# The character to replace with when a character cannot be converted to the target encoding.
# nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
# Default: nil
+ # NOTE: If an IOStreams::Path or IOStreams::Stream was supplied, this replacement is only
+ # applied if an encoding is not already set on the supplied stream.
#
# encode_cleaner: [nil|symbol|Proc]
# Cleanse data read from the input stream.
# nil: No cleansing
# :printable Cleanse all non-printable characters except \r and \n
# Proc/lambda Proc to call after every read to cleanse the data
# Default: :printable
+ # NOTE: If an IOStreams::Path or IOStreams::Stream was supplied, this cleaner is only
+ # applied if an encoding is not already set on the supplied stream.
#
- # stream_mode: [:line | :row | :record]
- # :line
- # Uploads the file a line (String) at a time for processing by workers.
- # :row
- # Parses each line from the file as an Array and uploads each array for processing by workers.
- # :record
- # Parses each line from the file into a Hash and uploads each hash for processing by workers.
- # See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
- #
# Example:
# # Load plain text records from a file
- # job.input.upload('hello.csv')
+ # job.upload('hello.csv')
#
# Example:
# # Load plain text records from a file, stripping all non-printable characters,
# # as well as any characters that cannot be converted to UTF-8
- # job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
+ # job.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
#
# Example: Zip
# # Since csv is not known to RocketJob, it is ignored
- # job.input.upload('myfile.csv.zip')
+ # job.upload('myfile.csv.zip')
#
# Example: Encrypted Zip
- # job.input.upload('myfile.csv.zip.enc')
+ # job.upload('myfile.csv.zip.enc')
#
# Example: Explicitly set the streams
- # job.input.upload('myfile.ze', streams: [:zip, :enc])
+ # path = IOStreams.path('myfile.ze').stream(:encode, encoding: 'UTF-8').stream(:zip).stream(:enc)
+ # job.upload(path)
#
# Example: Supply custom options
- # job.input.upload('myfile.csv.enc', streams: :enc])
+ # path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false).option(:encode, encoding: 'UTF-8')
+ # job.upload(path)
#
- # Example: Extract streams from filename but write to a temp file
- # streams = IOStreams.streams_for_file_name('myfile.gz.enc')
- # t = Tempfile.new('my_project')
- # job.input.upload(t.to_path, streams: streams)
+ # Example: Read from a tempfile and use the original file name to determine which streams to apply
+ # temp_file = Tempfile.new('my_project')
+ # temp_file.write(gzip_and_encrypted_data)
+ # stream = IOStreams.stream(temp_file).file_name('myfile.gz.enc')
+ # job.upload(stream)
#
# Example: Upload by writing records one at a time to the upload stream
# job.upload do |writer|
# 10.times { |i| writer << i }
# end
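#
# Example: Parse each line into a Hash before upload
#   # Illustrative; assumes 'users.csv' has a header row naming the columns
#   job.upload('users.csv', stream_mode: :record)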
@@ -138,22 +138,26 @@
# * When using the zip format, the Zip file/stream must contain only one file; the first file found will be
# loaded into the job
# * If an io stream is supplied, it is read until it returns nil.
# * Only use this method for UTF-8 data; for binary data, use #input_slice or #input_records.
# * CSV parsing is slow, so it is usually left for the workers to do.
- def upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block)
- if file_name
- self.upload_file_name = file_name
- elsif file_name_or_io.is_a?(String)
- self.upload_file_name = file_name_or_io
- end
- count = input(category).upload(file_name_or_io, file_name: file_name, **args, &block)
+ def upload(stream = nil, file_name: nil, category: :main, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, stream_mode: :line, on_first: nil, **args, &block)
+ raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block
+
+ count =
+ if block
+ input(category).upload(on_first: on_first, &block)
+ else
+ path = build_path(stream, file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace)
+
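+ # Record the uploaded file's name on the job.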
+ self.upload_file_name = path.file_name
+ input(category).upload(on_first: on_first) do |io|
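+ # Stream the file into the input collection a line, row, or record at a time, based on stream_mode.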
+ path.public_send("each_#{stream_mode}".to_sym, **args) { |line| io << line }
+ end
+ end
self.record_count = (record_count || 0) + count
count
- rescue StandardError => exc
- input(category).delete_all
- raise(exc)
end
# Upload results from an Arel into RocketJob::SlicedJob.
#
# Params
@@ -186,13 +190,10 @@
# so that if a job is retried during an upload failure, data is not duplicated.
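#
# Example (illustrative; assumes an ActiveRecord User model):
#   arel = User.where(country_code: 'US')
#   job.upload_arel(arel, :id, :zip_code)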
def upload_arel(arel, *column_names, category: :main, &block)
count = input(category).upload_arel(arel, *column_names, &block)
self.record_count = (record_count || 0) + count
count
- rescue StandardError => exc
- input(category).delete_all
- raise(exc)
end
# Upload the result of a MongoDB query to the input collection for processing
# Useful when an entire MongoDB collection, or part thereof, needs to be
# processed by a job.
@@ -230,13 +231,10 @@
# so that if a job is retried during an upload failure, data is not duplicated.
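#
# Example (illustrative; assumes a Mongoid User model):
#   criteria = User.where(state: 'FL')
#   job.upload_mongo_query(criteria, :id, :zip_code)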
def upload_mongo_query(criteria, *column_names, category: :main, &block)
count = input(category).upload_mongo_query(criteria, *column_names, &block)
self.record_count = (record_count || 0) + count
count
- rescue StandardError => exc
- input(category).delete_all
- raise(exc)
end
# Upload a sliced range of integer requests as arrays of start and end ids.
#
# Returns [Integer] last_id - start_id + 1.
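#
# Example (illustrative):
#   # Queue ids 1 through 1,000,000 for processing in ranges of slice_size.
#   job.upload_integer_range(1, 1_000_000)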
@@ -261,13 +259,10 @@
def upload_integer_range(start_id, last_id, category: :main)
input(category).upload_integer_range(start_id, last_id)
count = last_id - start_id + 1
self.record_count = (record_count || 0) + count
count
- rescue StandardError => exc
- input(category).delete_all
- raise(exc)
end
# Upload a sliced range of integer requests as arrays of start and end ids,
# starting with the last range first
#
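# Example (illustrative):
#   # Process the highest ids first.
#   job.upload_integer_range_in_reverse_order(1, 1_000_000)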
@@ -296,13 +291,10 @@
def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
input(category).upload_integer_range_in_reverse_order(start_id, last_id)
count = last_id - start_id + 1
self.record_count = (record_count || 0) + count
count
- rescue StandardError => exc
- input(category).delete_all
- raise(exc)
end
# Upload the supplied slice for processing by workers
#
# Updates the record_count after adding the records
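#
# Example (illustrative):
#   # Upload a single slice containing three records.
#   job.upload_slice([1, 2, 3])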
@@ -324,28 +316,75 @@
count = slice.size
self.record_count = (record_count || 0) + count
count
end
- # Download the output data into the supplied file_name or stream
+ # Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream.
+ # Returns [Integer] the number of records / lines downloaded.
#
# Parameters
- # file_name_or_io [String|IO]
- # The file_name of the file to write to, or an IO Stream that implements #write.
+ # stream [String | IO | IOStreams::Path | IOStreams::Stream]
+ # Full path and file name to write the output to,
+ # Or, an IO stream that responds to: :write
+ # Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3
#
- # options:
- # category [Symbol]
- # The category of output to download
- # Default: :main
+ # Example: Zip
+ # # Since csv is not known to RocketJob, it is ignored
+ # job.download('myfile.csv.zip')
#
- # See RocketJob::Sliced::Output#download for remaining options
+ # Example: Encrypted Zip
+ # job.download('myfile.csv.zip.enc')
#
- # Returns [Integer] the number of records downloaded
- def download(file_name_or_io = nil, category: :main, **args, &block)
+ # Example: Explicitly set the streams
+ # path = IOStreams.path('myfile.ze').stream(:zip).stream(:enc)
+ # job.download(path)
+ #
+ # Example: Supply custom options
+ # path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false)
+ # job.download(path)
+ #
+ # Example: Supply custom options. Set the file name within the zip file.
+ # path = IOStreams.path('myfile.csv.zip').option(:zip, zip_file_name: 'myfile.csv')
+ # job.download(path)
+ #
+ # Example: Download into a tempfile, or stream, using the original file name to determine the streams to apply:
+ # tempfile = Tempfile.new('my_project')
+ # stream = IOStreams.stream(tempfile).file_name('myfile.gz.enc')
+ # job.download(stream)
+ #
+ # Example: Add a header and/or trailer record to the downloaded file:
+ # IOStreams.path('/tmp/file.txt.gz').writer do |writer|
+ # writer << "Header\n"
+ # job.download do |line|
+ # writer << line + "\n"
+ # end
+ # writer << "Trailer\n"
+ # end
+ #
+ # Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:
+ # IOStreams.path('/tmp/file.txt.gz').line_writer do |writer|
+ # writer << "Header"
+ # job.download do |line|
+ # writer << line
+ # end
+ # writer << "Trailer"
+ # end
+ #
+ # Notes:
+ # - The records are returned in '_id' order. Usually this is the order in
+ # which the records were originally loaded.
+ def download(stream = nil, category: :main, header_line: nil, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, **args, &block)
raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?
- output(category).download(file_name_or_io, **args, &block)
+ if block
+ output(category).download(header_line: header_line, &block)
+ else
+ path = build_path(stream, nil, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace)
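+ # Write each downloaded record to the output stream, one line at a time.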
+ path.line_writer(**args) do |io|
+ output(category).download(header_line: header_line) { |record| io << record }
+ end
+ end
end
# Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.
#
# If a block is supplied, the block is supplied with a writer that should be used to
@@ -379,14 +418,16 @@
end
end
private
- def slice_arguments(collection_name)
- {
- collection_name: collection_name,
- slice_size: slice_size
- }
+ def build_path(stream, file_name, encoding: nil, encode_cleaner: nil, encode_replace: nil)
+ path = IOStreams.new(stream)
+ path.file_name = file_name if file_name
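+ # Add an :encode layer only when requested and the stream does not already have one.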
+ if (encoding || encode_cleaner || encode_replace) && !path.setting(:encode)
+ path.option_or_stream(:encode, encoding: encoding, cleaner: encode_cleaner, replace: encode_replace)
+ end
+ path
end
end
end
end