lib/logstash/outputs/graphite.rb in logstash-output-graphite-1.0.0 vs lib/logstash/outputs/graphite.rb in logstash-output-graphite-1.0.1

- old
+ new

@@ -65,10 +65,22 @@ # metrics_format => "foo.bar.*.sum" # # NOTE: If no metrics_format is defined, the name of the metric will be used as fallback. config :metrics_format, :validate => :string, :default => DEFAULT_METRICS_FORMAT + # When hashes are passed in as values they are broken out into a dotted notation + # For instance if you configure this plugin with + # # [source,ruby] + # metrics => "mymetrics" + # + # and "mymetrics" is a nested hash of '{a => 1, b => { c => 2 }}' + # this plugin will generate two metrics: a => 1, and b.c => 2 . + # If you've specified a 'metrics_format' it will respect that, + # but you still may want control over the separator within these nested key names. + # This config setting changes the separator from the '.' default. + config :nested_object_separator, :validate => :string, :default => "." + def register @include_metrics.collect!{|regexp| Regexp.new(regexp)} @exclude_metrics.collect!{|regexp| Regexp.new(regexp)} if @metrics_format && !@metrics_format.include?(METRIC_PLACEHOLDER) @@ -104,31 +116,14 @@ def receive(event) return unless output?(event) # Graphite message format: metric value timestamp\n - messages = [] - timestamp = event[@timestamp_field].to_i + messages = @fields_are_metrics ? + messages_from_event_fields(event, @include_metrics, @exclude_metrics) : + messages_from_event_metrics(event, @metrics) - if @fields_are_metrics - @logger.debug("got metrics event", :metrics => event.to_hash) - event.to_hash.each do |metric,value| - next if EXCLUDE_ALWAYS.include?(metric) - next unless @include_metrics.empty? || @include_metrics.any? { |regexp| metric.match(regexp) } - next if @exclude_metrics.any? {|regexp| metric.match(regexp)} - messages << "#{construct_metric_name(metric)} #{event.sprintf(value.to_s).to_f} #{timestamp}" - end - else - @metrics.each do |metric, value| - @logger.debug("processing", :metric => metric, :value => value) - metric = event.sprintf(metric) - next unless @include_metrics.any? {|regexp| metric.match(regexp)} - next if @exclude_metrics.any? {|regexp| metric.match(regexp)} - messages << "#{construct_metric_name(event.sprintf(metric))} #{event.sprintf(value).to_f} #{timestamp}" - end - end - if messages.empty? @logger.debug("Message is empty, not sending anything to Graphite", :messages => messages, :host => @host, :port => @port) else message = messages.join("\n") @logger.debug("Sending carbon messages", :messages => messages, :host => @host, :port => @port) @@ -143,8 +138,71 @@ sleep(@reconnect_interval) connect retry if @resend_on_failure end end - end # def receive + + private + + def messages_from_event_fields(event, include_metrics, exclude_metrics) + timestamp = event_timestamp(event) + @logger.debug? && @logger.debug("got metrics event", :metrics => event.to_hash) + event.to_hash.flat_map do |metric,value| + next if EXCLUDE_ALWAYS.include?(metric) + next unless include_metrics.empty? || include_metrics.any? { |regexp| metric.match(regexp) } + next if exclude_metrics.any? {|regexp| metric.match(regexp)} + + metrics_lines_for_event(event, metric, value, timestamp) + end + end + + def messages_from_event_metrics(event, metrics) + timestamp = event_timestamp(event) + metrics.flat_map do |metric, value| + @logger.debug("processing", :metric => metric, :value => value) + metric = event.sprintf(metric) + next unless @include_metrics.any? {|regexp| metric.match(regexp)} + next if @exclude_metrics.any? {|regexp| metric.match(regexp)} + + metrics_lines_for_event(event, metric, value, timestamp) + end + end + + def event_timestamp(event) + event[@timestamp_field].to_i + end + + def metrics_lines_for_event(event, metric, value, timestamp) + if event[metric].is_a?(Hash) + dotify(event[metric], metric).map do |k,v| + metrics_line(k, v, timestamp) + end + else + metrics_line(event.sprintf(metric), event.sprintf(value).to_f, timestamp) + end + end + + def metrics_line(name, value, timestamp) + "#{construct_metric_name(name)} #{value} #{timestamp}" + end + + # Take a nested ruby hash of the form {:a => {:b => 2}, c: => 3} and + # turn it into a hash of the form + # { "a.b" => 2, "c" => 3} + def dotify(hash,prefix=nil) + hash.reduce({}) do |acc,kv| + k,v = kv + pk = prefix ? "#{prefix}#{@nested_object_separator}#{k}" : k.to_s + if v.is_a?(Hash) + acc.merge!(dotify(v, pk)) + elsif v.is_a?(Array) + # There's no right answer here, so we do nothing + @logger.warn("Array values not supported for graphite metrics! Ignoring #{hash} @ #{prefix}") + else + acc[pk] = v + end + acc + end + end + end # class LogStash::Outputs::Graphite