require 'json'
require 'ruby-metrics'
require 'new_relic/agent'
require 'sqspoller/metrics/log_reporter'

# Collects metrics around queue, message type and message count.
#
# Every measurement is written to the local metrics agent (once it has been
# created by {start_metrics_agent}) and, while enabled, mirrored to New Relic
# as custom metrics under NEW_RELIC_CUSTOM_METRICS_PATH.
module SqsPoller
  module Metrics
    # New Relic mirroring is on by default; see enable/disable_new_relic_metrics.
    @new_relic_metrics_enabled = true
    # Unit handed to every local timer, both on lookup and on update.
    @duration_units = :milliseconds
    # Local metrics agent; stays nil until start_metrics_agent is called.
    @agent = nil

    UNKNOWN_MESSAGE_TYPE = "UNKNOWN"
    TASK_SUCCESS_STATUS = "success"
    TASK_FAILURE_STATUS = "failed"
    NEW_RELIC_CUSTOM_METRICS_PATH = "Custom/sqspoller/"

    # Extracts the :MessageType field from the JSON body of a task's message.
    #
    # @param task [Hash] must expose task[:message], which responds to #body
    # @return [Object] the parsed :MessageType value, or UNKNOWN_MESSAGE_TYPE
    #   when the body cannot be parsed or carries no MessageType
    def self.get_message_type(task)
      parsed_message = JSON.parse(task[:message].body, symbolize_names: true)
      message_type = parsed_message[:MessageType]
      message_type.nil? ? UNKNOWN_MESSAGE_TYPE : message_type
    rescue StandardError
      # Deliberate best effort: an unparsable body or a missing MessageType is
      # reported as UNKNOWN. Only StandardError is rescued so that signals and
      # exit requests (Interrupt, SystemExit, ...) still propagate.
      UNKNOWN_MESSAGE_TYPE
    end

    # Maps a truthy/falsy task outcome to its status label.
    #
    # @param task_status [Object] truthy for success, false/nil for failure
    # @return [String] TASK_SUCCESS_STATUS or TASK_FAILURE_STATUS
    def self.get_message_status(task_status)
      task_status ? TASK_SUCCESS_STATUS : TASK_FAILURE_STATUS
    end

    # @param task [Hash]
    # @return [Object] the value stored under task[:queue_name]
    def self.get_queue_name(task)
      task[:queue_name]
    end

    # @return [Object, nil] the local metrics agent, or nil when
    #   start_metrics_agent has not been called yet
    def self.get_metrics_recorder
      @agent
    end

    # Returns the agent's :total_messages timer.
    # Raises NoMethodError when the agent has not been started (@agent is nil).
    def self.get_qps
      @agent.timer(:total_messages)
    end

    # Computes the value recorded as a message's queue wait time:
    # task[:queue_time] minus timer.start_time.
    #
    # NOTE(review): if :queue_time is the enqueue instant and start_time the
    # moment processing began, this difference is negative - confirm the
    # intended operand order and units against the caller.
    #
    # @param task [Hash] must expose task[:queue_time]
    # @param timer [#start_time]
    def self.get_queue_wait_time(task, timer)
      task[:queue_time] - timer.start_time
    end

    # Records one processed task in the local agent and, when enabled, in
    # New Relic. Does nothing while the local agent has not been started.
    #
    # @param task [Hash] read for :message, :queue_name and :queue_time
    # @param task_status [Object] truthy when the task succeeded
    # @param timer [#start_time]
    # @param elapsed_time [Numeric] fed to the timers using @duration_units
    def self.record(task, task_status, timer, elapsed_time)
      return if @agent.nil?

      queue_name = get_queue_name(task)
      message_type = get_message_type(task)
      message_status = get_message_status(task_status)
      queue_wait_time = get_queue_wait_time(task, timer)

      local_metrics(elapsed_time, message_status, message_type, queue_name, queue_wait_time)
      return unless @new_relic_metrics_enabled

      new_relic_metrics(elapsed_time, message_status, message_type, queue_name, queue_wait_time)
    end

    # Turns New Relic mirroring on for subsequent record calls.
    def self.enable_new_relic_metrics
      @new_relic_metrics_enabled = true
    end

    # Turns New Relic mirroring off for subsequent record calls.
    def self.disable_new_relic_metrics
      @new_relic_metrics_enabled = false
    end

    # The methods below are internal helpers. A bare `private` keyword does not
    # apply to `def self.` methods, so they have always been publicly callable;
    # they are intentionally left public to keep the interface unchanged.

    # Updates the local agent: timers for overall, per-queue and
    # per-queue/type durations plus wait time, and counters per status,
    # per queue, per queue/status and per queue/type/status.
    def self.local_metrics(elapsed_time, message_status, message_type, queue_name, queue_wait_time)
      units = @duration_units
      typed_name = "#{queue_name}_#{message_type}"

      @agent.timer(:total_messages, units).update(elapsed_time, units)
      @agent.timer(queue_name.to_sym, units).update(elapsed_time, units)
      @agent.timer(typed_name.to_sym, units).update(elapsed_time, units)
      @agent.timer("#{typed_name}_wait_time".to_sym, units).update(queue_wait_time, units)

      @agent.counter(message_status.to_s.to_sym).inc
      @agent.counter("#{queue_name}_count".to_sym).inc
      @agent.counter("#{queue_name}_#{message_status}".to_sym).inc
      @agent.counter("#{typed_name}_#{message_status}".to_sym).inc
    end

    # Mirrors the measurements to New Relic custom metrics.
    #
    # Every metric name, counters included, is placed under
    # NEW_RELIC_CUSTOM_METRICS_PATH so that all of them share the
    # "Custom/sqspoller/" namespace.
    def self.new_relic_metrics(elapsed_time, message_status, message_type, queue_name, queue_wait_time)
      queue_path = "#{NEW_RELIC_CUSTOM_METRICS_PATH}#{queue_name}"
      typed_path = "#{queue_path}_#{message_type}"

      ::NewRelic::Agent.record_metric("#{NEW_RELIC_CUSTOM_METRICS_PATH}all", elapsed_time)
      ::NewRelic::Agent.record_metric(queue_path, elapsed_time)
      ::NewRelic::Agent.record_metric(typed_path, elapsed_time)
      ::NewRelic::Agent.record_metric("#{typed_path}_wait_time", queue_wait_time)

      ::NewRelic::Agent.increment_metric("#{NEW_RELIC_CUSTOM_METRICS_PATH}count")
      ::NewRelic::Agent.increment_metric("#{queue_path}_count")
      ::NewRelic::Agent.increment_metric("#{queue_path}_#{message_status}_count")
      ::NewRelic::Agent.increment_metric("#{typed_path}_#{message_status}_count")
    end

    # Creates the local metrics agent, attaches the log reporter under the
    # name "log_reporter" and starts periodic reporting.
    #
    # NOTE(review): inside this namespace `Metrics` resolves lexically to
    # SqsPoller::Metrics rather than the top-level ::Metrics - confirm that
    # Agent is reachable through this constant path.
    def self.start_metrics_agent
      @agent = Metrics::Agent.new
      @agent.report_to("log_reporter", SqsPoller::Metrics::LogReporter.new)
      @agent.report_periodically
    end
  end
end