Module: IntelligentUtils

Defined in:
lib/filestack/utils/utils.rb

Instance Method Summary collapse

Instance Method Details

#bad_state(state) ⇒ Boolean

Check if state is in an error state and has not yet reached maximum retries

Parameters:

Returns:

  • (Boolean)


170
171
172
# File 'lib/filestack/utils/utils.rb', line 170

# Tell whether an upload state has failed but may still be retried.
#
# @param state [IntelligentState] object responding to +ok+ and +alive?+
#   (presumably +alive?+ reports whether retries remain — confirm in
#   IntelligentState)
# @return [Boolean] +false+ as soon as +state.ok+ is truthy; otherwise the
#   value of +state.alive?+
def bad_state(state)
  # A healthy state is never "bad"; alive? is not consulted in that case.
  return false if state.ok

  state.alive?
end

#change_offset(working_offset, state) ⇒ Integer

Return the current working offset if the state has not tried it yet. Otherwise, return the next offset of the state

Parameters:

  • working_offset (Integer)

    The current offset

  • state (IntelligentState)

    An IntelligentState object

Returns:

  • (Integer)


182
183
184
185
186
187
188
# File 'lib/filestack/utils/utils.rb', line 182

# Pick the offset size to use for the next attempt.
#
# @param working_offset [Integer] the offset size currently in use
# @param state [IntelligentState] object exposing +offset+, +offset=+ and
#   +next_offset+
# @return [Integer] +working_offset+ when the state's own offset is larger
#   than it; otherwise +state.next_offset+, which is also stored back into
#   +state.offset+
def change_offset(working_offset, state)
  return working_offset if state.offset > working_offset

  # Advance the state and hand back the value that was just assigned.
  state.offset = state.next_offset
end

#chunk_job(job, state, apikey, filename, filepath, filesize, start_response) ⇒ Dict

Chunk a specific job into offsets

Parameters:

  • job (Dict)

    Dictionary with all job options

  • state (IntelligentState)

    An IntelligentState object

  • apikey (String)

    Filestack API key

  • filename (String)

    Name of incoming file

  • filepath (String)

    Local path to the file

  • filesize (Int)

    Size of incoming file

  • start_response (Unirest::Response)

    Response body from multipart_start

Returns:

  • (Dict)


272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/filestack/utils/utils.rb', line 272

# Split one part (job) into a list of chunk descriptors, one per offset.
#
# Offsets start at 0 and grow by +state.offset+ (re-read on every step) for
# as long as the offset is below FilestackConfig::DEFAULT_CHUNK_SIZE and
# +job[:seek] + offset+ is below +filesize+.
#
# @param job [Hash] part description; +:seek+, +:part+, +:size+ and
#   +:store_location+ are read
# @param state [IntelligentState] supplies the step size via +offset+
# @param apikey [String] Filestack API key
# @param filename [String] name of the incoming file
# @param filepath [String] local path to the file
# @param filesize [Integer] size of the incoming file
# @param start_response [Hash-like] indexed with 'uri', 'region',
#   'upload_id' and 'location_url'
# @return [Array<Hash>] chunk descriptors (empty when no offset qualifies)
def chunk_job(job, state, apikey, filename, filepath, filesize, start_response)
  part_start = job[:seek]
  descriptors = []
  cursor = 0
  until cursor >= FilestackConfig::DEFAULT_CHUNK_SIZE || part_start + cursor >= filesize
    descriptors << {
      seek: part_start,
      filepath: filepath,
      filename: filename,
      apikey: apikey,
      part: job[:part],
      size: job[:size],
      uri: start_response['uri'],
      region: start_response['region'],
      upload_id: start_response['upload_id'],
      location_url: start_response['location_url'],
      store_location: job[:store_location],
      offset: cursor
    }
    cursor += state.offset
  end
  descriptors
end

#create_intelligent_generator(jobs) ⇒ Fiber

Creates a generator of part jobs

Parameters:

  • jobs (Array)

    A list of file parts

Returns:

  • (Fiber)


229
230
231
232
233
234
235
236
237
# File 'lib/filestack/utils/utils.rb', line 229

# Wrap a list of part jobs in a Fiber that hands them out one per resume.
#
# Every job except the last is passed out with Fiber.yield; the last job is
# the fiber's return value, so the fiber is no longer alive once the final
# job has been handed out.
#
# NOTE(review): for an empty list the first resume reaches +source.next+ with
# nothing to return and raises StopIteration.
#
# @param jobs [Array] list of file parts
# @return [Fiber] generator over +jobs+
def create_intelligent_generator(jobs)
  source = jobs.lazy.each
  Fiber.new do
    1.upto(jobs.length - 1) { Fiber.yield(source.next) }
    source.next
  end
end

#create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response) ⇒ Array

Loop and run chunks for each offset

