module Metricsd
# The Client class implements the MetricsD protocol, which allows sending various
# metrics to the MetricsD server for collection, analysis, and graphing.
#
# The class allows you to record count and timing stats to the database:
#
# # Record success hit
# Metricsd::Client.record_success("api.docs.upload")
# # Record failure hit
# Metricsd::Client.record_failure("api.docs.upload")
# # Record timing info
# Metricsd::Client.record_time("api.docs.upload", 0.14)
# # Record complete success hit info (count + timing)
# Metricsd::Client.record_hit("api.docs.upload", true, 0.14)
# # Record an integer value
# Metricsd::Client.record_value("user.password.size", 15)
# Metricsd::Client.record_value("user.age", 26)
#
# To send several metrics in a single network packet, you can use record_values:
#
# # Send all database pool stats
# Metricsd::Client.record_values({
# 'db.pool.reserved' => db_stats[:reserved],
# 'db.pool.available' => db_stats[:available],
# 'db.pool.pending' => db_stats[:pending],
# }, :group => 'doc_timestamp')
#
# You can specify message source using :source => 'src' option. In this case
# you will be able to see summary graphs and graphs per source:
#
# # Generate graphs for all tables, and each single table.
# Metricsd::Client.record_success("hbase.reads", :source => @hbase_table)
#
# By default only summary statistics are calculated. You can enable per-host graphs
# by specifying the appropriate source:
#
# # Generate summary graph for all hosts, and graphs for each single host.
# Metricsd::Client.record_success("hbase.reads", :source => Metricsd::Client.source)
# # ... or you can pass an empty string with the same effect.
# Metricsd::Client.record_success("hbase.reads", :source => '')
#
# You can group your metrics using :group option. In this case metrics will be
# displayed together on the summary page.
#
# # Group metrics using :group option.
# Metricsd::Client.record_success("reads", :source => @hbase_table, :group => 'hbase')
# # Group metrics using special syntax "group.metric".
# Metricsd::Client.record_success("hbase.reads", :source => @hbase_table)
#
class Client
  # Largest payload, in bytes, that is sent to MetricsD in a single UDP packet.
  MAX_PACKET_BYTES = 250

  # Guards lazy creation and resetting of the shared UDP socket.
  @@lock = Monitor.new
  # UDP socket shared by all calls; created lazily by +collector_socket+.
  @@socket = nil

  class << self
    # Record complete hit info. Time should be a floating point
    # number of seconds.
    #
    # It creates two metrics:
    # * +your.metric.status+ with 1 for a succeeded and -1 for a failed event
    # * +your.metric.time+ with the time converted to rounded milliseconds
    #
    # @param [String] metric is the metric name (like app.docs.upload)
    # @param [Boolean] is_success indicating whether request was successful.
    # @param [Float] time floating point number of seconds.
    # @param [Hash] opts options.
    # @option opts [String] :group metrics group.
    # @option opts [String] :source metric source.
    #
    def record_hit(metric, is_success, time, opts = {})
      record_internal({
        "#{metric}.status" => is_success ? 1 : -1,
        "#{metric}.time" => (time * 1000).round
      }, opts)
    end
    alias :hit :record_hit

    # Record succeeded boolean event.
    #
    # It creates a single metric:
    # * +your.metric.status+ with the value 1
    #
    # @param [String] metric is the metric name (like app.docs.upload)
    # @param [Hash] opts options.
    # @option opts [String] :group metrics group.
    # @option opts [String] :source metric source.
    #
    def record_success(metric, opts = {})
      record_internal({"#{metric}.status" => 1}, opts)
    end
    alias :success :record_success

    # Record failed boolean event.
    #
    # It creates a single metric:
    # * +your.metric.status+ with the value -1
    #
    # @param [String] metric is the metric name (like app.docs.upload)
    # @param [Hash] opts options.
    # @option opts [String] :group metrics group.
    # @option opts [String] :source metric source.
    #
    def record_failure(metric, opts = {})
      record_internal({"#{metric}.status" => -1}, opts)
    end
    alias :failure :record_failure

    # Record timing info. Time should be a floating point number of seconds.
    # When no time is given, the passed block is measured instead; the options
    # hash may then be passed in place of +time+.
    #
    # It creates a single metric:
    # * +your.metric.time+ with the time converted to rounded milliseconds
    #
    # @param [String] metric is the metric name (like app.docs.upload)
    # @param [Float, Hash, nil] time floating point number of seconds, or the
    #   options hash when a block is measured.
    # @param [Hash] opts options.
    # @option opts [String] :group metrics group.
    # @option opts [String] :source metric source.
    # @return [Object, nil] the block's result when a block was measured,
    #   +nil+ when an explicit time was given.
    # @raise [ArgumentError] when neither a time nor a block is given.
    #
    def record_time(metric, time = nil, opts = {}, &block)
      # Allow record_time(metric, opts) { ... }: options passed in place of time.
      opts, time = time, nil if time.is_a?(Hash)
      result = nil
      if time.nil?
        raise ArgumentError, "You should pass a block if time is not given" unless block_given?
        time = Benchmark.measure { result = block.call }.real
      end
      record_internal({"#{metric}.time" => (time * 1000).round}, opts)
      result
    end
    alias :time :record_time

    # Record an integer value. The value is rounded before it is sent.
    #
    # It creates a single metric:
    # * +your.metric+ with values statistics
    #
    # @param [String] metric is the metric name (like app.docs.upload)
    # @param [Integer] value metric value.
    # @param [Hash] opts options.
    # @option opts [String] :group metrics group.
    # @option opts [String] :source metric source.
    #
    def record_value(metric, value, opts = {})
      record_internal({metric => value.round}, opts)
    end
    alias :record :record_value
    alias :value :record_value

    # Record multiple integer values. Unlike +record_value+, the values are
    # sent exactly as given (no rounding is applied).
    #
    # It creates a metric for each entry in +metrics+ Hash:
    # * +your.metric+ with values statistics
    #
    # @param [Hash] metrics a +Hash+ that maps metrics names to their values.
    # @param [Hash] opts options.
    # @option opts [String] :group metrics group.
    # @option opts [String] :source metric source.
    #
    # @example
    #   Metricsd::Client.record_values(
    #     'db.pool.reserved'  => db_stats[:reserved],
    #     'db.pool.available' => db_stats[:available],
    #     'db.pool.pending'   => db_stats[:pending],
    #   )
    #
    def record_values(metrics, opts = {})
      record_internal(metrics, opts)
    end
    alias :values :record_values

    # Reset the connection: the cached socket is closed and forgotten, so the
    # next metric sent opens a new one.
    #
    # @return [nil]
    def reset_connection!
      @@lock.synchronize do
        stale = @@socket
        @@socket = nil
        close_quietly(stale)
      end
      nil
    end

    private

    # Returns the UDP socket used to send metrics to MetricsD, creating and
    # connecting it on first use. Connection errors are logged.
    #
    # @return [UDPSocket, nil] the socket, or +nil+ when it could not be opened.
    def collector_socket
      @@lock.synchronize { @@socket ||= open_socket }
    rescue SocketError, SystemCallError => e
      Metricsd.logger.error("Exception occurred while trying to connect to MetricsD (#{Metricsd.server_host}:#{Metricsd.server_port}): #{e.inspect}")
      nil
    end

    # Opens a UDP socket connected to the configured MetricsD server. The
    # socket is closed again if connecting fails, so it is not leaked.
    def open_socket
      sock = UDPSocket.new
      sock.connect(Metricsd.server_host, Metricsd.server_port)
      sock
    rescue StandardError
      close_quietly(sock)
      raise
    end

    # Closes +sock+ (if any), ignoring errors raised while closing.
    def close_quietly(sock)
      sock.close unless sock.nil? || sock.closed?
    rescue IOError, SystemCallError
      nil
    end

    # Send information to the MetricsD daemon using UDP protocol. Does nothing
    # when Metricsd is disabled.
    #
    # A missing +:source+ falls back to +Metricsd.default_source+; a +nil+ or
    # empty source is replaced with +Metricsd.source+.
    def record_internal(metrics, opts = {})
      return unless Metricsd.enabled?

      opts = { :source => Metricsd.default_source }.update(opts)
      # to_s keeps a nil (or non-String) source from raising on #empty?.
      opts[:source] = opts[:source].to_s
      opts[:source] = Metricsd.source.to_s if opts[:source].empty?

      # Sorting makes the order of metrics inside packets deterministic.
      send_in_packets(Array(metrics).map { |pair| pack(pair[0], pair[1], opts) }.sort)
    end

    # Combines string representations of metrics into packets of at most
    # MAX_PACKET_BYTES bytes (metrics are separated with ';') and sends them
    # to MetricsD. A single metric that does not fit into one packet is
    # logged and skipped.
    def send_in_packets(strings)
      batch = []
      batch_bytes = 0
      strings.each do |s|
        # The limit is in bytes, so measure bytes rather than characters.
        if s.bytesize > MAX_PACKET_BYTES
          Metricsd.logger.warn("Message is larger than #{MAX_PACKET_BYTES} bytes, so it was ignored: #{s}")
          next
        end
        separator_bytes = batch.empty? ? 0 : 1
        if batch_bytes + separator_bytes + s.bytesize > MAX_PACKET_BYTES
          safe_send(batch.join(';'))
          batch = []
          batch_bytes = 0
          separator_bytes = 0
        end
        batch << s
        batch_bytes += separator_bytes + s.bytesize
      end
      safe_send(batch.join(';')) unless batch.empty?
    end

    # Sends a string to the MetricsD. Should never raise any network-specific
    # exceptions, but log them instead, and silently return.
    #
    # @return [Boolean] +false+ when a send error was rescued, +true+ otherwise
    #   (including when no socket is available and nothing was sent).
    def safe_send(msg)
      socket = collector_socket
      socket.send(msg, 0) if socket
      true
    rescue SystemCallError, SocketError, IOError => e
      Metricsd.logger.error("Exception occurred while trying to send data to MetricsD (#{Metricsd.server_host}:#{Metricsd.server_port}): #{e.inspect}")
      false
    end

    # Packs metric into a string representation according to the MetricsD
    # protocol: "source@group.key:value", where the "source@" and "group."
    # parts are omitted when the source or group is empty.
    def pack(key, value, opts)
      group = (opts[:group] || Metricsd.default_group).to_s
      key = "#{group}.#{key}" unless group.empty?
      source = opts[:source].to_s
      source.empty? ? "#{key}:#{value}" : "#{source}@#{key}:#{value}"
    end
  end
end
end