require 'monitor'

module StatsD::Instrument::Backends
  # Backend that serializes metrics into StatsD-style text packets and writes
  # them to a connected UDP socket.
  #
  # The +implementation+ symbol selects the wire dialect. In this class it
  # affects which metric types are accepted, whether tags are emitted (and in
  # which format), when the sample rate is written, and whether a trailing
  # newline is appended.
  class UDPBackend < StatsD::Instrument::Backend

    DEFAULT_IMPLEMENTATION = :statsd

    # Provides #synchronize, used to serialize socket writes.
    include MonitorMixin

    attr_reader :host, :port
    attr_accessor :implementation, :default_metric_tags

    # @param server [String, nil] "host:port" connection string; defaults to
    #   "localhost:8125" when nil.
    # @param implementation [#to_sym, nil] wire dialect; defaults to
    #   DEFAULT_IMPLEMENTATION when nil.
    # @param default_metric_tags [Hash] tags merged into every packet; a tag
    #   on the metric with the same key overrides the default value.
    def initialize(server = nil, implementation = nil, default_metric_tags = {})
      super() # no arguments: lets MonitorMixin set up its monitor
      self.server = server || "localhost:8125"
      self.implementation = (implementation || DEFAULT_IMPLEMENTATION).to_sym
      self.default_metric_tags = default_metric_tags
    end

    # Serializes and sends a metric, honoring its sample rate.
    #
    # @param metric [#type, #sample_rate, #name, #value, #tags]
    # @return [Boolean] false when the metric type is unsupported, when the
    #   metric is dropped by sampling, or when sending fails.
    def collect_metric(metric)
      unless implementation_supports_metric_type?(metric.type)
        StatsD.logger.warn("[StatsD] Metric type #{metric.type.inspect} not supported on #{implementation} implementation.")
        return false
      end

      # Sampling: only roughly sample_rate of the calls produce a packet.
      if metric.sample_rate < 1.0 && rand > metric.sample_rate
        return false
      end

      write_packet(generate_packet(metric))
    end

    # @param type [Symbol] metric type code
    # @return [Boolean] :h is accepted only on :datadog, :kv only on
    #   :statsite; every other type is accepted everywhere.
    def implementation_supports_metric_type?(type)
      case type
      when :h then implementation == :datadog
      when :kv then implementation == :statsite
      else true
      end
    end

    # Sets host and port from a "host:port" string (split on the first colon).
    # The port part is converted with #to_i.
    def server=(connection_string)
      self.host, port = connection_string.split(':', 2)
      self.port = port.to_i
      invalidate_socket
    end

    # Changes the target host and drops the cached socket.
    def host=(host)
      @host = host
      invalidate_socket
    end

    # Changes the target port and drops the cached socket.
    def port=(port)
      @port = port
      invalidate_socket
    end

    # @return [UDPSocket] the cached socket, created and connected to
    #   host/port on first use.
    def socket
      if @socket.nil?
        @socket = UDPSocket.new
        @socket.connect(host, port)
      end
      @socket
    end

    # Builds the text packet for a metric.
    #
    # The base form is "name:value|type". "|@rate" is appended when the sample
    # rate is below 1 (or above 1 on :statsite). Default tags merged with the
    # metric's tags are appended as "|#k:v,k2" on :datadog or prepended as
    # "[k=v,k2]" on :collectd; on other implementations a warning is logged
    # and tags are omitted. :statsite packets end with a newline.
    #
    # @param metric [#name, #value, #type, #sample_rate, #tags]
    # @return [String]
    def generate_packet(metric)
      command = "#{metric.name}:#{metric.value}|#{metric.type}"
      command << "|@#{metric.sample_rate}" if metric.sample_rate < 1 || (implementation == :statsite && metric.sample_rate > 1)

      tags = merged_tags(metric.tags)
      unless tags.empty?
        if tags_supported?
          if implementation == :datadog
            command << "|##{format_tags(tags, ':')}"
          elsif implementation == :collectd
            # The separator is applied per tag so that colons inside a tag
            # value are left untouched.
            command.prepend("[#{format_tags(tags, '=')}]")
          end
        else
          StatsD.logger.warn("[StatsD] Tags are only supported on Datadog and CollectD implementation.")
        end
      end

      command << "\n" if implementation == :statsite
      command
    end

    # @return [Boolean] true on the :datadog and :collectd implementations.
    def tags_supported?
      implementation == :datadog || implementation == :collectd
    end

    # Sends a packet over the socket.
    #
    # @param command [String] serialized packet
    # @return [Boolean] true when at least one byte was sent; false when
    #   nothing was sent or a socket-level error was raised (the error is
    #   logged, not propagated).
    def write_packet(command)
      begin
        synchronize { socket.send(command, 0) > 0 }
      rescue ThreadError
        # In cases where a TERM or KILL signal has been sent, and we send stats as
        # part of a signal handler, locks cannot be acquired, so we do our best
        # to try and send the command without a lock.
        socket.send(command, 0) > 0
      end
    rescue SocketError, IOError, SystemCallError => e
      # This method-level rescue also covers the lock-free retry above.
      # SystemCallError includes every Errno::* class (e.g. ECONNREFUSED).
      StatsD.logger.error "[StatsD] #{e.class.name}: #{e.message}"
      false
    end

    # Forgets the cached socket so the next #socket call reconnects.
    def invalidate_socket
      @socket = nil
    end

    private

    # Merges default_metric_tags with the metric's "key:value" / "key" tags.
    # Tags are split on the first colon only, so values may contain colons.
    # Entries that are neither String nor Symbol, or that are empty, are
    # skipped.
    def merged_tags(tags)
      merged = (default_metric_tags || {}).dup
      return merged unless tags

      tags.each do |tag|
        next unless tag.is_a?(String) || tag.is_a?(Symbol)

        key, value = tag.to_s.split(":", 2)
        next if key.nil? # empty tag string

        merged[key] = value
      end
      merged
    end

    # Joins tags with commas; value-less tags are emitted as the bare key.
    def format_tags(tags, separator)
      tags.map { |key, value| value.nil? ? key.to_s : "#{key}#{separator}#{value}" }.join(',')
    end
  end
end