lib/rocket_job/batch/io.rb in rocketjob-4.2.0 vs lib/rocket_job/batch/io.rb in rocketjob-4.3.0.beta

- old
+ new

@@ -17,11 +17,11 @@
        raise "Category #{category.inspect}, must be registered in input_categories: #{input_categories.inspect}" unless input_categories.include?(category) || (category == :main)

        collection_name = "rocket_job.inputs.#{id}"
        collection_name << ".#{category}" unless category == :main

-       (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(slice_arguments(collection_name))
+       (@inputs ||= {})[category] ||= RocketJob::Sliced::Input.new(collection_name: collection_name, slice_size: slice_size)
      end

      # Returns [RocketJob::Sliced::Output] output collection for holding output slices
      # Returns nil if no output is being collected
      #
@@ -34,96 +34,96 @@
        raise "Category #{category.inspect}, must be registered in output_categories: #{output_categories.inspect}" unless output_categories.include?(category) || (category == :main)

        collection_name = "rocket_job.outputs.#{id}"
        collection_name << ".#{category}" unless category == :main

-       (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(slice_arguments(collection_name))
+       (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(collection_name: collection_name, slice_size: slice_size)
      end
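Note: `Sliced::Input.new` and `Sliced::Output.new` now take explicit keyword arguments instead of the `slice_arguments` hash helper (removed in the final hunk below). The per-category collection naming is unchanged; a rough sketch of the names it produces, assuming a hypothetical job id and a hypothetical :errors category registered in input_categories:

    job.input           # backed by collection "rocket_job.inputs.<job id>"
    job.input(:errors)  # backed by collection "rocket_job.inputs.<job id>.errors"
    job.output          # backed by collection "rocket_job.outputs.<job id>"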
-     # Upload the supplied file_name or stream.
+     # Upload the supplied file, io, IOStreams::Path, or IOStreams::Stream.
      #
      # Returns [Integer] the number of records uploaded.
      #
      # Parameters
-     #   file_name_or_io [String | IO]
+     #   stream [String | IO | IOStreams::Path | IOStreams::Stream]
      #     Full path and file name to stream into the job,
      #     Or, an IO Stream that responds to: :read
+     #     Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3
      #
-     #   streams [Symbol|Array]
-     #     Streams to convert the data whilst it is being read.
-     #     When nil, the file_name extensions will be inspected to determine what
-     #     streams should be applied.
-     #     Default: nil
-     #
      #   delimiter[String]
      #     Line / Record delimiter to use to break the stream up into records
      #       Any string to break the stream up by
      #       The records when saved will not include this delimiter
      #     Default: nil
      #       Automatically detect line endings and break up by line
      #       Searches for the first "\r\n" or "\n" and then uses that as the
      #       delimiter for all subsequent records
      #
-     #   buffer_size [Integer]
-     #     Size of the blocks when reading from the input file / stream.
-     #     Default: 65536 ( 64K )
+     #   stream_mode: [:line | :row | :record]
+     #     :line
+     #       Uploads the file a line (String) at a time for processing by workers.
+     #     :row
+     #       Parses each line from the file as an Array and uploads each array for processing by workers.
+     #     :record
+     #       Parses each line from the file into a Hash and uploads each hash for processing by workers.
+     #     See IOStreams::Stream#each_line, IOStreams::Stream#each_row, and IOStreams::Stream#each_record.
      #
      #   encoding: [String|Encoding]
      #     Encode returned data with this encoding.
      #     'US-ASCII':   Original 7 bit ASCII Format
      #     'ASCII-8BIT': 8-bit ASCII Format
      #     'UTF-8':      UTF-8 Format
      #     Etc.
      #     Default: 'UTF-8'
+     #     NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set
+     #           if not already set in the supplied stream.
      #
      #   encode_replace: [String]
      #     The character to replace with when a character cannot be converted to the target encoding.
      #     nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
      #     Default: nil
+     #     NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set
+     #           if not already set in the supplied stream.
      #
      #   encode_cleaner: [nil|symbol|Proc]
      #     Cleanse data read from the input stream.
      #     nil: No cleansing
      #     :printable Cleanse all non-printable characters except \r and \n
      #     Proc/lambda Proc to call after every read to cleanse the data
      #     Default: :printable
+     #     NOTE: If a IOStreams::Path, or IOStreams::Stream was supplied then the encoding will be set
+     #           if not already set in the supplied stream.
      #
-     #   stream_mode: [:line | :row | :record]
-     #     :line
-     #       Uploads the file a line (String) at a time for processing by workers.
-     #     :row
-     #       Parses each line from the file as an Array and uploads each array for processing by workers.
-     #     :record
-     #       Parses each line from the file into a Hash and uploads each hash for processing by workers.
-     #     See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
-     #
      # Example:
      #   # Load plain text records from a file
-     #   job.input.upload('hello.csv')
+     #   job.upload('hello.csv')
      #
      # Example:
      #   # Load plain text records from a file, stripping all non-printable characters,
      #   # as well as any characters that cannot be converted to UTF-8
-     #   job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
+     #   job.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
      #
      # Example: Zip
      #   # Since csv is not known to RocketJob it is ignored
-     #   job.input.upload('myfile.csv.zip')
+     #   job.upload('myfile.csv.zip')
      #
      # Example: Encrypted Zip
-     #   job.input.upload('myfile.csv.zip.enc')
+     #   job.upload('myfile.csv.zip.enc')
      #
      # Example: Explicitly set the streams
-     #   job.input.upload('myfile.ze', streams: [:zip, :enc])
+     #   path = IOStreams.path('myfile.ze').stream(:encode, encoding: 'UTF-8').stream(:zip).stream(:enc)
+     #   job.upload(path)
      #
      # Example: Supply custom options
-     #   job.input.upload('myfile.csv.enc', streams: :enc])
+     #   path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false).option(:encode, encoding: 'UTF-8')
+     #   job.upload(path)
      #
-     # Example: Extract streams from filename but write to a temp file
-     #   streams = IOStreams.streams_for_file_name('myfile.gz.enc')
-     #   t = Tempfile.new('my_project')
-     #   job.input.upload(t.to_path, streams: streams)
+     # Example: Read from a tempfile and use the original file name to determine which streams to apply
+     #   temp_file = Tempfile.new('my_project')
+     #   temp_file.write(gzip_and_encrypted_data)
+     #   stream = IOStreams.stream(temp_file).file_name('myfile.gz.enc')
+     #   job.upload(stream)
      #
      # Example: Upload by writing records one at a time to the upload stream
      #   job.upload do |writer|
      #     10.times { |i| writer << i }
      #   end
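The rewritten documentation above describes the new IOStreams-based upload API. A minimal usage sketch of the three stream_mode variants plus the block form, assuming a hypothetical MyJob batch job and a 'users.csv' file with a header row:

    job = MyJob.new

    # :line (default): each line is uploaded as a raw String record
    job.upload('users.csv')

    # :row: each line is parsed into an Array, e.g. ["jack", "jack@example.org"]
    job.upload('users.csv', stream_mode: :row)

    # :record: each line is parsed into a Hash keyed by the header row,
    # e.g. {"name" => "jack", "email" => "jack@example.org"}
    job.upload('users.csv', stream_mode: :record)

    # Block form: write records directly; no file or IOStreams involved
    job.upload do |writer|
      10.times { |i| writer << "record #{i}" }
    end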
@@ -138,22 +138,26 @@
      # * When zip format, the Zip file/stream must contain only one file, the first file found will be
      #   loaded into the job
      # * If an io stream is supplied, it is read until it returns nil.
      # * Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.
      # * CSV parsing is slow, so it is usually left for the workers to do.
-     def upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block)
-       if file_name
-         self.upload_file_name = file_name
-       elsif file_name_or_io.is_a?(String)
-         self.upload_file_name = file_name_or_io
-       end
-       count = input(category).upload(file_name_or_io, file_name: file_name, **args, &block)
+     def upload(stream = nil, file_name: nil, category: :main, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, stream_mode: :line, on_first: nil, **args, &block)
+       raise(ArgumentError, 'Either stream, or a block must be supplied') unless stream || block
+
+       count =
+         if block
+           input(category).upload(on_first: on_first, &block)
+         else
+           path = build_path(stream, file_name, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace)
+
+           self.upload_file_name = path.file_name
+           input(category).upload(on_first: on_first) do |io|
+             path.public_send("each_#{stream_mode}".to_sym, **args) { |line| io << line }
+           end
+         end
        self.record_count = (record_count || 0) + count
        count
-     rescue StandardError => exc
-       input(category).delete_all
-       raise(exc)
      end

      # Upload results from an Arel into RocketJob::SlicedJob.
      #
      # Params
@@ -186,13 +190,10 @@
      #   so that if a job is retried during an upload failure, data is not duplicated.
      def upload_arel(arel, *column_names, category: :main, &block)
        count = input(category).upload_arel(arel, *column_names, &block)
        self.record_count = (record_count || 0) + count
        count
-     rescue StandardError => exc
-       input(category).delete_all
-       raise(exc)
      end

      # Upload the result of a MongoDB query to the input collection for processing
      # Useful when an entire MongoDB collection, or part thereof needs to be
      # processed by a job.
@@ -230,13 +231,10 @@
      #   so that if a job is retried during an upload failure, data is not duplicated.
      def upload_mongo_query(criteria, *column_names, category: :main, &block)
        count = input(category).upload_mongo_query(criteria, *column_names, &block)
        self.record_count = (record_count || 0) + count
        count
-     rescue StandardError => exc
-       input(category).delete_all
-       raise(exc)
      end

      # Upload sliced range of integer requests as arrays of start and end ids.
      #
      # Returns [Integer] last_id - start_id + 1.
@@ -261,13 +259,10 @@
      def upload_integer_range(start_id, last_id, category: :main)
        input(category).upload_integer_range(start_id, last_id)
        count = last_id - start_id + 1
        self.record_count = (record_count || 0) + count
        count
-     rescue StandardError => exc
-       input(category).delete_all
-       raise(exc)
      end

      # Upload sliced range of integer requests as an arrays of start and end ids
      # starting with the last range first
      #
@@ -296,13 +291,10 @@
      def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
        input(category).upload_integer_range_in_reverse_order(start_id, last_id)
        count = last_id - start_id + 1
        self.record_count = (record_count || 0) + count
        count
-     rescue StandardError => exc
-       input(category).delete_all
-       raise(exc)
      end

      # Upload the supplied slices for processing by workers
      #
      # Updates the record_count after adding the records
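A pattern running through the hunks above: every upload helper loses its rescue block, so as of 4.3.0.beta a failed upload no longer deletes the partially uploaded input slices automatically. Callers that relied on that cleanup can restore it themselves; a sketch using the public input accessor from the first hunk:

    begin
      job.upload('users.csv')
    rescue StandardError
      # 4.2.0 did this inside upload; it is now the caller's choice
      job.input.delete_all
      raise
    end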
@@ -324,28 +316,75 @@
        count = slice.size
        self.record_count = (record_count || 0) + count
        count
      end

-     # Download the output data into the supplied file_name or stream
+     # Download the output data into the supplied file, io, IOStreams::Path, or IOStreams::Stream.
+     # Returns [Integer] the number of records / lines downloaded.
      #
      # Parameters
-     #   file_name_or_io [String|IO]
-     #     The file_name of the file to write to, or an IO Stream that implements #write.
+     #   stream [String | IO | IOStreams::Path | IOStreams::Stream]
+     #     Full path and file name to stream into the job,
+     #     Or, an IO stream that responds to: :write
+     #     Or, an IOStreams path such as IOStreams::Paths::File, or IOStreams::Paths::S3
      #
-     #   options:
-     #     category [Symbol]
-     #       The category of output to download
-     #       Default: :main
+     # Example: Zip
+     #   # Since csv is not known to RocketJob it is ignored
+     #   job.download('myfile.csv.zip')
      #
-     #   See RocketJob::Sliced::Output#download for remaining options
+     # Example: Encrypted Zip
+     #   job.download('myfile.csv.zip.enc')
      #
-     # Returns [Integer] the number of records downloaded
-     def download(file_name_or_io = nil, category: :main, **args, &block)
+     # Example: Explicitly set the streams
+     #   path = IOStreams.path('myfile.ze').stream(:zip).stream(:enc)
+     #   job.download(path)
+     #
+     # Example: Supply custom options
+     #   path = IOStreams.path('myfile.csv.enc').option(:enc, compress: false)
+     #   job.download(path)
+     #
+     # Example: Supply custom options. Set the file name within the zip file.
+     #   path = IOStreams.path('myfile.csv.zip').option(:zip, zip_file_name: 'myfile.csv')
+     #   job.download(path)
+     #
+     # Example: Download into a tempfile, or stream, using the original file name to determine the streams to apply:
+     #   tempfile = Tempfile.new('my_project')
+     #   stream = IOStreams.stream(tempfile).file_name('myfile.gz.enc')
+     #   job.download(stream)
+     #
+     # Example: Add a header and/or trailer record to the downloaded file:
+     #   IOStreams.path('/tmp/file.txt.gz').writer do |writer|
+     #     writer << "Header\n"
+     #     job.download do |line|
+     #       writer << line + "\n"
+     #     end
+     #     writer << "Trailer\n"
+     #   end
+     #
+     # Example: Add a header and/or trailer record to the downloaded file, letting the line writer add the line breaks:
+     #   IOStreams.path('/tmp/file.txt.gz').line_writer do |writer|
+     #     writer << "Header"
+     #     job.download do |line|
+     #       writer << line
+     #     end
+     #     writer << "Trailer"
+     #   end
+     #
+     # Notes:
+     # - The records are returned in '_id' order. Usually this is the order in
+     #   which the records were originally loaded.
+     def download(stream = nil, category: :main, header_line: nil, encoding: 'UTF-8', encode_cleaner: nil, encode_replace: nil, **args, &block)
        raise "Cannot download incomplete job: #{id}. Currently in state: #{state}-#{sub_state}" if rocket_job_processing?

-       output(category).download(file_name_or_io, **args, &block)
+       if block
+         output(category).download(header_line: header_line, &block)
+       else
+         path = build_path(stream, nil, encoding: encoding, encode_cleaner: encode_cleaner, encode_replace: encode_replace)
+         path.line_writer(**args) do |io|
+           output(category).download(header_line: header_line) { |record| io << record }
+         end
+       end
      end

      # Writes the supplied result, Batch::Result or Batch::Results to the relevant collections.
      #
      # If a block is supplied, the block is supplied with a writer that should be used to
@@ -379,14 +418,16 @@
        end
      end

      private

-     def slice_arguments(collection_name)
-       {
-         collection_name: collection_name,
-         slice_size: slice_size
-       }
+     def build_path(stream, file_name, encoding: nil, encode_cleaner: nil, encode_replace: nil)
+       path = IOStreams.new(stream)
+       path.file_name = file_name if file_name
+       if (encoding || encode_cleaner || encode_replace) && !path.setting(:encode)
+         path.option_or_stream(:encode, encoding: encoding, cleaner: encode_cleaner, replace: encode_replace)
+       end
+       path
      end
    end
  end
end
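The new private build_path helper is what lets upload and download accept a plain file name, a raw IO object, or a pre-built IOStreams path or stream interchangeably: it wraps the argument with IOStreams.new, applies the optional file_name hint, and only layers on an :encode stream when the caller has not already configured one. A rough sketch of the equivalence, assuming the IOStreams API used elsewhere in this diff:

    # Passing a String file name to upload...
    job.upload('myfile.csv.gz')

    # ...is roughly equivalent to building the path by hand with upload's defaults:
    path = IOStreams.new('myfile.csv.gz')
    path.option_or_stream(:encode, encoding: 'UTF-8', cleaner: nil, replace: nil)
    job.upload(path)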