lib/rocket_job/batch/io.rb in rocketjob-4.1.1 vs lib/rocket_job/batch/io.rb in rocketjob-4.2.0
- old
+ new
@@ -37,45 +37,271 @@
collection_name << ".#{category}" unless category == :main
(@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(slice_arguments(collection_name))
end
- # Upload the supplied file_name or stream
+ # Upload the supplied file_name or stream.
#
- # Updates the record_count after adding the records
+ # Returns [Integer] the number of records uploaded.
#
- # Options
- # :file_name [String]
- # When file_name_or_io is an IO, the original base file name if any.
- # Default: nil
+ # Parameters
+ # file_name_or_io [String | IO]
+ # Full path and file name to stream into the job,
+ # or an IO stream that responds to :read.
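+ #
+ # file_name [String]
+ # When file_name_or_io is an IO stream, the original base file name, if any.
+ # Default: nil
+ #
+ # category [Symbol]
+ # The input category to upload into.
+ # Default: :main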
#
- # See RocketJob::Sliced::Input#upload for remaining options
+ # streams [Symbol|Array]
+ # Streams to convert the data whilst it is being read.
+ # When nil, the file_name extensions will be inspected to determine what
+ # streams should be applied.
+ # Default: nil
#
- # Returns [Integer] the number of records uploaded
+ # delimiter [String]
+ # Line / record delimiter used to break the stream up into records.
+ # Any string can be used as the delimiter; the saved records will not
+ # include this delimiter. See the delimiter example below.
+ # Default: nil
+ # Automatically detect line endings and break up by line.
+ # Searches for the first "\r\n" or "\n" and then uses that as the
+ # delimiter for all subsequent records.
#
- # Note:
- # * Not thread-safe. Only call from one thread at a time
+ # buffer_size [Integer]
+ # Size of the blocks when reading from the input file / stream.
+ # Default: 65536 (64K)
+ #
+ # encoding: [String|Encoding]
+ # Encode returned data with this encoding.
+ # 'US-ASCII': Original 7-bit ASCII Format
+ # 'ASCII-8BIT': 8-bit ASCII Format
+ # 'UTF-8': UTF-8 Format
+ # Etc.
+ # Default: 'UTF-8'
+ #
+ # encode_replace: [String]
+ # The character to replace with when a character cannot be converted to the target encoding.
+ # nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
+ # Default: nil
+ #
+ # encode_cleaner: [nil|symbol|Proc]
+ # Cleanse data read from the input stream.
+ # nil: No cleansing
+ # :printable Cleanse all non-printable characters except \r and \n
+ # Proc/lambda Proc to call after every read to cleanse the data
+ # Default: :printable
+ #
+ # stream_mode: [:line | :row | :record]
+ # :line
+ # Uploads the file a line (String) at a time for processing by workers.
+ # :row
+ # Parses each line from the file as an Array and uploads each array for processing by workers.
+ # :record
+ # Parses each line from the file into a Hash and uploads each hash for processing by workers.
+ # See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
+ # An example using :row appears below.
+ #
+ # Example:
+ # # Load plain text records from a file
+ # job.input.upload('hello.csv')
+ #
+ # Example:
+ # # Load plain text records from a file, stripping all non-printable characters,
+ # # as well as any characters that cannot be converted to UTF-8
+ # job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
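+ #
+ # Example: Upload using a custom record delimiter
+ # # A sketch, assuming the hypothetical file data.txt separates records
+ # # with a pipe character instead of line endings:
+ # job.input.upload('data.txt', delimiter: '|')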
+ #
+ # Example: Zip
+ # # Since csv is not a stream format known to RocketJob, the .csv extension is ignored
+ # job.input.upload('myfile.csv.zip')
+ #
+ # Example: Encrypted Zip
+ # job.input.upload('myfile.csv.zip.enc')
+ #
+ # Example: Explicitly set the streams
+ # job.input.upload('myfile.ze', streams: [:zip, :enc])
+ #
+ # Example: Supply custom options
+ # job.input.upload('myfile.csv.enc', streams: :enc)
+ #
+ # Example: Extract streams from filename but write to a temp file
+ # streams = IOStreams.streams_for_file_name('myfile.gz.enc')
+ # t = Tempfile.new('my_project')
+ # job.input.upload(t.to_path, streams: streams)
+ #
+ # Example: Upload by writing records one at a time to the upload stream
+ # job.upload do |writer|
+ # 10.times { |i| writer << i }
+ # end
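+ #
+ # Example: Parse each line as a CSV row before uploading (stream_mode)
+ # # A sketch, assuming workers expect an Array of column values per record:
+ # job.input.upload('hello.csv', stream_mode: :row)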
+ #
+ # Notes:
+ # * Only call from one thread at a time against a single instance of this job.
+ # * The record_count for the job is incremented by the number of records uploaded.
+ # * If an exception is raised while uploading data, the input collection is cleared out
+ # so that if a job is retried during an upload failure, data is not duplicated.
+ # * By default all data read from the file/stream is converted into UTF-8 before being persisted. This
+ # is recommended since Mongo only supports UTF-8 strings.
+ # * When the zip format is used, the Zip file/stream must contain only one file;
+ # the first file found will be loaded into the job.
+ # * If an IO stream is supplied, it is read until it returns nil.
+ # * Only use this method for UTF-8 data; for binary data, use #input_slice or #input_records.
+ # * CSV parsing is slow, so it is usually left for the workers to do.
def upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block)
if file_name
self.upload_file_name = file_name
elsif file_name_or_io.is_a?(String)
self.upload_file_name = file_name_or_io
end
count = input(category).upload(file_name_or_io, file_name: file_name, **args, &block)
self.record_count = (record_count || 0) + count
count
+ rescue StandardError => exc
+ input(category).delete_all
+ raise(exc)
end
+ # Upload results from an Arel into RocketJob::SlicedJob.
+ #
+ # Parameters
+ # column_names
+ # When a block is not supplied, supply the names of the columns to be returned
+ # and uploaded into the job.
+ # These columns are automatically added to the select list to reduce overhead.
+ #
+ # If a block is supplied, it is passed the model returned from the database and should
+ # return the work item to be uploaded into the job. See the block example below.
+ #
+ # Returns [Integer] the number of records uploaded
+ #
+ # Example: Upload id's for all users
+ # arel = User.all
+ # job.upload_arel(arel)
+ #
+ # Example: Upload selected user id's
+ # arel = User.where(country_code: 'US')
+ # job.upload_arel(arel)
+ #
+ # Example: Upload user_name and zip_code
+ # arel = User.where(country_code: 'US')
+ # job.upload_arel(arel, :user_name, :zip_code)
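+ #
+ # Example: Use a block to build each work item from the model
+ # # A sketch, assuming User responds to user_name and zip_code:
+ # arel = User.where(country_code: 'US')
+ # job.upload_arel(arel) { |user| "#{user.user_name}:#{user.zip_code}" }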
+ #
+ # Notes:
+ # * Only call from one thread at a time against a single instance of this job.
+ # * The record_count for the job is incremented by the number of records returned by the arel.
+ # * If an exception is raised while uploading data, the input collection is cleared out
+ # so that if a job is retried during an upload failure, data is not duplicated.
def upload_arel(arel, *column_names, category: :main, &block)
count = input(category).upload_arel(arel, *column_names, &block)
self.record_count = (record_count || 0) + count
count
+ rescue StandardError => exc
+ input(category).delete_all
+ raise(exc)
end
+ # Upload the result of a MongoDB query to the input collection for processing.
+ # Useful when an entire MongoDB collection, or part thereof, needs to be
+ # processed by a job.
+ #
+ # Returns [Integer] the number of records uploaded
+ #
+ # If a block is supplied, it is passed the document returned from the
+ # database and should return a record for processing.
+ # See the block example below.
+ #
+ # If no block is supplied, the record will be the :fields returned
+ # from MongoDB.
+ #
+ # Note:
+ # This method uses the collection and not the MongoMapper document to
+ # avoid the overhead of constructing a Model with every document returned
+ # by the query
+ #
+ # Note:
+ # The Block must return types that can be serialized to BSON.
+ # Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time
+ # Invalid: Date, etc.
+ #
+ # Example: Upload document ids
+ # criteria = User.where(state: 'FL')
+ # job.upload_mongo_query(criteria)
+ #
+ # Example: Upload just the supplied column
+ # criteria = User.where(state: 'FL')
+ # job.upload_mongo_query(criteria, :zip_code)
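+ #
+ # Example: Use a block to build each record from the raw document
+ # # A sketch, assuming each document contains a 'zip_code' field and the
+ # # block returns a BSON-serializable record:
+ # criteria = User.where(state: 'FL')
+ # job.upload_mongo_query(criteria) { |doc| doc['zip_code'] }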
+ #
+ # Notes:
+ # * Only call from one thread at a time against a single instance of this job.
+ # * The record_count for the job is incremented by the number of records returned by the mongo query.
+ # * If an exception is raised while uploading data, the input collection is cleared out
+ # so that if a job is retried during an upload failure, data is not duplicated.
def upload_mongo_query(criteria, *column_names, category: :main, &block)
count = input(category).upload_mongo_query(criteria, *column_names, &block)
self.record_count = (record_count || 0) + count
count
+ rescue StandardError => exc
+ input(category).delete_all
+ raise(exc)
+ end
+
+ # Upload sliced ranges of integers as arrays of start and end ids.
+ #
+ # Returns [Integer] last_id - start_id + 1.
+ #
+ # Uploads one range per slice so that the response can return multiple records
+ # for each slice processed. A worker sketch follows the example below.
+ #
+ # Example
+ # job.slice_size = 100
+ # job.upload_integer_range(200, 421)
+ #
+ # # Equivalent to calling:
+ # job.input.insert([200,299])
+ # job.input.insert([300,399])
+ # job.input.insert([400,421])
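+ #
+ # # A sketch of the worker side: each record is a [start_id, end_id] pair;
+ # # process_ids is a hypothetical helper.
+ # def perform(range)
+ # start_id, end_id = range
+ # process_ids(start_id..end_id)
+ # end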
+ #
+ # Notes:
+ # * Only call from one thread at a time against a single instance of this job.
+ # * The record_count for the job is incremented by: last_id - start_id + 1.
+ # * If an exception is raised while uploading data, the input collection is cleared out
+ # so that if a job is retried during an upload failure, data is not duplicated.
+ def upload_integer_range(start_id, last_id, category: :main)
+ input(category).upload_integer_range(start_id, last_id)
+ count = last_id - start_id + 1
+ self.record_count = (record_count || 0) + count
+ count
+ rescue StandardError => exc
+ input(category).delete_all
+ raise(exc)
+ end
+
+ # Upload sliced ranges of integers as arrays of start and end ids,
+ # starting with the last range first.
+ #
+ # Returns [Integer] last_id - start_id + 1.
+ #
+ # Uploads one range per slice so that the response can return multiple records
+ # for each slice processed.
+ # Useful when the highest integer values should be processed before the
+ # lower ranges, for example when processing every record in a database
+ # based on the id column.
+ #
+ # Example
+ # job.slice_size = 100
+ # job.upload_integer_range_in_reverse_order(200, 421)
+ #
+ # # Equivalent to calling:
+ # job.input.insert([400,421])
+ # job.input.insert([300,399])
+ # job.input.insert([200,299])
+ #
+ # Notes:
+ # * Only call from one thread at a time against a single instance of this job.
+ # * The record_count for the job is incremented by: last_id - start_id + 1.
+ # * If an exception is raised while uploading data, the input collection is cleared out
+ # so that if a job is retried during an upload failure, data is not duplicated.
+ def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
+ input(category).upload_integer_range_in_reverse_order(start_id, last_id)
+ count = last_id - start_id + 1
+ self.record_count = (record_count || 0) + count
+ count
+ rescue StandardError => exc
+ input(category).delete_all
+ raise(exc)
end
# Upload the supplied slices for processing by workers
#
# Updates the record_count after adding the records