require 'redis'
require 'uuidtools'

# Class-level entry points for the per-queue error log kept in Redis.
#
# Storage layout, as written by .add:
#   ERRORS:<queue_id>             - Redis list of error ids (appended with RPUSH)
#   ERRORS:<queue_id>:<error_id>  - Redis hash holding the fields of one error
class Fanforce::Worker::Errors
  include Fanforce::Worker::Utils

  @@redis = nil

  # Lazily creates and memoizes the Redis connection used by the error classes.
  #
  # @return [Redis]
  def self.redis
    @@redis ||= Redis.new(url: Fanforce::Worker.redis_url)
  end

  # Builds a handle for one stored error. No Redis call is made here.
  #
  # @return [Fanforce::Worker::Error]
  def self.find(queue_id, error_id)
    Fanforce::Worker::Error.new(queue_id, error_id)
  end

  # Reads a slice of the queue's error-id list.
  #
  # @param from_num [Integer] start index, passed straight to LRANGE
  # @param to_num   [Integer] stop index, passed straight to LRANGE
  # @param reverse  [Boolean] reverse the fetched slice before wrapping it
  # @return [Fanforce::Worker::ErrorList]
  def self.list(queue_id, from_num=0, to_num=100, reverse=false)
    error_ids = redis.lrange("ERRORS:#{queue_id}", from_num, to_num)
    error_ids.reverse! if reverse
    Fanforce::Worker::ErrorList.new(queue_id, error_ids)
  end

  # Same slice as .list, returned as formatted summaries instead of an ErrorList.
  def self.list_summaries(queue_id, from_num=0, to_num=100, reverse=false)
    list(queue_id, from_num, to_num, reverse).summaries
  end

  # Deletes every listed error from the queue's error log.
  def self.delete_all(queue_id, error_ids)
    Fanforce::Worker::ErrorList.new(queue_id, error_ids).delete
  end

  # Re-enqueues every listed error.
  def self.retry_all(queue_id, error_ids)
    Fanforce::Worker::ErrorList.new(queue_id, error_ids).retry
  end

  # Records a failed job: appends a fresh error id to the queue's list and
  # stores the error fields in a hash keyed by that id.
  #
  # Failures while recording are logged and swallowed, so that error
  # bookkeeping does not raise on top of the job failure it is recording.
  #
  # @param queue_id   queue the job belonged to
  # @param e          exception raised by the job; `code` and `curl_command`
  #                   are captured only when the exception responds to them
  # @param job_data   hash read via job_data[:params] and job_data[:retries]
  # @param worker_env serialized with #to_json into the env_vars field
  def self.add(queue_id, e, job_data, worker_env)
    error_id = UUIDTools::UUID.random_create.to_s
    error = {
      error_id: error_id,
      class: Fanforce::Worker,
      exception: e.class.name,
      http_code: (e.code if e.respond_to?(:code)),
      message: e.message,
      backtrace: e.backtrace.to_json,
      errored_at: Time.now,
      queue_id: queue_id,
      raw_json: job_data[:params].to_json,
      env_vars: worker_env.to_json,
      retries: job_data[:retries],
      curl_command: (e.curl_command if e.respond_to?(:curl_command)),
      # Serialized like the other structured fields: a nested Hash is not a
      # flat hash-field value and could not be parsed back when read.
      iron: {token: Fanforce::Worker.iron_token, project_id: Fanforce::Worker.iron_project_id}.to_json
    }
    redis.rpush("ERRORS:#{queue_id}", error_id)
    redis.hmset("ERRORS:#{queue_id}:#{error_id}", error.flatten)
  rescue => recovery_error
    # A distinct name keeps the job's own exception (`e`) available below.
    # NOTE(review): `log` is called on the class, while Utils is only
    # `include`d above - presumably Utils also extends the class; confirm.
    log.fatal '-----------------------------------------------------'
    log.fatal 'WORKER ERROR WHILE RECOVERING FROM JOB ERROR:'
    log.fatal recovery_error.message
    log.fatal recovery_error.backtrace
    log.fatal '-----------------------------------------------------'
    log.fatal 'JOB ERROR:'
    if error
      error.each { |k, v| log.fatal "#{k}: #{v}" }
    else
      # Building the error hash is what failed, so only the raw exception is available.
      log.fatal e.inspect
    end
  end

end
###################################################################################################################

