Sha256: 48ffa2918978f1d54e763e72e394940d6598ad3d16bafe97de017b3d80571122

Contents?: true

Size: 1.37 KB

Versions: 6

Compression:

Stored size: 1.37 KB

Contents

class Fanforce::Worker

  def self.run(worker_data, &code_block)
    self.new(worker_data, &code_block)
  end

  def initialize(worker_data, &code_block)
    require_dependencies
    run_validations

    @worker_data = worker_data
    @code_block = code_block

    run_worker iron_mq.queue(worker_data['worker_id']).get(timeout: 3600)
  end

  def require_dependencies
    require '.pluginenv'
    require 'fanforce/api'
    require 'iron_mq'
  end

  def run_validations

  end

  def iron_mq
    @iron_mq ||= IronMQ::Client.new
  end

  def run_worker(msg)
    puts '----------------------------------------------------------'
    print 'PROCESSING MESSAGE: '
    task_data = Fanforce.decode_json(msg.body)

    set_env_vars(@worker_data['env_vars'])
    @code_block.call(task_data[:params].clone, task_data[:retries])
    msg.delete

  rescue Exception => e
    if msg.nil?
      puts 'MESSAGE IS NIL'
      return
    end

    task_data[:last_error] = {
        exception: e.class.name,
        message: e.message,
        backtrace: e.backtrace,
        raised_at: Time.now
    }
    task_data[:env_vars] = @worker_data['env_vars']
    print 'ADDING TO ERROR QUEUE: '
    puts task_data.to_json
    Fanforce::Workers.iron.queue "#{@worker_data['worker_id']}-ERRORS", task_data
    msg.delete
    puts 'DELETED MESSAGE'
  end

  def set_env_vars(vars)
    vars.each {|k,v| ENV[k.to_s]=v }
  end

end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
fanforce-workers-0.4.4 lib/fanforce/workers/worker.rb
fanforce-workers-0.4.3 lib/fanforce/workers/worker.rb
fanforce-workers-0.4.2 lib/fanforce/workers/worker.rb
fanforce-workers-0.4.1 lib/fanforce/workers/worker.rb
fanforce-workers-0.4.0 lib/fanforce/workers/worker.rb
fanforce-workers-0.3.5 lib/fanforce/workers/worker.rb