lib/filestack/utils/utils.rb in filestack-2.1.0 vs lib/filestack/utils/utils.rb in filestack-2.2.0

- old
+ new

@@ -1,13 +1,51 @@
 require 'base64'
 require 'digest'
+require 'fiber'
 require 'mimemagic'
 require 'json'
 require 'unirest'
 require 'filestack/config'
 
+# set timeout for all requests to be 30 seconds
+Unirest.timeout(30)
+
+class IntelligentState
+  attr_accessor :offset, :ok, :error_type
+
+  def initialize
+    @offset = 524288
+    @ok = true
+    @alive = true
+    @retries = 0
+    @backoff = 1
+    @offset_index = 0
+    @offset_sizes = [524288, 262144, 131072, 65536, 32768]
+  end
+
+  def alive?
+    @alive
+  end
+
+  def add_retry
+    @retries += 1
+    @alive = false if @retries >= 5
+  end
+
+  def backoff
+    @backoff = 2 ** @retries
+  end
+
+  def next_offset
+    current_offset = @offset_sizes[@offset_index]
+    @offset_index += 1
+    return current_offset
+  end
+
+  def reset
+    @retries = 0
+  end
+end
+
 # Includes general utility functions for the Filestack Ruby SDK
 module UploadUtils
   # General request function
   # @param [String] url        The URL being called
   # @param [String] action     The specific HTTP action
@@ -67,11 +105,11 @@
   # @param [String] base       The base Filestack URL
   # @param [String] handle     The FilestackFilelink handle (optional)
   # @param [String] path       The specific API path (optional)
   # @param [String] security   Security for the FilestackFilelink (optional)
   #
-  # return [String]
+  # @return [String]
   def get_url(base, handle: nil, path: nil, security: nil)
     url_components = [base]
     url_components.push(path) unless path.nil?
     url_components.push(handle) unless handle.nil?
@@ -89,10 +127,13 @@
 # Utility functions for transformations
 module TransformUtils
   # Creates a transformation task to be sent back to transform object
   #
+  # @param [String] transform   The task to be added
+  # @param [Hash] options       A hash representing the options for that task
+  #
   # @return [String]
   def add_transform_task(transform, options = {})
     options_list = []
     if !options.empty?
       options.each do |key, array|
@@ -101,7 +142,274 @@
       options_string = options_list.join(',')
       "#{transform}=#{options_string}"
     else
       transform.to_s
     end
+  end
+end
+
+module IntelligentUtils
+  # Generates a batch given a Fiber
+  #
+  # @param [Fiber] generator           A living Fiber object
+  #
+  # @return [Array]
+  def get_generator_batch(generator)
+    batch = []
+    4.times do
+      batch.push(generator.resume) if generator.alive?
+    end
+    return batch
+  end
+
+  # Check if the state is in an error state but still
+  # alive, i.e. it has not yet exhausted its retries
+  #
+  # @param [IntelligentState] state    An IntelligentState object
+  #
+  # @return [Boolean]
+  def bad_state(state)
+    !state.ok && state.alive?
+  end
+
+  # Return the current working offset if the state has not
+  # already tried it. Otherwise, fall back to the state's
+  # next (smaller) offset size
+  #
+  # @param [Integer] working_offset    The current offset
+  # @param [IntelligentState] state    An IntelligentState object
+  #
+  # @return [Integer]
+  def change_offset(working_offset, state)
+    if state.offset > working_offset
+      working_offset
+    else
+      state.offset = state.next_offset
+    end
+  end
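
Note: IntelligentState, bad_state and change_offset together implement a bounded retry
policy: five attempts per part, exponential backoff for server errors, and a shrinking
chunk size for S3 network timeouts. A minimal sketch of how the state machine behaves,
using only the code in this diff (return values shown as comments):

    state = IntelligentState.new
    state.offset        #=> 524288 (512 KiB starting chunk size)
    state.next_offset   #=> 524288 (first fallback retries the same size)
    state.next_offset   #=> 262144 (each further fallback shrinks the chunk)

    3.times { state.add_retry }
    state.backoff       #=> 8 (2 ** 3 seconds)
    state.alive?        #=> true (becomes false after 5 retries)
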
+
+  # Runs the intelligent upload flow, from start to finish
+  #
+  # @param [Array] jobs                A list of file parts
+  # @param [IntelligentState] state    An IntelligentState object
+  #
+  # @return [Array]
+  def run_intelligent_upload_flow(jobs, state)
+    bar = ProgressBar.new(jobs.length)
+    generator = create_intelligent_generator(jobs)
+    working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
+    while generator.alive?
+      batch = get_generator_batch(generator)
+      # run parts
+      Parallel.map(batch, in_threads: 4) do |part|
+        state = run_intelligent_uploads(part, state)
+        # condition: a chunk has failed, but we have not yet reached the maximum retries
+        while bad_state(state)
+          # condition: timeout to S3, requiring the offset size to be changed
+          if state.error_type == 'S3_NETWORK'
+            sleep(5)
+            state.offset = working_offset = change_offset(working_offset, state)
+          # condition: server error, requiring only exponential backoff
+          elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type
+            sleep(state.backoff)
+          end
+          state.add_retry
+          state = run_intelligent_uploads(part, state)
+        end
+        raise 'Upload has failed. Please try again later.' unless state.ok
+        bar.increment!
+      end
+    end
+  end
+
+  # Creates a generator of part jobs
+  #
+  # @param [Array] jobs    A list of file parts
+  #
+  # @return [Fiber]
+  def create_intelligent_generator(jobs)
+    jobs_gen = jobs.lazy.each
+    Fiber.new do
+      (jobs.length - 1).times do
+        Fiber.yield jobs_gen.next
+      end
+      jobs_gen.next
+    end
+  end
+
+  # Loop over the jobs and chunk each one at every offset
+  #
+  # @param [Array] jobs                A list of file parts
+  # @param [IntelligentState] state    An IntelligentState object
+  # @param [String] apikey             Filestack API key
+  # @param [String] filename           Name of incoming file
+  # @param [String] filepath           Local path to the file
+  # @param [Integer] filesize          Size of incoming file
+  # @param [Unirest::Response] start_response    Response body from
+  #                                              multipart_start
+  #
+  # @return [Array]
+  def create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response)
+    jobs.each { |job|
+      job[:chunks] = chunk_job(
+        job, state, apikey, filename, filepath, filesize, start_response
+      )
+    }
+    jobs
+  end
+
+  # Chunk a specific job into offsets
+  #
+  # @param [Hash] job                  Hash with all job options
+  # @param [IntelligentState] state    An IntelligentState object
+  # @param [String] apikey             Filestack API key
+  # @param [String] filename           Name of incoming file
+  # @param [String] filepath           Local path to the file
+  # @param [Integer] filesize          Size of incoming file
+  # @param [Unirest::Response] start_response    Response body from
+  #                                              multipart_start
+  #
+  # @return [Array]
+  def chunk_job(job, state, apikey, filename, filepath, filesize, start_response)
+    offset = 0
+    seek_point = job[:seek]
+    chunk_list = []
+    while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize
+      chunk_list.push(
+        seek: seek_point,
+        filepath: filepath,
+        filename: filename,
+        apikey: apikey,
+        part: job[:part],
+        size: job[:size],
+        uri: start_response['uri'],
+        region: start_response['region'],
+        upload_id: start_response['upload_id'],
+        location_url: start_response['location_url'],
+        store_location: job[:store_location],
+        offset: offset
+      )
+      offset += state.offset
+    end
+    chunk_list
+  end
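
Note: chunk_job walks a part in state.offset-sized steps up to
FilestackConfig::DEFAULT_CHUNK_SIZE. As a worked example of the arithmetic (the 8 MiB
part size is an assumption about the config value, not taken from this diff):

    part_size = 8 * 1024 * 1024  # assumed stand-in for FilestackConfig::DEFAULT_CHUNK_SIZE
    part_size / 524_288          #=> 16 chunks per part at a fresh state's offset
    part_size / 32_768           #=> 256 chunks after falling back to the smallest offset
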
+
+  # Send a job's chunks in parallel and commit
+  #
+  # @param [Hash] part                 A hash representing the information
+  #                                    for a single part
+  # @param [IntelligentState] state    An IntelligentState object
+  #
+  # @return [IntelligentState]
+  def run_intelligent_uploads(part, state)
+    failed = false
+    chunks = chunk_job(
+      part, state, part[:apikey], part[:filename], part[:filepath],
+      part[:filesize], part[:start_response]
+    )
+    Parallel.map(chunks, in_threads: 3) do |chunk|
+      begin
+        upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options])
+      rescue => e
+        state.error_type = e.message
+        failed = true
+        raise Parallel::Kill
+      end
+    end
+
+    if failed
+      state.ok = false
+      return state
+    else
+      state.ok = true
+    end
+    commit_params = {
+      apikey: part[:apikey],
+      uri: part[:uri],
+      region: part[:region],
+      upload_id: part[:upload_id],
+      size: part[:filesize],
+      part: part[:part],
+      location_url: part[:location_url],
+      store_location: part[:store_location],
+      file: Tempfile.new(part[:filename])
+    }
+    response = Unirest.post(FilestackConfig::MULTIPART_COMMIT_URL,
+                            parameters: commit_params,
+                            headers: FilestackConfig::HEADERS)
+    if response.code == 200
+      state.reset
+    else
+      state.ok = false
+    end
+    state
+  end
+
+  # Upload a single chunk
+  #
+  # @param [Hash] job                  Hash with all job options
+  # @param [IntelligentState] state    An IntelligentState object
+  # @param [String] apikey             Filestack API key
+  # @param [String] filepath           Local path to the file
+  # @param [Hash] options              User-defined options for
+  #                                    multipart uploads
+  #
+  # @return [Unirest::Response]
+  def upload_chunk_intelligently(job, state, apikey, filepath, options)
+    file = File.open(filepath)
+    file.seek(job[:seek] + job[:offset])
+    chunk = file.read(state.offset)
+    md5 = Digest::MD5.new
+    md5 << chunk
+    data = {
+      apikey: apikey,
+      part: job[:part],
+      size: chunk.length,
+      md5: md5.base64digest,
+      uri: job[:uri],
+      region: job[:region],
+      upload_id: job[:upload_id],
+      store_location: job[:store_location],
+      offset: job[:offset],
+      file: Tempfile.new(job[:filename]),
+      'multipart' => 'true'
+    }
+
+    data = data.merge!(options) if options
+
+    # POST to multipart/upload; a dropped connection is a backend
+    # network error, while a bad response code is classified below
+    begin
+      fs_response = Unirest.post(
+        FilestackConfig::MULTIPART_UPLOAD_URL, parameters: data,
+        headers: FilestackConfig::HEADERS
+      )
+    rescue
+      raise 'BACKEND_NETWORK'
+    end
+    unless fs_response.code == 200
+      if [400, 403, 404].include? fs_response.code
+        raise 'FAILURE'
+      else
+        raise 'BACKEND_SERVER'
+      end
+    end
+    fs_response = fs_response.body
+
+    # PUT to S3, with the same error classification for the S3 leg
+    begin
+      amazon_response = Unirest.put(
+        fs_response['url'], headers: fs_response['headers'], parameters: chunk
+      )
+    rescue
+      raise 'S3_NETWORK'
+    end
+    unless amazon_response.code == 200
+      if [400, 403, 404].include? amazon_response.code
+        raise 'FAILURE'
+      else
+        raise 'S3_SERVER'
+      end
+    end
+    amazon_response
   end
 end
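
Note: the calling side lives outside this file. The gem's multipart upload code is
expected to build the jobs list from multipart_start and drive the flow roughly as
below; the literals and the exact job shape are illustrative assumptions, not part of
this diff (the commit step additionally reads :uri, :region, :upload_id and
:location_url from each part):

    include IntelligentUtils

    state = IntelligentState.new
    jobs = [{ seek: 0, part: 1, size: 524_288, store_location: 's3',
              apikey: 'YOUR_APIKEY', filename: 'img.png', filepath: '/tmp/img.png',
              filesize: 524_288, start_response: start_response }]  # from multipart_start
    run_intelligent_upload_flow(jobs, state)  # raises once a part exhausts its 5 retries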