lib/rocket_job/sliced/input.rb in rocketjob-4.1.1 vs lib/rocket_job/sliced/input.rb in rocketjob-4.2.0
- old
+ new
@@ -1,106 +1,8 @@
module RocketJob
module Sliced
class Input < Slices
- # Load lines for processing from the supplied filename or stream into this job.
- #
- # Returns [Integer] the number of lines loaded into this collection
- #
- # Parameters
- # file_name_or_io [String | IO]
- # Full path and file name to stream into the job,
- # Or, an IO Stream that responds to: :read
- #
- # streams [Symbol|Array]
- # Streams to convert the data whilst it is being read.
- # When nil, the file_name extensions will be inspected to determine what
- # streams should be applied.
- # Default: nil
- #
- # delimiter[String]
- # Line / Record delimiter to use to break the stream up into records
- # Any string to break the stream up by
- # The records when saved will not include this delimiter
- # Default: nil
- # Automatically detect line endings and break up by line
- # Searches for the first "\r\n" or "\n" and then uses that as the
- # delimiter for all subsequent records
- #
- # buffer_size [Integer]
- # Size of the blocks when reading from the input file / stream.
- # Default: 65536 ( 64K )
- #
- # encoding: [String|Encoding]
- # Encode returned data with this encoding.
- # 'US-ASCII': Original 7 bit ASCII Format
- # 'ASCII-8BIT': 8-bit ASCII Format
- # 'UTF-8': UTF-8 Format
- # Etc.
- # Default: 'UTF-8'
- #
- # encode_replace: [String]
- # The character to replace with when a character cannot be converted to the target encoding.
- # nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
- # Default: nil
- #
- # encode_cleaner: [nil|symbol|Proc]
- # Cleanse data read from the input stream.
- # nil: No cleansing
- # :printable Cleanse all non-printable characters except \r and \n
- # Proc/lambda Proc to call after every read to cleanse the data
- # Default: :printable
- #
- # stream_mode: [:line | :row | :record]
- # :line
- # Uploads the file a line (String) at a time for processing by workers.
- # :row
- # Parses each line from the file as an Array and uploads each array for processing by workers.
- # :record
- # Parses each line from the file into a Hash and uploads each hash for processing by workers.
- # See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
- #
- # Example:
- # # Load plain text records from a file
- # job.input.upload('hello.csv')
- #
- # Example:
- # # Load plain text records from a file, stripping all non-printable characters,
- # # as well as any characters that cannot be converted to UTF-8
- # job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
- #
- # Example: Zip
- # # Since csv is not known to RocketJob it is ignored
- # job.input.upload('myfile.csv.zip')
- #
- # Example: Encrypted Zip
- # job.input.upload('myfile.csv.zip.enc')
- #
- # Example: Explicitly set the streams
- # job.input.upload('myfile.ze', streams: [:zip, :enc])
- #
- # Example: Supply custom options
- #   job.input.upload('myfile.csv.enc', streams: :enc)
- #
- # Example: Extract streams from filename but write to a temp file
- # streams = IOStreams.streams_for_file_name('myfile.gz.enc')
- # t = Tempfile.new('my_project')
- # job.input.upload(t.to_path, streams: streams)
- #
- # Example: Upload by writing records one at a time to the upload stream
- # job.upload do |writer|
- # 10.times { |i| writer << i }
- # end
- #
- # Notes:
- # - By default all data read from the file/stream is converted into UTF-8 before being persisted. This
- # is recommended since Mongo only supports UTF-8 strings.
- # - When zip format, the Zip file/stream must contain only one file, the first file found will be
- # loaded into the job
- # - If an io stream is supplied, it is read until it returns nil.
- # - Only use this method for UTF-8 data, for binary data use #input_slice or #input_records.
- # - Only call from one thread at a time per job instance.
- # - CSV parsing is slow, so it is left for the workers to do.
def upload(file_name_or_io = nil, encoding: 'UTF-8', stream_mode: :line, on_first: nil, **args, &block)
raise(ArgumentError, 'Either file_name_or_io, or a block must be supplied') unless file_name_or_io || block
block ||= -> (io) do
iterator = "each_#{stream_mode}".to_sym
@@ -108,39 +10,10 @@
end
Writer::Input.collect(self, on_first: on_first, &block)
end
- # Upload the result of a MongoDB query to the input collection for processing
- # Useful when an entire MongoDB collection, or part thereof needs to be
- # processed by a job.
- #
- # Returns [Integer] the number of records uploaded
- #
- # If a Block is supplied it is passed the document returned from the
- # database and should return a record for processing
- #
- # If no Block is supplied then the record will be the :fields returned
- # from MongoDB
- #
- # Note:
- # This method uses the collection and not the MongoMapper document to
- # avoid the overhead of constructing a Model with every document returned
- # by the query
- #
- # Note:
- # The Block must return types that can be serialized to BSON.
- # Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time
- # Invalid: Date, etc.
- #
- # Example: Upload document ids
- # criteria = User.where(state: 'FL')
- # job.record_count = job.upload_mongo_query(criteria)
- #
- # Example: Upload just the supplied column
- # criteria = User.where(state: 'FL')
- # job.record_count = job.upload_mongo_query(criteria, :zip_code)
def upload_mongo_query(criteria, *column_names, &block)
options = criteria.options
# Without a block extract the fields from the supplied criteria
if block
@@ -169,34 +42,10 @@
records << block.call(document)
end
end
end
- # Upload results from an Arel into RocketJob::SlicedJob.
- #
- # Params
- # column_names
- # When a block is not supplied, supply the names of the columns to be returned
- # and uploaded into the job
- # These columns are automatically added to the select list to reduce overhead
- #
- # If a Block is supplied it is passed the model returned from the database and should
- # return the work item to be uploaded into the job.
- #
- # Returns [Integer] the number of records uploaded
- #
- # Example: Upload id's for all users
- # arel = User.all
- # job.record_count = job.upload_arel(arel)
- #
- # Example: Upload selected user id's
- # arel = User.where(country_code: 'US')
- # job.record_count = job.upload_arel(arel)
- #
- # Example: Upload user_name and zip_code
- # arel = User.where(country_code: 'US')
- # job.record_count = job.upload_arel(arel, :user_name, :zip_code)
def upload_arel(arel, *column_names, &block)
unless block
column_names = column_names.collect(&:to_sym)
column_names << :id if column_names.size.zero?
@@ -215,25 +64,10 @@
Writer::Input.collect(self) do |records|
arel.find_each { |model| records << block.call(model) }
end
end
- # Upload sliced range of integer requests as arrays of start and end ids
- #
- # Returns [Integer] the number of slices uploaded
- #
- # Uploads one range per slice so that the response can return multiple records
- # for each slice processed
- #
- # Example
- # job.slice_size = 100
- # job.record_count = job.upload_integer_range(200, 421)
- #
- # # Equivalent to calling:
- # job.record_count = job.insert([200,299])
- # job.record_count += job.insert([300,399])
- # job.record_count += job.insert([400,421])
def upload_integer_range(start_id, last_id)
create_indexes
count = 0
while start_id <= last_id
end_id = start_id + slice_size - 1
@@ -243,29 +77,10 @@
count += 1
end
count
end
- # Upload sliced range of integer requests as arrays of start and end ids
- # starting with the last range first
- #
- # Returns [Integer] the number of slices uploaded
- #
- # Uploads one range per slice so that the response can return multiple records
- # for each slice processed.
- # Useful for when the highest order integer values should be processed before
- # the lower integer value ranges. For example when processing every record
- # in a database based on the id column
- #
- # Example
- # job.slice_size = 100
- # job.record_count = job.upload_integer_range_in_reverse_order(200, 421) * job.slice_size
- #
- # # Equivalent to calling:
- # job.insert([400,421])
- # job.insert([300,399])
- # job.insert([200,299])
def upload_integer_range_in_reverse_order(start_id, last_id)
create_indexes
end_id = last_id
count = 0
while end_id >= start_id
@@ -288,12 +103,11 @@
# ap slice
# end
#
def each_failed_record
failed.each do |slice|
- if slice.exception && (record_number = slice.exception.record_number)
- yield(slice.at(record_number - 1), slice)
- end
+ record = slice.failed_record
+ yield(record, slice) unless record.nil?
end
end
# Requeue all failed slices
def requeue_failed