lib/filestack/utils/utils.rb in filestack-2.6.7 vs lib/filestack/utils/utils.rb in filestack-2.7.0

- old
+ new

@@ -60,40 +60,46 @@ Typhoeus.public_send( action, url, params: parameters, headers: headers ) end + def build_store_task(options = {}) + return 'store' if options.empty? + tasks = [] + options.each do |key, value| + value = case key + when :workflows + [value.join('","')] + when :path + "\"#{value}\"" + else + value + end + tasks.push("#{key}:#{value.to_s.downcase}") + end + "store=#{tasks.join(',')}" + end + # Uploads to v1 REST API (for external URLs or if multipart is turned off) # # @param [String] apikey Filestack API key - # @param [String] filepath Local path to file # @param [String] external_url External URL to be uploaded # @param [FilestackSecurity] security Security object with # policy/signature # @param [Hash] options User-defined options for # multipart uploads - # @param [String] storage Storage destination - # (s3, rackspace, etc) # @return [Hash] - def send_upload(apikey, filepath: nil, external_url: nil, security: nil, options: nil, storage: 'S3') - data = if filepath - { fileUpload: File.open(filepath) } - else - { url: external_url } - end + def send_upload(apikey, external_url: nil, security: nil, options: nil) + base = "#{FilestackConfig::CDN_URL}/#{apikey}/#{build_store_task(options)}" - # adds any user-defined upload options to request payload - data = data.merge!(options) unless options.nil? - base = "#{FilestackConfig::API_URL}/store/#{storage}?key=#{apikey}" - if security policy = security.policy signature = security.signature - base = "#{base}&signature=#{signature}&policy=#{policy}" + base = "#{base}/security=s:#{signature},p:#{policy}" end - response = Typhoeus.post(base, body: data, headers: FilestackConfig::HEADERS) + response = Typhoeus.post("#{base}/#{external_url}", headers: FilestackConfig::HEADERS) if response.code == 200 response_body = JSON.parse(response.body) handle = response_body['url'].split('/').last return { 'handle' => handle } @@ -191,19 +197,19 @@ # # @param [Array] jobs A list of file parts # @param [IntelligentState] state An IntelligentState object # # @return [Array] - def run_intelligent_upload_flow(jobs, state) + def run_intelligent_upload_flow(jobs, state, storage) bar = ProgressBar.new(jobs.length) generator = create_intelligent_generator(jobs) working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE while generator.alive? batch = get_generator_batch(generator) # run parts Parallel.map(batch, in_threads: 4) do |part| - state = run_intelligent_uploads(part, state) + state = run_intelligent_uploads(part, state, storage) # condition: a chunk has failed but we have not reached the maximum retries while bad_state(state) # condition: timeout to S3, requiring offset size to be changed if state.error_type == 'S3_NETWORK' sleep(5) @@ -211,11 +217,11 @@ # condition: timeout to backend, requiring only backoff elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type sleep(state.backoff) end state.add_retry - state = run_intelligent_uploads(part, state) + state = run_intelligent_uploads(part, state, storage) end raise "Upload has failed. Please try again later." unless state.ok bar.increment! end end @@ -267,27 +273,28 @@ # @param [Int] filesize Size of incoming file # @param [Typhoeus::Response] start_response Response body from # multipart_start # # @return [Dict] - def chunk_job(job, state, apikey, filename, filepath, filesize, start_response) + def chunk_job(job, state, apikey, filename, filepath, filesize, start_response, storage) offset = 0 - seek_point = job[:seek] + seek_point = job[:seek_point] chunk_list = [] + while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize chunk_list.push( - seek: seek_point, + seek_point: seek_point, filepath: filepath, filename: filename, apikey: apikey, part: job[:part], size: job[:size], uri: start_response['uri'], region: start_response['region'], upload_id: start_response['upload_id'], location_url: start_response['location_url'], - store_location: job[:store_location], + store: { location: storage }, offset: offset ) offset += state.offset end chunk_list @@ -298,19 +305,19 @@ # @param [Dict] part A dictionary representing the information # for a single part # @param [IntelligentState] state An IntelligentState object # # @return [IntelligentState] - def run_intelligent_uploads(part, state) + def run_intelligent_uploads(part, state, storage) failed = false chunks = chunk_job( part, state, part[:apikey], part[:filename], part[:filepath], - part[:filesize], part[:start_response] + part[:filesize], part[:start_response], storage ) Parallel.map(chunks, in_threads: 3) do |chunk| begin - upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options]) + upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options], storage) rescue => e state.error_type = e.message failed = true Parallel::Kill end @@ -320,23 +327,26 @@ state.ok = false return state else state.ok = true end + commit_params = { apikey: part[:apikey], uri: part[:uri], region: part[:region], upload_id: part[:upload_id], size: part[:filesize], part: part[:part], - location_url: part[:location_url], - store_location: part[:store_location], - file: Tempfile.new(part[:filename]) + location_url: part[:start_response]['location_url'], + store: { location: storage } } - response = Typhoeus.post(FilestackConfig::MULTIPART_COMMIT_URL, body: commit_params, - headers: FilestackConfig::HEADERS) + + response = Typhoeus.post(FilestackConfig.multipart_commit_url(commit_params[:location_url]), + body: commit_params.to_json, + headers: FilestackConfig::HEADERS) + if response.code == 200 state.reset else state.ok = false end @@ -352,13 +362,14 @@ # @param [String] filepath Local path to the file # @param [Hash] options User-defined options for # multipart uploads # # @return [Typhoeus::Response] - def upload_chunk_intelligently(job, state, apikey, filepath, options) + def upload_chunk_intelligently(job, state, apikey, filepath, options, storage) file = File.open(filepath) - file.seek(job[:seek] + job[:offset]) + file.seek(job[:seek_point] + job[:offset]) + chunk = file.read(state.offset) md5 = Digest::MD5.new md5 << chunk data = { apikey: apikey, @@ -366,20 +377,20 @@ size: chunk.length, md5: md5.base64digest, uri: job[:uri], region: job[:region], upload_id: job[:upload_id], - store_location: job[:store_location], + store: { location: storage }, offset: job[:offset], - file: Tempfile.new(job[:filename]), - 'multipart' => 'true' + fii: true } data = data.merge!(options) if options - fs_response = Typhoeus.post( - FilestackConfig::MULTIPART_UPLOAD_URL, body: data, - headers: FilestackConfig::HEADERS - ) + + fs_response = Typhoeus.post(FilestackConfig.multipart_upload_url(job[:location_url]), + body: data.to_json, + headers: FilestackConfig::HEADERS) + # POST to multipart/upload begin unless fs_response.code == 200 if [400, 403, 404].include? fs_response.code raise 'FAILURE'