Sha256: 208a14086d22069de6d2c8a0382eda3407034d2b7a1052a00e13b159b350c22b

Contents?: true

Size: 1.38 KB

Versions: 1

Compression:

Stored size: 1.38 KB

Contents

# Fetches one message from a worker's IronMQ queue and hands the decoded task
# to a caller-supplied block. When processing fails, the task (annotated with
# the error details) is pushed to the "<worker_id>-ERRORS" queue and the
# original message is deleted.
class Fanforce::Worker

  # Builds a worker, which immediately fetches and processes one message.
  #
  # @param worker_data [Hash] read with string keys 'worker_id' and 'env_vars'
  # @yield [params, retries] clones of task_data[:params] and task_data[:retries]
  # @return [Fanforce::Worker] the worker instance that was created
  def self.run(worker_data, &code_block)
    new(worker_data, &code_block)
  end

  # Loads dependencies, then pulls a single message from the queue named by
  # worker_data['worker_id'] and processes it. All of the work happens here,
  # at construction time.
  #
  # @param worker_data [Hash] see {.run}
  def initialize(worker_data, &code_block)
    require_dependencies
    run_validations

    @worker_data = worker_data
    @code_block = code_block

    # NOTE(review): `timeout: 3600` is presumably the reservation timeout in
    # seconds -- confirm against the iron_mq client in use.
    run_worker iron_mq.queue(worker_data['worker_id']).get(timeout: 3600)
  end

  # Requires the libraries this worker relies on. Done lazily (at
  # construction) rather than at file load time.
  def require_dependencies
    require '.pluginenv'
    require 'fanforce/api'
    require 'iron_mq'
  end

  # Intentionally empty; called from #initialize before any state is set.
  def run_validations

  end

  # @return [IronMQ::Client] memoized client built with default arguments
  def iron_mq
    @iron_mq ||= IronMQ::Client.new
  end

  # Processes one queue message: decodes its body, exports the worker's env
  # vars, invokes the task block and finally deletes the message.
  #
  # Any StandardError or ScriptError raised along the way (including by the
  # final delete) is routed to the error queue. Signals, SystemExit and
  # NoMemoryError are deliberately NOT trapped, so the process can be stopped
  # normally; in that case the message is left undeleted.
  #
  # @param msg [#body, #delete, nil] the message returned by the queue, or nil
  def run_worker(msg)
    puts '----------------------------------------------------------'
    print 'PROCESSING MESSAGE: '
    if msg.nil?
      puts 'MESSAGE IS NIL'
      return
    end

    # Keep the raw body around so a decode failure can still be reported.
    raw_body = msg.body
    task_data = Fanforce.decode_json(raw_body)

    set_env_vars(@worker_data['env_vars'])
    @code_block.call(task_data[:params].clone, task_data[:retries].clone)
    msg.delete
  rescue StandardError, ScriptError => e
    # task_data / raw_body are nil here if the failure happened before they
    # were assigned.
    report_failure(msg, task_data, raw_body, e)
  end

  # Copies each key/value pair into the process environment.
  #
  # Keys and values are converted to strings because ENV only accepts String
  # values; a nil value is passed through as nil, which removes the variable.
  # A nil +vars+ is treated as "nothing to set".
  #
  # @param vars [Hash, nil]
  def set_env_vars(vars)
    return if vars.nil?

    vars.each { |k, v| ENV[k.to_s] = v.nil? ? nil : v.to_s }
  end

  private

  # Records a failed task on the "<worker_id>-ERRORS" queue and deletes the
  # original message.
  def report_failure(msg, task_data, raw_body, error)
    # If the body could not be decoded into a hash there is nothing to
    # annotate, so report the undecoded body instead of crashing on nil.
    task_data = { raw_body: raw_body } unless task_data.is_a?(Hash)

    task_data[:last_error] = {
      exception: error.class.name,
      message: error.message,
      backtrace: error.backtrace,
      raised_at: Time.now
    }
    task_data[:env_vars] = @worker_data['env_vars']
    print 'ADDING TO ERROR QUEUE: '
    puts task_data.to_json
    Fanforce::Workers.iron.queue "#{@worker_data['worker_id']}-ERRORS", task_data
    msg.delete
    puts 'DELETED MESSAGE'
  end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fanforce-workers-0.3.4 lib/fanforce/workers/worker.rb