require 'timeout' class Fanforce::Worker class Timeout < RuntimeError; end LOADED_AT = Time.now MAX_EXECUTION_TIME = 3300 def initialize(opts={}) @opts = opts end def iron_mq require 'iron_mq' @iron_mq ||= IronMQ::Client.new(:token => @opts[:token] || ENV['IRON_TOKEN'], :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']) end def iron_cache require 'iron_cache' @iron_cache ||= IronCache::Client.new(:token => @opts[:token] || ENV['IRON_TOKEN'], :project_id => @opts[:project_id] || ENV['IRON_PROJECT_ID']) end def enqueue(queue_id, params, options={}) retries = (options[:retries].present?) ? options.delete(:retries) : 0 raise 'Params being sent to the queue must be a Hash' if !params.is_a?(Hash) iron_mq.queue(queue_id).post({params: params, retries: retries}.to_json, options) end def add_error(queue_id, error) require 'uuidtools' details_id = UUIDTools::UUID.random_create.to_s iron_cache.cache("#{queue_id}-ERRORS").put(details_id, error.to_json) iron_mq.queue("#{queue_id}-ERRORS").post({ details_id: details_id, http_code: error[:http_code], exception: truncate(error[:exception]), message: truncate(error[:message].to_s), params: truncate(error[:params].to_json), errored_at: error[:errored_at], retries: error[:retries], env_vars: truncate(error[:env_vars].to_json), curl_command: truncate(error[:curl_command].to_s) }.to_json) rescue => e puts '-----------------------------------------------------' puts 'WORKER ERROR WHILE RECOVERING FROM JOB ERROR:' puts e.message puts e.backtrace puts '-----------------------------------------------------' puts 'JOB ERROR:' puts "details_id: #{details_id}" puts "http_code: #{error[:http_code]}" puts "exception: #{truncate(error[:exception])}" puts "message: #{truncate(error[:message].to_s)}" puts "params: #{truncate(error[:params].to_json)}" puts "errored_at: #{error[:errored_at]}" puts "retries: #{error[:retries]}" puts "env_vars: #{truncate(error[:env_vars].to_json)}" puts "curl_command: #{truncate(error[:curl_command].to_s)}" end def delete_error(queue_id, job_id, details_id) iron_mq.queue("#{queue_id}-ERRORS").delete(job_id) iron_cache.cache("#{queue_id}-ERRORS").delete(details_id) end def error_details(queue_id, details_id) cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id) MultiJson.load(cache.value, :symbolize_keys => true) end def retry_error(queue_id, job_id, details_id) cache = iron_cache.cache("#{queue_id}-ERRORS").get(details_id) cache_data = MultiJson.load(cache.value, :symbolize_keys => true) enqueue(queue_id, cache_data[:params], :retries => cache_data[:retries] + 1) cache.delete and iron_mq.queue("#{queue_id}-ERRORS").delete(job_id) end def truncate(text, length=130, truncate_string="...") if text l = length - truncate_string.chars.to_a.length chars = text.chars.to_a (chars.length > length ? chars[0...l].join('') + truncate_string : text).to_s end end ########################################################################################## def self.enqueue(queue_id, params, options={}) self.new.enqueue(queue_id, params, options) end def self.add_error(queue_id, error) self.new.add_error(queue_id, error) end ########################################################################################## def self.current_queue_id=(queue_id) @current_queue_id = queue_id end def self.current_queue_id @current_queue_id end def self.current_worker_env=(env_vars) @current_worker_env = env_vars end def self.current_worker_env @current_worker_env end def self.current_params=(params) @current_params = params end def self.current_params @current_params end def self.current_retries=(retries) @current_retries = retries end def self.current_retries @current_retries end def self.current_job=(job) @current_job = job end def self.current_job @current_job end def self.load_env if File.exists?('.developmentenv.rb') require '.developmentenv' elsif File.exists?('.stagingenv.rb') require '.stagingenv' elsif File.exists?('.productionenv.rb') require '.productionenv' end end def self.run(worker_data, min_execution_time=300, &code_block) raise "min_execution_time was set to #{min_execution_time}, which is #{min_execution_time - MAX_EXECUTION_TIME} seconds too long" if min_execution_time > MAX_EXECUTION_TIME puts 'LOADING WORKER ENV...' load_env require 'iron_mq' require 'iron_cache' require 'fanforce/api' require 'active_support/all' self.current_queue_id = worker_data['queue_id'] self.current_worker_env = worker_data['env_vars'] queue = IronMQ::Client.new.queue(current_queue_id) puts 'PROCESSING JOBS...' puts '------------------------------------------------------------------------------------' job_num = 0 job_data = nil while job_has_enough_time_to_run(min_execution_time) and (job = queue.get(timeout: 3600)) do puts "- JOB #{job_num+=1}: #{job.body}" Timeout::timeout(worker_time_remaining, Fanforce::Worker::Timeout) do job_data = nil job_data = Fanforce.decode_json(job.body) run_job(job, job_data, &code_block) end self.delete_job puts '------------------------------------------------------------------------------------' end self.delete_job puts 'WINDING DOWN WORKER!' rescue Exception => e handle_job_loading_error(e, job, job_data) end def self.worker_time_remaining time_since_load = Time.now - LOADED_AT MAX_EXECUTION_TIME - time_since_load end def self.job_has_enough_time_to_run(min_execution_time) time_since_load = Time.now - LOADED_AT return false if time_since_load > MAX_EXECUTION_TIME return false if worker_time_remaining < min_execution_time return true end def self.retry(options) self.new.enqueue(current_queue_id, current_params, options.merge(retries: current_retries + 1)) end def self.run_job(job, job_data, &code_block) trap('SIGHUP') do raise 'trapped SIGHUP' end self.current_job = job self.current_params = job_data[:params] self.current_retries = job_data[:retries] set_env_vars(current_worker_env) code_block.call(job_data[:params].clone, retries: job_data[:retries], queue_id: current_queue_id) self.delete_job(job) rescue Exception => e handle_job_error(e, job, job_data) end def self.handle_job_loading_error(e, job, job_data) raise($!, "#{$!}: THERE IS NO JOB", $!.backtrace) if job.nil? self.delete_job(job) puts 'REMOVED JOB FROM QUEUE, BUT COULD NOT SAVE TO ERROR CACHE...' raise($!, "#{$!}: #{job_data.to_json}", $!.backtrace) end def self.handle_job_error(e, job, job_data) raise($!, "#{$!}: THERE IS NO JOB", $!.backtrace) if job.nil? error = job_data.merge( http_code: (e.code if e.respond_to?(:code)), exception: e.class.name, message: e.message, backtrace: e.backtrace, errored_at: Time.now, env_vars: current_worker_env ) error[:curl_command] = e.curl_command if e.respond_to?(:curl_command) self.delete_job(job) puts 'REMOVED JOB FROM QUEUE, AND SAVING TO ERROR CACHE...' puts error.to_json self.add_error current_queue_id, error end def self.delete_job(job=nil) return if job.nil? and current_job.nil? (job || current_job).delete self.current_job = nil end def self.set_env_vars(vars) vars.each {|k,v| ENV[k.to_s]=v } end end