lib/rocket_job/batch/io.rb in rocketjob-4.1.1 vs lib/rocket_job/batch/io.rb in rocketjob-4.2.0

- old
+ new

@@ -37,45 +37,271 @@
       collection_name << ".#{category}" unless category == :main
       (@outputs ||= {})[category] ||= RocketJob::Sliced::Output.new(slice_arguments(collection_name))
     end
 
-    # Upload the supplied file_name or stream
+    # Upload the supplied file_name or stream.
     #
-    # Updates the record_count after adding the records
+    # Returns [Integer] the number of records uploaded.
     #
-    # Options
-    #   :file_name [String]
-    #     When file_name_or_io is an IO, the original base file name if any.
-    #     Default: nil
+    # Parameters
+    #   file_name_or_io [String | IO]
+    #     Full path and file name to stream into the job, or
+    #     an IO stream that responds to: :read
     #
-    # See RocketJob::Sliced::Input#upload for remaining options
+    #   streams [Symbol|Array]
+    #     Streams to convert the data whilst it is being read.
+    #     When nil, the file_name extensions will be inspected to determine what
+    #     streams should be applied.
+    #     Default: nil
     #
-    # Returns [Integer] the number of records uploaded
+    #   delimiter [String]
+    #     Line / Record delimiter to use to break the stream up into records
+    #       Any string to break the stream up by
+    #       The records when saved will not include this delimiter
+    #     Default: nil
+    #       Automatically detect line endings and break up by line
+    #       Searches for the first "\r\n" or "\n" and then uses that as the
+    #       delimiter for all subsequent records
     #
-    # Note:
-    #   * Not thread-safe. Only call from one thread at a time
+    #   buffer_size [Integer]
+    #     Size of the blocks when reading from the input file / stream.
+    #     Default: 65536 (64K)
+    #
+    #   encoding: [String|Encoding]
+    #     Encode returned data with this encoding.
+    #     'US-ASCII':   Original 7 bit ASCII Format
+    #     'ASCII-8BIT': 8-bit ASCII Format
+    #     'UTF-8':      UTF-8 Format
+    #     Etc.
+    #     Default: 'UTF-8'
+    #
+    #   encode_replace: [String]
+    #     The character to replace with when a character cannot be converted to the target encoding.
+    #     nil: Don't replace any invalid characters. Encoding::UndefinedConversionError is raised.
+    #     Default: nil
+    #
+    #   encode_cleaner: [nil|symbol|Proc]
+    #     Cleanse data read from the input stream.
+    #     nil:         No cleansing
+    #     :printable   Cleanse all non-printable characters except \r and \n
+    #     Proc/lambda  Proc to call after every read to cleanse the data
+    #     Default: :printable
+    #
+    #   stream_mode: [:line | :row | :record]
+    #     :line
+    #       Uploads the file a line (String) at a time for processing by workers.
+    #     :row
+    #       Parses each line from the file as an Array and uploads each array for processing by workers.
+    #     :record
+    #       Parses each line from the file into a Hash and uploads each hash for processing by workers.
+    #     See IOStream#each_line, IOStream#each_row, and IOStream#each_record.
+    #
+    # Example:
+    #   # Load plain text records from a file
+    #   job.upload('hello.csv')
+    #
+    # Example:
+    #   # Load plain text records from a file, stripping all non-printable characters,
+    #   # as well as any characters that cannot be converted to UTF-8
+    #   job.upload('hello.csv', encode_cleaner: :printable, encode_replace: '')
+    #
+    # Example: Zip
+    #   # Since csv is not known to RocketJob it is ignored
+    #   job.upload('myfile.csv.zip')
+    #
+    # Example: Encrypted Zip
+    #   job.upload('myfile.csv.zip.enc')
+    #
+    # Example: Explicitly set the streams
+    #   job.upload('myfile.ze', streams: [:zip, :enc])
+    #
+    # Example: Supply custom options
+    #   job.upload('myfile.csv.enc', streams: [:enc])
+    #
+    # Example: Extract streams from filename but write to a temp file
+    #   streams = IOStreams.streams_for_file_name('myfile.gz.enc')
+    #   t = Tempfile.new('my_project')
+    #   job.upload(t.to_path, streams: streams)
+    #
+    # Example: Upload by writing records one at a time to the upload stream
+    #   job.upload do |writer|
+    #     10.times { |i| writer << i }
+    #   end
+    #
+    # Notes:
+    # * Only call from one thread at a time against a single instance of this job.
+    # * The record_count for the job is set to the number of records uploaded.
+    # * If an exception is raised while uploading data, the input collection is cleared out
+    #   so that if a job is retried during an upload failure, data is not duplicated.
+    # * By default all data read from the file/stream is converted into UTF-8 before being persisted. This
+    #   is recommended since Mongo only supports UTF-8 strings.
+    # * When using the zip format, the Zip file/stream must contain only one file; the first file found
+    #   will be loaded into the job.
+    # * If an io stream is supplied, it is read until it returns nil.
+    # * Only use this method for UTF-8 data; for binary data use #input_slice or #input_records.
+    # * CSV parsing is slow, so it is usually left for the workers to do.
     def upload(file_name_or_io = nil, file_name: nil, category: :main, **args, &block)
       if file_name
         self.upload_file_name = file_name
       elsif file_name_or_io.is_a?(String)
         self.upload_file_name = file_name_or_io
       end
 
       count             = input(category).upload(file_name_or_io, file_name: file_name, **args, &block)
       self.record_count = (record_count || 0) + count
       count
+    rescue StandardError => exc
+      input(category).delete_all
+      raise(exc)
     end
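The stream_mode values above determine what each worker's #perform receives. A minimal sketch of the consuming side, assuming the usual RocketJob::Batch job shape (the ImportJob class and the "user_name" column are illustrative, not part of this diff):

    class ImportJob < RocketJob::Job
      include RocketJob::Batch

      # With stream_mode: :record each uploaded record arrives as a Hash,
      # keyed by the file's header row.
      def perform(record)
        puts record["user_name"]
      end
    end

    job = ImportJob.new
    # Parse each line into a Hash before slicing it into the input collection.
    job.upload("users.csv", stream_mode: :record)
    job.save!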
 
+    # Upload results from an Arel into RocketJob::SlicedJob.
+    #
+    # Params
+    #   column_names
+    #     When a block is not supplied, supply the names of the columns to be returned
+    #     and uploaded into the job
+    #     These columns are automatically added to the select list to reduce overhead
+    #
+    #   If a Block is supplied it is passed the model returned from the database and should
+    #   return the work item to be uploaded into the job.
+    #
+    # Returns [Integer] the number of records uploaded
+    #
+    # Example: Upload id's for all users
+    #   arel = User.all
+    #   job.upload_arel(arel)
+    #
+    # Example: Upload selected user id's
+    #   arel = User.where(country_code: 'US')
+    #   job.upload_arel(arel)
+    #
+    # Example: Upload user_name and zip_code
+    #   arel = User.where(country_code: 'US')
+    #   job.upload_arel(arel, :user_name, :zip_code)
+    #
+    # Notes:
+    # * Only call from one thread at a time against a single instance of this job.
+    # * The record_count for the job is set to the number of records returned by the arel.
+    # * If an exception is raised while uploading data, the input collection is cleared out
+    #   so that if a job is retried during an upload failure, data is not duplicated.
     def upload_arel(arel, *column_names, category: :main, &block)
       count             = input(category).upload_arel(arel, *column_names, &block)
       self.record_count = (record_count || 0) + count
       count
+    rescue StandardError => exc
+      input(category).delete_all
+      raise(exc)
     end
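The comment above notes that a supplied block receives each model and that its return value becomes the uploaded work item. A short sketch of that block form (the User model and its attributes are assumptions for illustration):

    arel = User.where(country_code: 'US')
    # Each User model is passed to the block; the returned array is what gets
    # stored in the job's input collection, so it must be BSON-serializable.
    job.upload_arel(arel) { |user| [user.id, user.email] }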
 
+    # Upload the result of a MongoDB query to the input collection for processing.
+    # Useful when an entire MongoDB collection, or part thereof, needs to be
+    # processed by a job.
+    #
+    # Returns [Integer] the number of records uploaded
+    #
+    # If a Block is supplied it is passed the document returned from the
+    # database and should return a record for processing
+    #
+    # If no Block is supplied then the record will be the :fields returned
+    # from MongoDB
+    #
+    # Note:
+    #   This method uses the collection and not the MongoMapper document to
+    #   avoid the overhead of constructing a Model with every document returned
+    #   by the query
+    #
+    # Note:
+    #   The Block must return types that can be serialized to BSON.
+    #   Valid Types: Hash | Array | String | Integer | Float | Symbol | Regexp | Time
+    #   Invalid: Date, etc.
+    #
+    # Example: Upload document ids
+    #   criteria = User.where(state: 'FL')
+    #   job.upload_mongo_query(criteria)
+    #
+    # Example: Upload just the supplied column
+    #   criteria = User.where(state: 'FL')
+    #   job.upload_mongo_query(criteria, :zip_code)
+    #
+    # Notes:
+    # * Only call from one thread at a time against a single instance of this job.
+    # * The record_count for the job is set to the number of records returned by the mongo query.
+    # * If an exception is raised while uploading data, the input collection is cleared out
+    #   so that if a job is retried during an upload failure, data is not duplicated.
     def upload_mongo_query(criteria, *column_names, category: :main, &block)
       count             = input(category).upload_mongo_query(criteria, *column_names, &block)
       self.record_count = (record_count || 0) + count
       count
+    rescue StandardError => exc
+      input(category).delete_all
+      raise(exc)
     end
+
+    # Upload a sliced range of integers as arrays of start and end ids.
+    #
+    # Returns [Integer] last_id - start_id + 1.
+    #
+    # Uploads one range per slice so that the response can return multiple records
+    # for each slice processed.
+    #
+    # Example
+    #   job.slice_size = 100
+    #   job.upload_integer_range(200, 421)
+    #
+    #   # Equivalent to calling:
+    #   job.input.insert([200,299])
+    #   job.input.insert([300,399])
+    #   job.input.insert([400,421])
+    #
+    # Notes:
+    # * Only call from one thread at a time against a single instance of this job.
+    # * The record_count for the job is set to: last_id - start_id + 1.
+    # * If an exception is raised while uploading data, the input collection is cleared out
+    #   so that if a job is retried during an upload failure, data is not duplicated.
+    def upload_integer_range(start_id, last_id, category: :main)
+      input(category).upload_integer_range(start_id, last_id)
+      count = last_id - start_id + 1
+      self.record_count = (record_count || 0) + count
+      count
+    rescue StandardError => exc
+      input(category).delete_all
+      raise(exc)
+    end
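Since each slice record here is a single [start_id, end_id] array, the worker expands the range itself. A sketch of that consuming side, assuming a typical RocketJob::Batch job (the TouchUsersJob class and User model are illustrative):

    class TouchUsersJob < RocketJob::Job
      include RocketJob::Batch

      # Each record is one [start_id, end_id] pair as inserted above, so one
      # slice can produce many processed rows.
      def perform(range)
        start_id, end_id = range
        User.where(id: start_id..end_id).find_each(&:touch)
      end
    end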
+
+    # Upload a sliced range of integers as arrays of start and end ids,
+    # starting with the last range first.
+    #
+    # Returns [Integer] last_id - start_id + 1.
+    #
+    # Uploads one range per slice so that the response can return multiple records
+    # for each slice processed.
+    # Useful when the highest integer values should be processed before the
+    # lower integer value ranges, for example when processing every record
+    # in a database based on the id column.
+    #
+    # Example
+    #   job.slice_size = 100
+    #   job.upload_integer_range_in_reverse_order(200, 421)
+    #
+    #   # Equivalent to calling:
+    #   job.input.insert([400,421])
+    #   job.input.insert([300,399])
+    #   job.input.insert([200,299])
+    #
+    # Notes:
+    # * Only call from one thread at a time against a single instance of this job.
+    # * The record_count for the job is set to: last_id - start_id + 1.
+    # * If an exception is raised while uploading data, the input collection is cleared out
+    #   so that if a job is retried during an upload failure, data is not duplicated.
+    def upload_integer_range_in_reverse_order(start_id, last_id, category: :main)
+      input(category).upload_integer_range_in_reverse_order(start_id, last_id)
+      count = last_id - start_id + 1
+      self.record_count = (record_count || 0) + count
+      count
+    rescue StandardError => exc
+      input(category).delete_all
+      raise(exc)
+    end
 
     # Upload the supplied slices for processing by workers
     #
     # Updates the record_count after adding the records
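For instance, to work through a table newest-first, the highest id ranges can be uploaded first. A hedged sketch reusing the illustrative job above (the User model and its id column are assumptions):

    job = TouchUsersJob.new
    job.slice_size = 100
    # Highest id ranges are inserted first, so workers process them first.
    job.upload_integer_range_in_reverse_order(1, User.maximum(:id))
    job.save!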