Parameters:

  • jobs (Array)

    A list of file parts

  • state (IntelligentState)

    An IntelligentState object

  • apikey (String)

    Filestack API key

  • filename (String)

    Name of incoming file

  • filepath (String)

    Local path to the file

  • filesize (Int)

    Size of incoming file

  • start_response (Unirest::Response)

    Response body from multipart_start

Returns:

  • (Array)


251
252
253
254
255
256
257
258
# File 'lib/filestack/utils/utils.rb', line 251

# Attach a +:chunks+ list to every job by delegating to chunk_job.
#
# @param jobs [Array<Hash>] list of file parts; each one is mutated in place
# @param state [IntelligentState] forwarded to chunk_job
# @param apikey [String] Filestack API key
# @param filename [String] name of the incoming file
# @param filepath [String] local path to the file
# @param filesize [Integer] size of the incoming file
# @param start_response [Hash-like] response body from multipart_start
# @return [Array<Hash>] the same +jobs+ object that was passed in
def create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response)
  # Everything except the job itself is identical for each call.
  shared_args = [state, apikey, filename, filepath, filesize, start_response]
  jobs.each do |job|
    job[:chunks] = chunk_job(job, *shared_args)
  end
  jobs
end

#get_generator_batch(generator) ⇒ Array

Generates a batch given a Fiber

Parameters:

  • generator (Fiber)

    A living Fiber object

Returns:

  • (Array)


156
157
158
159
160
161
162
# File 'lib/filestack/utils/utils.rb', line 156

# Collect up to four values from a Fiber-based generator.
#
# The fiber is resumed only while it reports +alive?+, so a batch may hold
# fewer than four items (or none).
#
# @param generator [Fiber] fiber to pull values from
# @return [Array] the values returned by each resume, in order
def get_generator_batch(generator)
  4.times.each_with_object([]) do |_, collected|
    collected << generator.resume if generator.alive?
  end
end

#run_intelligent_upload_flow(jobs, state) ⇒ Array

Runs the intelligent upload flow, from start to finish

Parameters:

  • jobs (Array)

    A list of file parts

  • state (IntelligentState)

    An IntelligentState object

Returns:

  • (Array)


196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/filestack/utils/utils.rb', line 196

# Run the intelligent upload from start to finish: pull parts from a
# generator in batches, upload each batch through Parallel.map, and retry a
# part for as long as bad_state(state) holds.
#
# @param jobs [Array] list of file parts
# @param state [IntelligentState] state object; it is read, mutated and
#   reassigned from inside the Parallel.map block, so all workers share it
# @return [nil] the value of the outer +while+ loop
#   (NOTE(review): the published YARD summary says Array — confirm what
#   callers rely on)
# @raise [RuntimeError] "Upload has failed. Please try again later." when a
#   part's state is still not ok after its retry loop ends (raised inside the
#   Parallel.map block; presumably re-raised by Parallel — confirm)
def run_intelligent_upload_flow(jobs, state)
  bar = ProgressBar.new(jobs.length)
  generator = create_intelligent_generator(jobs)
  # Offset size shared by every part; updated below on S3_NETWORK errors.
  working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
  while generator.alive?
    batch = get_generator_batch(generator)   
    # Upload every part of this batch, using up to 4 threads.
    Parallel.map(batch, in_threads: 4) do |part|
      state = run_intelligent_uploads(part, state)
      # condition: a chunk has failed but we have not reached the maximum retries
      while bad_state(state)
        # condition: timeout to S3, requiring offset size to be changed
        if state.error_type == 'S3_NETWORK'
          sleep(5)
          # The chosen offset is stored both on the state and in the shared local.
          state.offset = working_offset = change_offset(working_offset, state)
        # condition: timeout to backend, requiring only backoff
        elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type
          sleep(state.backoff)
        end
        # Any other error_type falls through and is retried without sleeping.
        state.add_retry
        state = run_intelligent_uploads(part, state)
      end
      # The loop also ends when the state is no longer alive, so ok is re-checked.
      raise "Upload has failed. Please try again later." unless state.ok
      bar.increment!
    end
  end
end

#run_intelligent_uploads(part, state) ⇒ IntelligentState

Send a job's chunks in parallel and commit

Parameters:

  • part (Dict)

    A dictionary representing the information for a single part

  • state (IntelligentState)

    An IntelligentState object

Returns:



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/filestack/utils/utils.rb', line 303

