# Helpers for pushing work onto Iron-backed queues and for running a worker
# that pulls one message off its queue and hands it to a caller-supplied block.
#
# Messages placed on a work queue by this class have the shape
# `{ params: Hash, retries: Integer }`. Failed messages are copied to a
# companion queue named "<queue_id>-ERRORS".
class Fanforce::Workers

  # @param opts [Hash] passed unchanged to Fanforce::Workers::Iron.new
  def initialize(opts={})
    @opts = opts
  end

  # Loads the sibling `iron` file on demand and memoizes the wrapper built from @opts.
  #
  # @return [Fanforce::Workers::Iron]
  def iron
    require_relative 'iron'
    @iron ||= Fanforce::Workers::Iron.new(@opts)
  end

  # Enqueues a fresh job (retry counter starts at 0).
  #
  # @param queue_id [String] name of the target queue
  # @param payload [Hash] job parameters; stored under the :params key
  # @raise [RuntimeError] if payload is not a Hash
  def enqueue(queue_id, payload)
    raise 'Payload being sent to the queue must be a Hash' unless payload.is_a?(Hash)
    iron.enqueue queue_id, params: payload, retries: 0
  end

  # Moves a message from "<queue_id>-ERRORS" back onto the work queue with its
  # retry counter incremented, then deletes it from the error queue.
  #
  # NOTE(review): relies on the stored body containing :params and an Integer
  # :retries; an error entry without them will raise here — confirm callers
  # only retry well-formed entries.
  #
  # @param queue_id [String] name of the work queue (without the -ERRORS suffix)
  # @param msg_id   identifier handed to the queue's `peek`
  def retry_error(queue_id, msg_id)
    msg = iron.client.queue("#{queue_id}-ERRORS").peek(msg_id)
    data = MultiJson.load(msg.body, :symbolize_keys => true)
    iron.enqueue queue_id, params: data[:params], retries: data[:retries] + 1
    msg.delete
  end

  ##########################################################################################

  # Pushes `payload` as-is onto the "<queue_id>-ERRORS" queue using a default-configured instance.
  def self.add_error(queue_id, payload)
    self.new.iron.enqueue("#{queue_id}-ERRORS", payload)
  end

  # Class-level shortcut for #enqueue using a default-configured instance.
  def self.enqueue(queue_id, payload)
    self.new.enqueue(queue_id, payload)
  end

  ##########################################################################################

  # Worker entry point: loads the runtime dependencies, fetches one message from
  # the queue named by worker_data['worker_id'] (requested with timeout: 3600)
  # and processes it via run_worker.
  #
  # @param worker_data [Hash] string-keyed; reads 'worker_id' and 'env_vars'
  # @yield [params, retries] the job's params (cloned) and its retry count
  def self.run(worker_data, &code_block)
    require '.pluginenv'
    require 'iron_mq'
    require 'fanforce/api'

    run_validations
    run_worker IronMQ::Client.new.queue(worker_data['worker_id']).get(timeout: 3600), worker_data, &code_block
  end

  # Hook invoked by run before a message is fetched; intentionally empty here.
  def self.run_validations
  end

  # Processes a single message: decodes its body, applies the worker's env vars,
  # calls the block, and deletes the message on success. On any failure the task
  # data (plus error details and env vars) is pushed to the error queue and the
  # message is deleted from the work queue.
  #
  # @param msg         queue message responding to `body` and `delete`, or nil
  #                    when nothing was fetched
  # @param worker_data [Hash] string-keyed; reads 'worker_id' and 'env_vars'
  def self.run_worker(msg, worker_data, &code_block)
    puts '----------------------------------------------------------'
    print 'PROCESSING MESSAGE: '

    # Check for "no message" up front instead of relying on msg.body raising.
    if msg.nil?
      puts 'MESSAGE IS NIL'
      return
    end

    raw_body  = msg.body
    task_data = Fanforce.decode_json(raw_body)
    set_env_vars(worker_data['env_vars'])
    code_block.call(task_data[:params].clone, task_data[:retries])
    msg.delete

  # NOTE(review): this rescues Exception rather than StandardError, so signals
  # and exit requests raised while the block runs are also diverted to the
  # error queue — confirm that is intended before narrowing it.
  rescue Exception => e
    # If decoding failed (or produced something that is not a Hash) there is no
    # task hash to annotate; fall back to the raw body so the failure is still
    # recorded instead of raising a second error from inside this handler.
    task_data = { raw_body: raw_body } unless task_data.is_a?(Hash)

    task_data[:last_error] = {
      exception: e.class.name,
      message:   e.message,
      backtrace: e.backtrace,
      raised_at: Time.now
    }
    task_data[:last_error][:curl_command] = e.curl_command if e.respond_to?(:curl_command)
    task_data[:env_vars] = worker_data['env_vars']

    puts "ADDING TO ERROR QUEUE: #{task_data.to_json}"
    self.add_error worker_data['worker_id'], task_data
    msg.delete
    puts 'DELETED MESSAGE'
  end

  # Copies each key/value pair into ENV (keys stringified), then re-loads
  # fanforce/api/ff_globals.rb after the environment has been updated.
  #
  # @param vars [Hash] environment variables to export
  def self.set_env_vars(vars)
    vars.each {|k,v| ENV[k.to_s]=v }
    load 'fanforce/api/ff_globals.rb'
  end

end