lib/rocket_job/sliced/input.rb in rocketjob-4.1.1 vs lib/rocket_job/sliced/input.rb in rocketjob-4.2.0

- old
+ new

@@ -1,106 +1,8 @@ module RocketJob module Sliced class Input < Slices - # Load lines for processing from the supplied filename or stream into this job. - # - # Returns [Integer] the number of lines loaded into this collection - # - # Parameters - # file_name_or_io [String | IO] - # Full path and file name to stream into the job, - # Or, an IO Stream that responds to: :read - # - # streams [Symbol|Array] - # Streams to convert the data whilst it is being read. - # When nil, the file_name extensions will be inspected to determine what - # streams should be applied. - # Default: nil - # - # delimiter[String] - # Line / Record delimiter to use to break the stream up into records - # Any string to break the stream up by - # The records when saved will not include this delimiter - # Default: nil - # Automatically detect line endings and break up by line - # Searches for the first "\r\n" or "\n" and then uses that as the - # delimiter for all subsequent records - # - # buffer_size [Integer] - # Size of the blocks when reading from the input file / stream. - # Default: 65536 ( 64K ) - # - # encoding: [String|Encoding] - # Encode returned data with this encoding. - # 'US-ASCII': Original 7 bit ASCII Format - # 'ASCII-8BIT': 8-bit ASCII Format - # 'UTF-8': UTF-8 Format - # Etc. - # Default: 'UTF-8' - # - # encode_replace: [String] - # The character to replace with when a character cannot be converted to the target encoding. - # nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised. - # Default: nil - # - # encode_cleaner: [nil|symbol|Proc] - # Cleanse data read from the input stream. - # nil: No cleansing - # :printable Cleanse all non-printable characters except \r and \n - # Proc/lambda Proc to call after every read to cleanse the data - # Default: :printable - # - # stream_mode: [:line | :row | :record] - # :line - # Uploads the file a line (String) at a time for processing by workers. - # :row - # Parses each line from the file as an Array and uploads each array for processing by workers. - # :record - # Parses each line from the file into a Hash and uploads each hash for processing by workers. - # See IOStream#each_line, IOStream#each_row, and IOStream#each_record. - # - # Example: - # # Load plain text records from a file - # job.input.upload('hello.csv') - # - # Example: - # # Load plain text records from a file, stripping all non-printable characters, - # # as well as any characters that cannot be converted to UTF-8 - # job.input.upload('hello.csv', encode_cleaner: :printable, encode_replace: '') - # - # Example: Zip - # # Since csv is not known to RocketJob it is ignored - # job.input.upload('myfile.csv.zip') - # - # Example: Encrypted Zip - # job.input.upload('myfile.csv.zip.enc') - # - # Example: Explicitly set the streams - # job.input.upload('myfile.ze', streams: [:zip, :enc]) - # - # Example: Supply custom options - # job.input.upload('myfile.csv.enc', streams: :enc]) - # - # Example: Extract streams from filename but write to a temp file - # streams = IOStreams.streams_for_file_name('myfile.gz.enc') - # t = Tempfile.new('my_project') - # job.input.upload(t.to_path, streams: streams) - # - # Example: Upload by writing records one at a time to the upload stream - # job.upload do |writer| - # 10.times { |i| writer << i } - # end - # - # Notes: - # - By default all data read from the file/stream is converted into UTF-8 before being persisted. This - # is recommended since Mongo only supports UTF-8 strings. - # - When zip format, the Zip file/stream must contain only one file, the first file found will be - # loaded into the job - # - If an io stream is supplied, it is read until it returns nil. - # - Only use this method for UTF-8 data, for binary data use #input_slice or #input_records. - # - Only call from one thread at a time per job instance. - # - CSV parsing is slow, so it is left for the workers to do. def upload(file_name_or_io = nil, encoding: 'UTF-8', stream_mode: :line, on_first: nil, **args, &block) raise(ArgumentError, 'Either file_name_or_io, or a block must be supplied') unless file_name_or_io || block block ||= -> (io) do iterator = "each_#{stream_mode}".to_sym @@ -108,39 +10,10 @@ end Writer::Input.collect(self, on_first: on_first, &block) end - # Upload the result of a MongoDB query to the input collection for processing - # Useful when an entire MongoDB collection, or part thereof needs to be - # processed by a job. - # - # Returns [Integer] the number of records uploaded - # - # If a Block is supplied it is passed the document returned from the - # database and should return a record for processing - # - # If no Block is supplied then the record will be the :fields returned - # from MongoDB - # - # Note: - # This method uses the collection and not the MongoMapper document to - # avoid the overhead of constructing a Model with every document returned - # by the query - # - # Note: - # The Block must return types that can be serialized to BSON. - # Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time - # Invalid: Date, etc. - # - # Example: Upload document ids - # criteria = User.where(state: 'FL') - # job.record_count = job.upload_mongo_query(criteria) - # - # Example: Upload just the supplied column - # criteria = User.where(state: 'FL') - # job.record_count = job.upload_mongo_query(criteria, :zip_code) def upload_mongo_query(criteria, *column_names, &block) options = criteria.options # Without a block extract the fields from the supplied criteria if block @@ -169,34 +42,10 @@ records << block.call(document) end end end - # Upload results from an Arel into RocketJob::SlicedJob. - # - # Params - # column_names - # When a block is not supplied, supply the names of the columns to be returned - # and uploaded into the job - # These columns are automatically added to the select list to reduce overhead - # - # If a Block is supplied it is passed the model returned from the database and should - # return the work item to be uploaded into the job. - # - # Returns [Integer] the number of records uploaded - # - # Example: Upload id's for all users - # arel = User.all - # job.record_count = job.upload_arel(arel) - # - # Example: Upload selected user id's - # arel = User.where(country_code: 'US') - # job.record_count = job.upload_arel(arel) - # - # Example: Upload user_name and zip_code - # arel = User.where(country_code: 'US') - # job.record_count = job.upload_arel(arel, :user_name, :zip_code) def upload_arel(arel, *column_names, &block) unless block column_names = column_names.collect(&:to_sym) column_names << :id if column_names.size.zero? @@ -215,25 +64,10 @@ Writer::Input.collect(self) do |records| arel.find_each { |model| records << block.call(model) } end end - # Upload sliced range of integer requests as a an arrays of start and end ids - # - # Returns [Integer] the number of slices uploaded - # - # Uploads one range per slice so that the response can return multiple records - # for each slice processed - # - # Example - # job.slice_size = 100 - # job.record_count = job.upload_integer_range(200, 421) - # - # # Equivalent to calling: - # job.record_count = job.insert([200,299]) - # job.record_count += job.insert([300,399]) - # job.record_count += job.insert([400,421]) def upload_integer_range(start_id, last_id) create_indexes count = 0 while start_id <= last_id end_id = start_id + slice_size - 1 @@ -243,29 +77,10 @@ count += 1 end count end - # Upload sliced range of integer requests as an arrays of start and end ids - # starting with the last range first - # - # Returns [Integer] the number of slices uploaded - # - # Uploads one range per slice so that the response can return multiple records - # for each slice processed. - # Useful for when the highest order integer values should be processed before - # the lower integer value ranges. For example when processing every record - # in a database based on the id column - # - # Example - # job.slice_size = 100 - # job.record_count = job.upload_integer_range_in_reverse_order(200, 421) * job.slice_size - # - # # Equivalent to calling: - # job.insert([400,421]) - # job.insert([300,399]) - # job.insert([200,299]) def upload_integer_range_in_reverse_order(start_id, last_id) create_indexes end_id = last_id count = 0 while end_id >= start_id @@ -288,12 +103,11 @@ # ap slice # end # def each_failed_record failed.each do |slice| - if slice.exception && (record_number = slice.exception.record_number) - yield(slice.at(record_number - 1), slice) - end + record = slice.failed_record + yield(record, slice) unless record.nil? end end # Requeue all failed slices def requeue_failed