# Upload every chunk of one part in parallel and, if all succeed, commit the
# part.
#
# @param part [Hash] information for a single part; +:apikey+, +:filename+,
#   +:filepath+, +:filesize+, +:start_response+, +:options+, +:uri+,
#   +:region+, +:upload_id+, +:part+, +:location_url+ and +:store_location+
#   are read
# @param state [IntelligentState] mutated in place: +ok+ is set, +error_type+
#   receives the failing chunk's exception message, and +reset+ is called
#   after a successful commit
# @return [IntelligentState] the same +state+ object that was passed in
def run_intelligent_uploads(part, state)
  failed = false
  chunks = chunk_job(
    part, state, part[:apikey], part[:filename], part[:filepath],
    part[:filesize], part[:start_response]
  )
  Parallel.map(chunks, in_threads: 3) do |chunk|
    begin
      upload_chunk_intelligently(chunk, state, part[:apikey], part[:filepath], part[:options])
    rescue => e
      state.error_type = e.message
      failed = true
      # Parallel::Kill only has an effect when raised; as a bare expression it
      # is a no-op and the remaining chunks would still be uploaded.
      raise Parallel::Kill
    end
  end

  if failed
    state.ok = false
    return state
  end
  state.ok = true

  # The commit request carries an empty temp file as its :file parameter
  # (presumably to force a multipart body — confirm). Remove it as soon as
  # the request has finished instead of waiting for garbage collection.
  commit_file = Tempfile.new(part[:filename])
  begin
    commit_params = {
      apikey: part[:apikey],
      uri: part[:uri],
      region: part[:region],
      upload_id: part[:upload_id],
      size: part[:filesize],
      part: part[:part],
      location_url: part[:location_url],
      store_location: part[:store_location],
      file: commit_file
    }
    response = Unirest.post(FilestackConfig::MULTIPART_COMMIT_URL, parameters: commit_params,
                                                                   headers: FilestackConfig::HEADERS)
  ensure
    commit_file.close!
  end

  if response.code == 200
    state.reset
  else
    state.ok = false
  end
  state
end

#upload_chunk_intelligently(job, state, apikey, filepath, options) ⇒ Unirest::Response

Upload a single chunk

Parameters:

  • job (Dict)

    Dictionary with all job options

  • state (IntelligentState)

    An IntelligentState object

  • apikey (String)

    Filestack API key

  • filename (String)

    Name of incoming file

  • filepath (String)

    Local path to the file

  • options (Hash)

    User-defined options for multipart uploads

Returns:

  • (Unirest::Response)


357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/filestack/utils/utils.rb', line 357

# Upload a single chunk: read it from disk, register it with the Filestack
# multipart/upload endpoint, then PUT the bytes to the URL that endpoint
# returns.
#
# @param job [Hash] chunk descriptor; +:seek+, +:offset+, +:part+, +:uri+,
#   +:region+, +:upload_id+, +:store_location+ and +:filename+ are read
# @param state [IntelligentState] +state.offset+ is the number of bytes read
# @param apikey [String] Filestack API key
# @param filepath [String] local path to the file
# @param options [Hash, nil] extra parameters merged into the POST payload
# @return [Unirest::Response] response of the PUT request
# @raise [RuntimeError] the message classifies the failure:
#   'BACKEND_NETWORK' (POST raised), 'BACKEND_SERVER' (POST returned a non-200
#   status other than 400/403/404), 'S3_NETWORK' (PUT raised), 'S3_SERVER'
#   (PUT returned a non-200 status other than 400/403/404), and 'FAILURE'
#   (either request returned 400, 403 or 404)
def upload_chunk_intelligently(job, state, apikey, filepath, options)
  # Block form so the file handle is closed even when reading raises.
  chunk = File.open(filepath) do |file|
    file.seek(job[:seek] + job[:offset])
    file.read(state.offset)
  end

  # Empty temp file sent as the :file parameter (presumably to force a
  # multipart body — confirm); it is removed once the POST has finished.
  placeholder = Tempfile.new(job[:filename])
  data = {
    apikey: apikey,
    part: job[:part],
    size: chunk.length,
    md5: Digest::MD5.base64digest(chunk),
    uri: job[:uri],
    region: job[:region],
    upload_id: job[:upload_id],
    store_location: job[:store_location],
    offset: job[:offset],
    file: placeholder,
    'multipart' => 'true'
  }
  data = data.merge!(options) if options

  # POST to multipart/upload. Only the request itself sits inside the rescue,
  # so the status-based errors raised below keep their own message.
  begin
    fs_response = Unirest.post(
      FilestackConfig::MULTIPART_UPLOAD_URL, parameters: data,
                                             headers: FilestackConfig::HEADERS
    )
  rescue StandardError
    raise 'BACKEND_NETWORK'
  ensure
    placeholder.close!
  end
  unless fs_response.code == 200
    raise 'FAILURE' if [400, 403, 404].include?(fs_response.code)

    raise 'BACKEND_SERVER'
  end
  fs_response = fs_response.body

  # PUT to S3, with the same split between transport and status errors.
  begin
    amazon_response = Unirest.put(
      fs_response['url'], headers: fs_response['headers'], parameters: chunk
    )
  rescue StandardError
    raise 'S3_NETWORK'
  end
  unless amazon_response.code == 200
    raise 'FAILURE' if [400, 403, 404].include?(amazon_response.code)

    raise 'S3_SERVER'
  end
  amazon_response
end