lib/filestack/utils/multipart_upload_utils.rb in filestack-2.1.0 vs lib/filestack/utils/multipart_upload_utils.rb in filestack-2.2.0

- old
+ new

@@ -2,22 +2,26 @@
 require 'digest'
 require 'mimemagic'
 require 'json'
 require 'parallel'
 require 'unirest'
-
+require 'progress_bar'
 require 'filestack/config'
 require 'filestack/utils/utils'
 
 include UploadUtils
-
+include IntelligentUtils
+Unirest.timeout(30)
 # Includes all the utility functions for Filestack multipart uploads
 module MultipartUploadUtils
   def get_file_info(file)
     filename = File.basename(file)
     filesize = File.size(file)
     mimetype = MimeMagic.by_magic(File.open(file))
+    if mimetype.nil?
+      mimetype = 'application/octet-stream'
+    end
     [filename, filesize, mimetype.to_s]
   end
 
   # Send start response to multipart endpoint
   #
@@ -36,13 +40,14 @@
       apikey: apikey,
       filename: filename,
       mimetype: mimetype,
       size: filesize,
       store_location: options.nil? ? 's3' : options[:store_location],
-      file: Tempfile.new(filename)
+      file: Tempfile.new(filename),
+      options: options,
+      'multipart' => 'true'
     }
-    params = params.merge!(options) if options
 
     unless security.nil?
       params[:policy] = security.policy
       params[:signature] = security.signature
@@ -53,11 +58,11 @@
       headers: FilestackConfig::HEADERS
     )
     if response.code == 200
       response.body
     else
-      raise Exception(response.body)
+      raise RuntimeError.new(response.body)
     end
   end
 
   # Create array of jobs for parallel uploading
   #
@@ -65,39 +70,48 @@
   # @param [String] filename Name of incoming file
   # @param [String] filepath Local path to file
   # @param [Int] filesize Size of incoming file
   # @param [Unirest::Response] start_response Response body from
   #                            multipart_start
-  # @param [FilestackSecurity] security Security object with
-  #                            policy/signature
   # @param [Hash] options User-defined options for
   #               multipart uploads
   #
   # @return [Array]
   def create_upload_jobs(apikey, filename, filepath, filesize, start_response, options)
     jobs = []
     part = 1
     seek_point = 0
     while seek_point < filesize
-      jobs.push(
+      part_info = {
         seek: seek_point,
         filepath: filepath,
         filename: filename,
         apikey: apikey,
         part: part,
+        filesize: filesize,
         uri: start_response['uri'],
         region: start_response['region'],
         upload_id: start_response['upload_id'],
         location_url: start_response['location_url'],
+        start_response: start_response,
+        options: options,
         store_location: options.nil? ? 's3' : options[:store_location]
-      )
+      }
+      if seek_point + FilestackConfig::DEFAULT_CHUNK_SIZE > filesize
+        size = filesize - (seek_point)
+      else
+        size = FilestackConfig::DEFAULT_CHUNK_SIZE
+      end
+      part_info[:size] = size
+      jobs.push(part_info)
       part += 1
       seek_point += FilestackConfig::DEFAULT_CHUNK_SIZE
     end
     jobs
   end
+
   # Uploads one chunk of the file
   #
   # @param [Hash] job Hash of options needed
   #               to upload a chunk
   # @param [String] apikey Filestack API key
@@ -110,21 +124,22 @@
   # @return [Unirest::Response]
   def upload_chunk(job, apikey, filepath, options)
     file = File.open(filepath)
     file.seek(job[:seek])
     chunk = file.read(FilestackConfig::DEFAULT_CHUNK_SIZE)
+
     md5 = Digest::MD5.new
     md5 << chunk
     data = {
       apikey: apikey,
       part: job[:part],
       size: chunk.length,
       md5: md5.base64digest,
       uri: job[:uri],
       region: job[:region],
       upload_id: job[:upload_id],
-      store_location: options.nil? ? 's3' : options[:store_location],
+      store_location: job[:store_location],
       file: Tempfile.new(job[:filename])
     }
     data = data.merge!(options) if options
     fs_response = Unirest.post(
       FilestackConfig::MULTIPART_UPLOAD_URL, parameters: data,
@@ -132,32 +147,34 @@
     ).body
     Unirest.put(
       fs_response['url'],
       headers: fs_response['headers'], parameters: chunk
     )
   end
-
   # Runs all jobs in parallel
   #
   # @param [Array] jobs Array of jobs to be run
   # @param [String] apikey Filestack API key
   # @param [String] filepath Local path to file
   # @param [Hash] options User-defined options for
   #               multipart uploads
   #
   # @return [Array] Array of parts/etags strings
   def run_uploads(jobs, apikey, filepath, options)
-    results = Parallel.map(jobs) do |job|
+    bar = ProgressBar.new(jobs.length)
+    results = Parallel.map(jobs, in_threads: 4) do |job|
       response = upload_chunk(
         job, apikey, filepath, options
       )
-      part = job[:part]
-      etag = response.headers[:etag]
-      "#{part}:#{etag}"
+      if response.code == 200
+        bar.increment!
+        part = job[:part]
+        etag = response.headers[:etag]
+        "#{part}:#{etag}"
+      end
     end
     results
   end
-
   # Send complete call to multipart endpoint
   #
   # @param [String] apikey Filestack API key
   # @param [String] filename Name of incoming file
   # @param [Int] filesize Size of incoming file
@@ -171,24 +188,38 @@
   #                 part numbers
   # @param [Hash] options User-defined options for
   #               multipart uploads
   #
   # @return [Unirest::Response]
-  def multipart_complete(apikey, filename, filesize, mimetype, start_response, parts_and_etags, options)
-    data = {
-      apikey: apikey,
-      uri: start_response['uri'],
-      region: start_response['region'],
-      upload_id: start_response['upload_id'],
-      filename: filename,
-      size: filesize,
-      mimetype: mimetype,
-      parts: parts_and_etags.join(';'),
-      store_location: options.nil? ? 's3' : options[:store_location],
-      file: Tempfile.new(filename)
-    }
-
+  def multipart_complete(apikey, filename, filesize, mimetype, start_response, parts_and_etags, options, intelligent = false)
+    if !intelligent
+      data = {
+        apikey: apikey,
+        uri: start_response['uri'],
+        region: start_response['region'],
+        upload_id: start_response['upload_id'],
+        filename: filename,
+        size: filesize,
+        mimetype: mimetype,
+        parts: parts_and_etags.join(';'),
+        store_location: options.nil? ? 's3' : options[:store_location],
+        file: Tempfile.new(filename)
+      }
+    else
+      data = {
+        apikey: apikey,
+        uri: start_response['uri'],
+        region: start_response['region'],
+        upload_id: start_response['upload_id'],
+        filename: filename,
+        size: filesize,
+        mimetype: mimetype,
+        store_location: options.nil? ? 's3' : options[:store_location],
+        file: Tempfile.new(filename),
+        'multipart' => 'true'
+      }
+    end
     data = data.merge!(options) if options
     Unirest.post(
       FilestackConfig::MULTIPART_COMPLETE_URL, parameters: data,
       headers: FilestackConfig::HEADERS
@@ -203,21 +234,45 @@
   #               policy/signature
   # @param [Hash] options User-defined options for
   #               multipart uploads
   #
   # @return [Unirest::Response]
-  def multipart_upload(apikey, filepath, security, options)
+  def multipart_upload(apikey, filepath, security, options, timeout, intelligent: false)
     filename, filesize, mimetype = get_file_info(filepath)
     start_response = multipart_start(
       apikey, filename, filesize, mimetype, security, options
     )
+    unless start_response['upload_type'].nil?
+      intelligent_enabled = ((start_response['upload_type'].include? 'intelligent_ingestion')) && intelligent
+    end
     jobs = create_upload_jobs(
       apikey, filename, filepath, filesize, start_response, options
     )
-    parts_and_etags = run_uploads(jobs, apikey, filepath, options)
-    response_complete = multipart_complete(
-      apikey, filename, filesize, mimetype,
-      start_response, parts_and_etags, options
-    )
+    if intelligent_enabled
+      state = IntelligentState.new
+      run_intelligent_upload_flow(jobs, state)
+      response_complete = multipart_complete(
+        apikey, filename, filesize, mimetype,
+        start_response, nil, options, intelligent
+      )
+    else
+      parts_and_etags = run_uploads(jobs, apikey, filepath, options)
+      response_complete = multipart_complete(
+        apikey, filename, filesize, mimetype,
+        start_response, parts_and_etags, options
+      )
+    end
+    begin
+      Timeout::timeout(timeout){
+        while response_complete.code == 202
+          response_complete = multipart_complete(
+            apikey, filename, filesize, mimetype,
+            start_response, nil, options, intelligent
+          )
+        end
+      }
+    rescue
+      raise "Upload timed out upon completion. Please try again later"
+    end
    response_complete.body
  end
end
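
For orientation, here is a minimal usage sketch of the 2.2.0 multipart_upload signature shown above. It is an illustration of the new timeout and intelligent: parameters, not code taken from the library; the apikey, filepath and timeout values are placeholders, and it assumes the module is available after requiring the filestack gem.

    require 'filestack'

    include MultipartUploadUtils

    apikey   = 'YOUR_APIKEY'            # placeholder
    filepath = '/path/to/large/file'    # placeholder
    security = nil                      # or a FilestackSecurity instance

    # New in 2.2.0: timeout (in seconds) bounds the polling loop that
    # re-sends multipart_complete while the API keeps answering 202, and
    # intelligent: opts in to intelligent ingestion, which only takes
    # effect when the start response advertises 'intelligent_ingestion'.
    body = multipart_upload(apikey, filepath, security, nil, 60, intelligent: true)
    puts body

Per the diff, if the API is still returning 202 when the timeout elapses, the completion loop raises "Upload timed out upon completion. Please try again later" instead of blocking indefinitely.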
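
The per-part size bookkeeping added to create_upload_jobs is easy to sanity-check in isolation. The helper below is hypothetical (it is not part of the gem) and assumes an 8 MiB chunk size purely for illustration; it mirrors the rule in the diff that every part is one full chunk except the last, which gets whatever remains.

    # Hypothetical helper, not part of filestack-ruby: mirrors the size
    # calculation 2.2.0 performs inside create_upload_jobs.
    DEFAULT_CHUNK_SIZE = 8 * 1024 * 1024   # assumed value, for illustration only

    def part_size(seek_point, filesize)
      if seek_point + DEFAULT_CHUNK_SIZE > filesize
        filesize - seek_point   # final, possibly short, part
      else
        DEFAULT_CHUNK_SIZE
      end
    end

    # A 20 MiB file split with an 8 MiB chunk size yields parts of
    # 8 MiB, 8 MiB and 4 MiB.
    filesize = 20 * 1024 * 1024
    sizes = (0...filesize).step(DEFAULT_CHUNK_SIZE).map { |seek| part_size(seek, filesize) }
    # => [8388608, 8388608, 4194304]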