lib/filestack/utils/utils.rb in filestack-2.6.7 vs lib/filestack/utils/utils.rb in filestack-2.7.0
- old
+ new
@@ -60,40 +60,46 @@
    Typhoeus.public_send(
      action, url, params: parameters, headers: headers
    )
  end
+   def build_store_task(options = {})
+     return 'store' if options.empty?
+     tasks = []
+     options.each do |key, value|
+       value = case key
+               when :workflows
+                 [value.join('","')]
+               when :path
+                 "\"#{value}\""
+               else
+                 value
+               end
+       tasks.push("#{key}:#{value.to_s.downcase}")
+     end
+     "store=#{tasks.join(',')}"
+   end
+
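For reference, a sketch of what the new helper returns, traced from the method above (option names and values are illustrative; note that the helper downcases every value):

    build_store_task
    # => "store"
    build_store_task(location: 'S3', path: 'folder/')
    # => 'store=location:s3,path:"folder/"'
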
  # Uploads to v1 REST API (for external URLs or if multipart is turned off)
  #
  # @param [String] apikey Filestack API key
-   # @param [String] filepath Local path to file
  # @param [String] external_url External URL to be uploaded
  # @param [FilestackSecurity] security Security object with
  #                                     policy/signature
  # @param [Hash] options User-defined options for
  #                       multipart uploads
-   # @param [String] storage Storage destination
-   #                         (s3, rackspace, etc)
  # @return [Hash]
-   def send_upload(apikey, filepath: nil, external_url: nil, security: nil, options: nil, storage: 'S3')
-     data = if filepath
-              { fileUpload: File.open(filepath) }
-            else
-              { url: external_url }
-            end
+   def send_upload(apikey, external_url: nil, security: nil, options: nil)
+     base = "#{FilestackConfig::CDN_URL}/#{apikey}/#{build_store_task(options)}"
-     # adds any user-defined upload options to request payload
-     data = data.merge!(options) unless options.nil?
-     base = "#{FilestackConfig::API_URL}/store/#{storage}?key=#{apikey}"
-
    if security
      policy = security.policy
      signature = security.signature
-       base = "#{base}&signature=#{signature}&policy=#{policy}"
+       base = "#{base}/security=s:#{signature},p:#{policy}"
    end
-     response = Typhoeus.post(base, body: data, headers: FilestackConfig::HEADERS)
+     response = Typhoeus.post("#{base}/#{external_url}", headers: FilestackConfig::HEADERS)
    if response.code == 200
      response_body = JSON.parse(response.body)
      handle = response_body['url'].split('/').last
      return { 'handle' => handle }
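The net effect: an external-URL upload is now a single process-style request against the CDN rather than a form POST to the REST API. A sketch of the resulting request, assuming FilestackConfig::CDN_URL is https://cdn.filestackcontent.com (API key, options and URL are illustrative):

    send_upload('APIKEY', external_url: 'https://example.com/image.png',
                options: { location: 's3' })
    # POSTs to:
    #   https://cdn.filestackcontent.com/APIKEY/store=location:s3/https://example.com/image.png
    # or, when a security object is supplied:
    #   .../APIKEY/store=location:s3/security=s:<signature>,p:<policy>/https://example.com/image.png
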
@@ -191,19 +197,19 @@
  #
  # @param [Array] jobs A list of file parts
  # @param [IntelligentState] state An IntelligentState object
  #
  # @return [Array]
-   def run_intelligent_upload_flow(jobs, state)
+   def run_intelligent_upload_flow(jobs, state, storage)
    bar = ProgressBar.new(jobs.length)
    generator = create_intelligent_generator(jobs)
    working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
    while generator.alive?
      batch = get_generator_batch(generator)
      # run parts
      Parallel.map(batch, in_threads: 4) do |part|
-         state = run_intelligent_uploads(part, state)
+         state = run_intelligent_uploads(part, state, storage)
        # condition: a chunk has failed but we have not reached the maximum retries
        while bad_state(state)
          # condition: timeout to S3, requiring offset size to be changed
          if state.error_type == 'S3_NETWORK'
            sleep(5)
@@ -211,11 +217,11 @@
          # condition: timeout to backend, requiring only backoff
          elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type
            sleep(state.backoff)
          end
          state.add_retry
-           state = run_intelligent_uploads(part, state)
+           state = run_intelligent_uploads(part, state, storage)
        end
        raise "Upload has failed. Please try again later." unless state.ok
        bar.increment!
      end
    end
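Summarizing the retry policy threaded through this loop (per the comments above; the offset-size adjustment itself happens in code elided between the two hunks):

    # 'S3_NETWORK'                   -> sleep 5, then adjust the offset size
    # 'S3_SERVER', 'BACKEND_SERVER'  -> sleep state.backoff
    # any failure                    -> state.add_retry, re-run the part
    # the loop exits when state.ok, or when the retry cap ends
    # bad_state and the raise above fires
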
@@ -267,27 +273,28 @@
  # @param [Int] filesize Size of incoming file
  # @param [Typhoeus::Response] start_response Response body from
  #                                            multipart_start
  #
  # @return [Dict]
-   def chunk_job(job, state, apikey, filename, filepath, filesize, start_response)
+   def chunk_job(job, state, apikey, filename, filepath, filesize, start_response, storage)
    offset = 0
-     seek_point = job[:seek]
+     seek_point = job[:seek_point]
    chunk_list = []
+
    while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize
      chunk_list.push(
-         seek: seek_point,
+         seek_point: seek_point,
        filepath: filepath,
        filename: filename,
        apikey: apikey,
        part: job[:part],
        size: job[:size],
        uri: start_response['uri'],
        region: start_response['region'],
        upload_id: start_response['upload_id'],
        location_url: start_response['location_url'],
-         store_location: job[:store_location],
+         store: { location: storage },
        offset: offset
      )
      offset += state.offset
    end
    chunk_list
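Each entry pushed above now uses the renamed :seek_point key and nests the storage backend under :store instead of the old :store_location. One element of the returned list, with illustrative values:

    {
      seek_point: 0,
      filepath: '/tmp/file.png',
      filename: 'file.png',
      apikey: 'APIKEY',
      part: 1,
      size: 5242880,
      uri: '<from start_response>',
      region: '<from start_response>',
      upload_id: '<from start_response>',
      location_url: '<from start_response>',
      store: { location: 's3' },
      offset: 0
    }
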
@@ -298,19 +305,19 @@
  # @param [Dict] part A dictionary representing the information
  #                    for a single part
  # @param [IntelligentState] state An IntelligentState object
  #
  # @return [IntelligentState]
-   def run_intelligent_uploads(part, state)
+   def run_intelligent_uploads(part, state, storage)
    failed = false
    chunks = chunk_job(
      part, state, part[:apikey], part[:filename], part[:filepath],
-       part[:filesize], part[:start_response]
+       part[:filesize], part[:start_response], storage
    )
    Parallel.map(chunks, in_threads: 3) do |chunk|
      begin
-         upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options])
+         upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options], storage)
      rescue => e
        state.error_type = e.message
        failed = true
        Parallel::Kill
      end
@@ -320,23 +327,26 @@
      state.ok = false
      return state
    else
      state.ok = true
    end
+
    commit_params = {
      apikey: part[:apikey],
      uri: part[:uri],
      region: part[:region],
      upload_id: part[:upload_id],
      size: part[:filesize],
      part: part[:part],
-       location_url: part[:location_url],
-       store_location: part[:store_location],
-       file: Tempfile.new(part[:filename])
+       location_url: part[:start_response]['location_url'],
+       store: { location: storage }
    }
-     response = Typhoeus.post(FilestackConfig::MULTIPART_COMMIT_URL, body: commit_params,
-                              headers: FilestackConfig::HEADERS)
+
+     response = Typhoeus.post(FilestackConfig.multipart_commit_url(commit_params[:location_url]),
+                              body: commit_params.to_json,
+                              headers: FilestackConfig::HEADERS)
+
    if response.code == 200
      state.reset
    else
      state.ok = false
    end
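The commit thus moves from form-encoded params posted to the fixed MULTIPART_COMMIT_URL to a JSON body posted to a per-upload URL built from location_url. A sketch of the serialized body (values illustrative):

    commit_params.to_json
    # => '{"apikey":"APIKEY","uri":"<uri>","region":"<region>",
    #      "upload_id":"<id>","size":10485760,"part":1,
    #      "location_url":"<location_url>","store":{"location":"s3"}}'
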
@@ -352,13 +362,14 @@
  # @param [String] filepath Local path to the file
  # @param [Hash] options User-defined options for
  #                       multipart uploads
  #
  # @return [Typhoeus::Response]
-   def upload_chunk_intelligently(job, state, apikey, filepath, options)
+   def upload_chunk_intelligently(job, state, apikey, filepath, options, storage)
    file = File.open(filepath)
-     file.seek(job[:seek] + job[:offset])
+     file.seek(job[:seek_point] + job[:offset])
+
    chunk = file.read(state.offset)
    md5 = Digest::MD5.new
    md5 << chunk
    data = {
      apikey: apikey,
@@ -366,20 +377,20 @@
      size: chunk.length,
      md5: md5.base64digest,
      uri: job[:uri],
      region: job[:region],
      upload_id: job[:upload_id],
-       store_location: job[:store_location],
+       store: { location: storage },
      offset: job[:offset],
-       file: Tempfile.new(job[:filename]),
-       'multipart' => 'true'
+       fii: true
    }
    data = data.merge!(options) if options
-     fs_response = Typhoeus.post(
-       FilestackConfig::MULTIPART_UPLOAD_URL, body: data,
-       headers: FilestackConfig::HEADERS
-     )
+
+     fs_response = Typhoeus.post(FilestackConfig.multipart_upload_url(job[:location_url]),
+                                 body: data.to_json,
+                                 headers: FilestackConfig::HEADERS)
+
    # POST to multipart/upload
    begin
      unless fs_response.code == 200
        if [400, 403, 404].include? fs_response.code
          raise 'FAILURE'
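As with the commit step, each chunk upload now posts JSON to a per-upload URL, replacing the old Tempfile and 'multipart' fields with the nested store hash and the new fii flag (presumably short for Filestack Intelligent Ingestion). A sketch of the serialized body, limited to the keys visible above (values illustrative):

    data.to_json
    # => '{"apikey":"APIKEY","size":5242880,"md5":"<base64 md5>",
    #      "uri":"<uri>","region":"<region>","upload_id":"<id>",
    #      "store":{"location":"s3"},"offset":0,"fii":true}'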