Sha256: 48ffa2918978f1d54e763e72e394940d6598ad3d16bafe97de017b3d80571122
Contents?: true
Size: 1.37 KB
Versions: 6
Compression:
Stored size: 1.37 KB
Contents
# Pulls a single message off the worker's IronMQ queue and hands its decoded
# payload to a caller-supplied block. When processing raises, the task data
# (annotated with the error) is pushed to the "<worker_id>-ERRORS" queue and
# the original message is deleted.
class Fanforce::Worker
  # Convenience constructor; constructing an instance processes one message.
  #
  # @param worker_data [Hash] read with String keys 'worker_id' and 'env_vars'
  # @yield [params, retries] values taken from the decoded task data
  # @return [Fanforce::Worker]
  def self.run(worker_data, &code_block)
    new(worker_data, &code_block)
  end

  # Loads dependencies, fetches one message from the queue named after
  # worker_data['worker_id'] and processes it immediately.
  #
  # @param worker_data [Hash] read with String keys 'worker_id' and 'env_vars'
  def initialize(worker_data, &code_block)
    require_dependencies
    run_validations
    @worker_data = worker_data
    @code_block = code_block
    # NOTE(review): timeout: 3600 is passed straight through to IronMQ's `get`;
    # presumably the reservation timeout in seconds - confirm against the client docs.
    run_worker(iron_mq.queue(worker_data['worker_id']).get(timeout: 3600))
  end

  # Required lazily so that merely loading this file does not pull in the
  # plugin environment or the IronMQ client.
  def require_dependencies
    require '.pluginenv'
    require 'fanforce/api'
    require 'iron_mq'
  end

  # Intentionally empty hook; called before any instance state is set.
  def run_validations
  end

  # @return [IronMQ::Client] memoized client instance
  def iron_mq
    @iron_mq ||= IronMQ::Client.new
  end

  # Decodes the message body, applies the worker's env vars, invokes the
  # code block and deletes the message on success. Failures are forwarded
  # to the error queue.
  #
  # @param msg [#body, #delete, nil] message returned by the queue; nil is
  #   reported on stdout and otherwise ignored
  def run_worker(msg)
    puts '----------------------------------------------------------'
    print 'PROCESSING MESSAGE: '
    # Explicit guard instead of relying on NoMethodError from nil.body.
    if msg.nil?
      puts 'MESSAGE IS NIL'
      return
    end

    task_data = nil
    begin
      task_data = Fanforce.decode_json(msg.body)
      set_env_vars(@worker_data['env_vars'])
      @code_block.call(task_data[:params].clone, task_data[:retries])
      msg.delete
    rescue SystemExit, SignalException, NoMemoryError
      # Process-control exceptions must never be swallowed by the worker.
      raise
    rescue Exception => e # rubocop:disable Lint/RescueException
      # Everything else (including ScriptError) still goes to the error queue.
      enqueue_failure(msg, task_data, e)
    end
  end

  # Copies the given key/value pairs into ENV.
  #
  # @param vars [Hash, nil] nil is treated as "nothing to set"; nil values
  #   unset the variable, any other value is stringified because ENV only
  #   accepts String values
  def set_env_vars(vars)
    return if vars.nil?

    vars.each { |k, v| ENV[k.to_s] = v.nil? ? nil : v.to_s }
  end

  private

  # Annotates the task data with the failure, pushes it to the error queue
  # and deletes the original message.
  def enqueue_failure(msg, task_data, error)
    # task_data is nil when decoding itself failed; keep the raw body so the
    # failure can still be inspected from the error queue.
    task_data ||= { raw_body: msg.body }
    task_data[:last_error] = {
      exception: error.class.name,
      message: error.message,
      backtrace: error.backtrace,
      raised_at: Time.now
    }
    task_data[:env_vars] = @worker_data['env_vars']
    print 'ADDING TO ERROR QUEUE: '
    puts task_data.to_json
    Fanforce::Workers.iron.queue("#{@worker_data['worker_id']}-ERRORS", task_data)
    msg.delete
    puts 'DELETED MESSAGE'
  end
end
Version data entries
6 entries across 6 versions & 1 rubygems