# Helpers for pushing jobs onto IronMQ queues, recording failed jobs in a
# companion "<queue_id>-ERRORS" queue/cache pair, and running a worker loop
# that drains a queue and hands each message to a caller-supplied block.
class Fanforce::Workers

  # @param opts [Hash] optional :token and :project_id; when absent the
  #   IRON_TOKEN / IRON_PROJECT_ID environment variables are used instead.
  def initialize(opts={})
    @opts = opts
  end

  # Lazily requires iron_mq and returns a memoized IronMQ client.
  def iron_mq
    require 'iron_mq'
    @iron_mq ||= IronMQ::Client.new(iron_credentials)
  end

  # Lazily requires iron_cache and returns a memoized IronCache client.
  def iron_cache
    require 'iron_cache'
    @iron_cache ||= IronCache::Client.new(iron_credentials)
  end

  # Posts a job message ({params:, retries:} as JSON) to the given queue.
  #
  # @param queue_id [String] name of the queue to post to
  # @param params [Hash] job parameters; anything else raises
  # @param options [Hash] :retries sets the retry counter stored in the
  #   message (defaults to 0); every other key is forwarded to the queue's
  #   post call
  # @raise [RuntimeError] when params is not a Hash
  def enqueue(queue_id, params, options={})
    raise 'Params being sent to the queue must be a Hash' unless params.is_a?(Hash)

    # Work on a copy so the caller's hash is left untouched, and always strip
    # :retries so it is never forwarded as a queue post option.
    post_options = (options || {}).dup
    retries = post_options.delete(:retries) || 0
    iron_mq.queue(queue_id).post({params: params, retries: retries}.to_json, post_options)
  end

  # Records a failed job: the full error is stored in the "<queue_id>-ERRORS"
  # cache under a random UUID, and a truncated summary pointing at that UUID
  # is posted to the "<queue_id>-ERRORS" queue. If recording itself fails,
  # the failure and the job error summary are printed to stdout instead.
  #
  # @param queue_id [String] queue the failed job came from
  # @param error [Hash] reads :exception, :message, :params, :errored_at,
  #   :retries, :env_vars and :curl_command
  def add_error(queue_id, error)
    require 'uuidtools'
    details_id = UUIDTools::UUID.random_create.to_s
    iron_cache.cache("#{queue_id}-ERRORS").put(details_id, error.to_json)
    iron_mq.queue("#{queue_id}-ERRORS").post(error_summary(details_id, error).to_json)
  rescue => e
    puts '-----------------------------------------------------'
    puts 'WORKER ERROR WHILE RECOVERING FROM JOB ERROR:'
    puts e.message
    puts e.backtrace
    puts '-----------------------------------------------------'
    puts 'JOB ERROR:'
    # details_id is nil here when the failure happened before it was assigned.
    error_summary(details_id, error).each { |field, value| puts "#{field}: #{value}" }
  end

  # Removes an error entry from both the error queue and the error cache.
  def delete_error(queue_id, job_id, details_id)
    iron_mq.queue("#{queue_id}-ERRORS").delete(job_id)
    iron_cache.cache("#{queue_id}-ERRORS").delete(details_id)
  end

  # Loads the full stored error for details_id as a Hash with symbol keys.
  def error_details(queue_id, details_id)
    cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id)
    MultiJson.load(cache.value, :symbolize_keys => true)
  end

  # Re-enqueues the job stored under details_id with its retry counter
  # incremented, then removes the cache entry and, if that delete returns a
  # truthy value, the matching error-queue message.
  def retry_error(queue_id, job_id, details_id)
    cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id)
    cache_data = MultiJson.load(cache.value, :symbolize_keys => true)
    # to_i guards against a stored record whose :retries is missing (nil).
    enqueue(queue_id, cache_data[:params], :retries => cache_data[:retries].to_i + 1)
    cache.delete && iron_mq.queue("#{queue_id}-ERRORS").delete(job_id)
  end

  # Shortens text to at most `length` characters, ending with truncate_string
  # when anything was cut off.
  #
  # @return [String, nil] nil when text is nil/false, otherwise a String
  def truncate(text, length=130, truncate_string="...")
    return unless text

    chars = text.chars.to_a
    return text.to_s if chars.length <= length

    keep = length - truncate_string.chars.to_a.length
    chars[0...keep].join('') + truncate_string
  end

  # Connection options shared by the IronMQ and IronCache clients.
  def iron_credentials
    {
      :token      => @opts[:token] || ENV['IRON_TOKEN'],
      :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']
    }
  end

  # Truncated, queue-sized view of an error hash; used both for the message
  # posted to the error queue and for the stdout fallback.
  def error_summary(details_id, error)
    {
      details_id:   details_id,
      exception:    truncate(error[:exception]),
      message:      truncate(error[:message].to_s),
      params:       truncate(error[:params].to_json),
      errored_at:   error[:errored_at],
      retries:      error[:retries],
      env_vars:     truncate(error[:env_vars].to_json),
      curl_command: truncate(error[:curl_command].to_s)
    }
  end

  private :iron_credentials, :error_summary

  ##########################################################################################

  # Convenience wrapper: enqueue through a default-configured instance.
  def self.enqueue(queue_id, params, options={})
    self.new.enqueue(queue_id, params, options)
  end

  # Convenience wrapper: record an error through a default-configured instance.
  def self.add_error(queue_id, error)
    self.new.add_error(queue_id, error)
  end

  ##########################################################################################

  # Class-level state describing the job currently being processed. These are
  # plain class instance variables with no synchronization.
  class << self
    attr_accessor :current_queue_id, :current_worker_env, :current_params,
                  :current_retries, :current_job
  end

  # Worker entry point: drains the queue named by worker_data['queue_id'],
  # passing each message to code_block via run_job.
  #
  # @param worker_data [Hash] reads the string keys 'queue_id' and 'env_vars'
  # @yield [params, meta] see run_job
  def self.run(worker_data, &code_block)
    require '.pluginenv'
    require 'iron_mq'
    require 'iron_cache'
    require 'fanforce/api'
    require 'active_support/all'

    self.current_queue_id = worker_data['queue_id']
    self.current_worker_env = worker_data['env_vars']

    queue = IronMQ::Client.new.queue(current_queue_id)
    job_num = 0
    puts 'PROCESSING...'
    while (job = queue.get(timeout: 3600))
      puts "JOB #{job_num += 1}: #{job.body}"
      run_job job, &code_block
      # No-op when run_job already deleted the message and cleared current_job.
      self.delete_job
    end
    self.delete_job
    puts 'DONE'
  end

  # Re-enqueues the job currently being processed with its retry counter
  # incremented. Extra options are forwarded to the instance-level enqueue.
  def self.retry(options={})
    # to_i guards against a message that carried no :retries value.
    self.new.enqueue(current_queue_id, current_params, (options || {}).merge(retries: current_retries.to_i + 1))
  end

  # Processes a single queue message: decodes it, publishes it through the
  # current_* accessors, applies the worker env vars, and calls code_block
  # with (params copy, retries:, queue_id:). The message is deleted on
  # success. On failure the message is deleted and the error is recorded
  # through add_error.
  def self.run_job(job, &code_block)
    puts '----------------------------------------------------------'
    print 'PROCESSING MESSAGE: '
    task_data = Fanforce.decode_json(job.body)
    self.current_job = job
    self.current_params = task_data[:params]
    self.current_retries = task_data[:retries]
    set_env_vars(current_worker_env)
    code_block.call(task_data[:params].clone, retries: task_data[:retries], queue_id: current_queue_id)
    self.delete_job(job)

  # NOTE(review): rescuing Exception also traps SystemExit and signals raised
  # while the block runs. Left unchanged because narrowing it would alter
  # which failures get recorded; confirm whether that is intended.
  rescue Exception => e
    if job.nil?
      puts 'MESSAGE IS NIL'
      return
    end

    # task_data is nil when decoding itself failed (or may not be a Hash for a
    # malformed message); fall back to an empty record and keep the raw body
    # so the handler does not crash and the failure is still recorded.
    base = task_data.is_a?(Hash) ? task_data : {raw_body: job.body}
    error = base.merge(
      exception: e.class.name,
      message: e.message,
      backtrace: e.backtrace,
      errored_at: Time.now,
      env_vars: current_worker_env
    )
    error[:curl_command] = e.curl_command if e.respond_to?(:curl_command)

    puts "ADDING TO ERROR CACHE: #{error.to_json}"
    self.delete_job(job)
    puts 'DELETED MESSAGE'
    self.add_error current_queue_id, error
  end

  # Deletes the given job, or the current job when none is given, and clears
  # current_job. Does nothing when there is no job to delete.
  def self.delete_job(job=nil)
    target = job || current_job
    return if target.nil?

    target.delete
    self.current_job = nil
  end

  # Copies vars into ENV and reloads the Fanforce globals file.
  #
  # @param vars [Hash, nil] nil is treated as "no variables"
  def self.set_env_vars(vars)
    # ENV only accepts String (or nil, which unsets the key) values, so
    # stringify everything else instead of raising TypeError.
    (vars || {}).each { |key, value| ENV[key.to_s] = value.nil? ? nil : value.to_s }
    load 'fanforce/api/ff_globals.rb'
  end

end