# Handle for one stored job error, addressed by queue id and error id.
# Instance methods delegate to the class-level Redis helpers defined below.
class Fanforce::Worker::Error

  def initialize(queue_id, error_id)
    @queue_id = queue_id
    @error_id = error_id
  end

  # Removes this error from Redis.
  #
  # @return [nil]
  def delete
    self.class.delete(@queue_id, @error_id)
    nil
  end

  # @return [Hash] DETAIL_FIELDS => formatted values
  def details
    self.class.format_details(self.class.get_details(@queue_id, @error_id))
  end

  # @return [Hash] SUMMARY_FIELDS => formatted values
  def summary
    self.class.format_summary(self.class.get_summary(@queue_id, @error_id))
  end

  # Re-enqueues the failed job and removes this error record.
  def retry
    # `self.class` rather than the bare constant `Error`, which does not
    # resolve from inside this class body.
    raw_error = self.class.get_all(@queue_id, @error_id)
    self.class.retry(@queue_id, raw_error)
  end

  ###################################################################################################################

  SUMMARY_FIELDS = [:error_id, :exception, :message, :errored_at, :raw_json, :retries]
  DETAIL_FIELDS  = [:error_id, :exception, :http_code, :message, :backtrace, :errored_at, :raw_json, :env_vars, :retries, :curl_command]

  # Shares the connection owned by Fanforce::Worker::Errors.
  def self.redis; Fanforce::Worker::Errors.redis end

  # Removes the id from the queue's error list and drops the error's hash.
  def self.delete(queue_id, error_id)
    # ERRORS:<queue_id> is a list (it is read with LRANGE), so the id has to
    # be removed with LREM; count 0 removes every occurrence.
    redis.lrem("ERRORS:#{queue_id}", 0, error_id)
    redis.del("ERRORS:#{queue_id}:#{error_id}")
  end

  # @return raw SUMMARY_FIELDS values, in SUMMARY_FIELDS order
  def self.get_summary(queue_id, error_id)
    redis.hmget("ERRORS:#{queue_id}:#{error_id}", *SUMMARY_FIELDS)
  end

  # Pairs raw HMGET values with SUMMARY_FIELDS and formats them.
  def self.format_summary(summary)
    format(Hash[SUMMARY_FIELDS.zip(summary)])
  end

  # Decodes stored string values in place and returns the same hash:
  # :backtrace and :env_vars are JSON-decoded (falling back to [] / {} when
  # decoding raises), :retries becomes an Integer, other fields are untouched.
  #
  # @param error [Hash] symbol-keyed error fields; mutated
  # @return [Hash] the same hash
  def self.format(error)
    error.each do |field, value|
      error[field] =
        case field
        when :backtrace then parse_json(value, [])
        when :env_vars  then parse_json(value, {})
        when :retries   then value.to_i
        else value
        end
    end
  end

  # @return raw DETAIL_FIELDS values, in DETAIL_FIELDS order
  def self.get_details(queue_id, error_id)
    redis.hmget("ERRORS:#{queue_id}:#{error_id}", *DETAIL_FIELDS)
  end

  # Pairs raw HMGET values with DETAIL_FIELDS and formats them.
  def self.format_details(details)
    format(Hash[DETAIL_FIELDS.zip(details)])
  end

  # @return [Hash] every stored field of the error, as returned by HGETALL
  def self.get_all(queue_id, error_id)
    redis.hgetall("ERRORS:#{queue_id}:#{error_id}")
  end

  # Re-enqueues the job described by a stored error with its retry count
  # incremented, then deletes the error record.
  #
  # Iron credentials are taken from the stored `iron` field when it parses as
  # a JSON object, otherwise from an `iron` entry in the job params; when
  # neither is available the current worker configuration is left untouched.
  #
  # @param queue_id  queue to enqueue on and to delete the record from
  # @param raw_error [Hash] string-keyed fields as returned by .get_all
  # @return [nil] when raw_error is nil or empty (record no longer exists)
  def self.retry(queue_id, raw_error)
    return nil if raw_error.nil? || raw_error.empty?

    error  = format(raw_error.symbolize_keys)
    params = parse_json(error[:raw_json], {}, symbolize_keys: true)

    iron = iron_credentials(error, params)
    if iron
      Fanforce::Worker.set_config(iron_token: iron[:token], iron_project_id: iron[:project_id])
    end

    Fanforce::Worker.enqueue(queue_id, params, :retries => error[:retries].to_i + 1)
    delete(queue_id, error[:error_id])
  end

  # JSON-decodes a stored value, returning `default` when decoding raises.
  def self.parse_json(json, default, options={})
    MultiJson.load(json, options)
  rescue StandardError
    default
  end
  private_class_method :parse_json

  # Finds the Iron credentials hash for a retry, or nil when there is none.
  def self.iron_credentials(error, params)
    stored = parse_json(error[:iron], nil, symbolize_keys: true)
    return stored if stored.is_a?(Hash) && stored[:token]

    from_params = params[:iron] if params.is_a?(Hash)
    from_params if from_params.is_a?(Hash)
  end
  private_class_method :iron_credentials

end

###################################################################################################################

# A batch of error ids from one queue, operated on together.
class Fanforce::Worker::ErrorList
  Error = Fanforce::Worker::Error

  def initialize(queue_id, error_ids)
    @queue_id = queue_id
    @error_ids = error_ids
  end

  # Shares the connection owned by Fanforce::Worker::Errors.
  def redis; Fanforce::Worker::Errors.redis end

  # Fetches all summaries inside one MULTI block and formats them.
  # Commands issued inside the block are collected as deferred results and
  # read with #value once the block has finished.
  #
  # @return [Array<Hash>]
  def summaries
    pending = []
    redis.multi do
      @error_ids.each { |error_id| pending << Error.get_summary(@queue_id, error_id) }
    end
    pending.map { |result| Error.format_summary(result.value) }
  end

  # Loads every error inside one MULTI block, then retries each in turn.
  #
  # @return [Array] one Error.retry result per error id
  def retry
    pending = []
    redis.multi do
      @error_ids.each { |error_id| pending << Error.get_all(@queue_id, error_id) }
    end
    pending.map { |result| Error.retry(@queue_id, result.value) }
  end

  # Deletes every error in the list inside one MULTI block.
  #
  # @return [nil]
  def delete
    redis.multi do
      @error_ids.each { |error_id| Error.delete(@queue_id, error_id) }
    end
    nil
  end

end