Sha256: 208a14086d22069de6d2c8a0382eda3407034d2b7a1052a00e13b159b350c22b
Contents?: true
Size: 1.38 KB
Versions: 1
Compression:
Stored size: 1.38 KB
Contents
# Runs one queued task for a Fanforce worker.
#
# Creating an instance (directly or through +run+) loads the worker's
# dependencies, fetches one message from the IronMQ queue named by
# worker_data['worker_id'], and passes the decoded payload to the given block.
class Fanforce::Worker
  # Convenience entry point; identical to calling +new+.
  #
  # @param worker_data [Hash] read with the string keys 'worker_id' and 'env_vars'
  # @yield [params, retries] clones of the :params and :retries entries of the
  #   decoded message
  # @return [Fanforce::Worker]
  def self.run(worker_data, &code_block)
    new(worker_data, &code_block)
  end

  # Loads dependencies, stores the inputs, then fetches and processes a
  # single message.
  #
  # @param worker_data [Hash] read with the string keys 'worker_id' and 'env_vars'
  # @yield [params, retries] see +run+
  def initialize(worker_data, &code_block)
    require_dependencies
    run_validations
    @worker_data = worker_data
    @code_block = code_block
    # NOTE(review): timeout is presumably the IronMQ reservation timeout in
    # seconds -- confirm against the iron_mq client documentation.
    message = iron_mq.queue(worker_data['worker_id']).get(timeout: 3600)
    run_worker(message)
  end

  # Loads the libraries the worker relies on. Kept inside a method so they
  # are only required once a worker is actually instantiated.
  def require_dependencies
    require '.pluginenv'
    require 'fanforce/api'
    require 'iron_mq'
  end

  # Hook for validations; currently does nothing.
  def run_validations
  end

  # @return [IronMQ::Client] memoized client built with default arguments
  def iron_mq
    @iron_mq ||= IronMQ::Client.new
  end

  # Processes one message: decodes its body, exports the worker's env vars,
  # invokes the stored block and deletes the message. If anything in that
  # sequence raises, the task is handed to the error queue instead.
  #
  # @param msg [Object, nil] message responding to +body+ and +delete+;
  #   nil is reported and skipped
  # @return [Object, nil]
  def run_worker(msg)
    puts '----------------------------------------------------------'
    print 'PROCESSING MESSAGE: '
    # Check for a missing message up front instead of relying on a
    # NoMethodError from msg.body; the printed output is unchanged.
    if msg.nil?
      puts 'MESSAGE IS NIL'
      return
    end

    task_data = nil
    begin
      task_data = Fanforce.decode_json(msg.body)
      set_env_vars(@worker_data['env_vars'])
      @code_block.call(task_data[:params].clone, task_data[:retries].clone)
      msg.delete
    rescue StandardError, ScriptError => e
      # Deliberately not `rescue Exception`: SystemExit, Interrupt/SignalException
      # and NoMemoryError must propagate so the process can actually stop.
      record_failure(msg, task_data, e)
    end
  end

  # Copies each key/value pair into ENV.
  #
  # @param vars [Hash, nil] nil is treated as "nothing to set"
  # @return [Hash] the hash that was iterated
  def set_env_vars(vars)
    (vars || {}).each do |key, value|
      # ENV only accepts String values (nil unsets the variable), so
      # stringify everything else instead of raising TypeError.
      ENV[key.to_s] = value.nil? ? nil : value.to_s
    end
  end

  private

  # Attaches error details to the task, pushes it to the worker's
  # "<worker_id>-ERRORS" queue and deletes the original message.
  def record_failure(msg, task_data, error)
    # Decoding may have failed before task_data was assigned. Fall back to
    # the raw body so the payload is not lost when the message is deleted.
    task_data ||= { raw_body: msg.body }
    task_data[:last_error] = {
      exception: error.class.name,
      message: error.message,
      backtrace: error.backtrace,
      raised_at: Time.now
    }
    task_data[:env_vars] = @worker_data['env_vars']
    print 'ADDING TO ERROR QUEUE: '
    puts task_data.to_json
    # NOTE(review): this goes through Fanforce::Workers.iron rather than the
    # local iron_mq client, and passes the payload as a second argument to
    # +queue+ -- confirm this helper really enqueues the task.
    Fanforce::Workers.iron.queue("#{@worker_data['worker_id']}-ERRORS", task_data)
    msg.delete
    puts 'DELETED MESSAGE'
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
fanforce-workers-0.3.4 | lib/fanforce/workers/worker.rb |