bin/riemann-rabbitmq in riemann-tools-dgvz-0.2.2.1 vs bin/riemann-rabbitmq in riemann-tools-dgvz-0.2.2.2

- old
+ new

@@ -1,9 +1,9 @@ #!/usr/bin/env ruby # -require File.expand_path('../../lib/riemann/tools', __FILE__) +require 'riemann/tools' class Riemann::Tools::Rabbitmq include Riemann::Tools require 'faraday' @@ -14,17 +14,34 @@ opt :read_timeout, 'Faraday read timeout', type: :int, default: 2 opt :open_timeout, 'Faraday open timeout', type: :int, default: 1 opt :monitor_user, 'RabbitMQ monitoring user', type: :string opt :monitor_pass, 'RabbitMQ monitoring user password', type: :string - opt :monitor_port, 'RabbitMQ monitoring port', default: 15672 - opt :monitor_host, 'RabbitMQ monitoring host', default: "localhost" + opt :monitor_port, 'RabbitMQ monitoring port', type: :int, default: 15672 + opt :monitor_host, 'RabbitMQ monitoring host', type: :string, default: "localhost" - def monitor_url - "http://#{options[:monitor_user]}:#{options[:monitor_pass]}@#{options[:monitor_host]}:#{options[:monitor_port]}/api/overview" + opt :max_queue_size, "max number of items in a queue that is acceptable", type: :int, default: 1_000_000 + opt :ignore_max_size_queues, "A regular expression to match queues that shouldn't be size-checked", type: :string + + opt :node, "Specify a node to monitor", type: :strings + + def base_url + "http://#{options[:monitor_user]}:#{options[:monitor_pass]}@#{options[:monitor_host]}:#{options[:monitor_port]}/api" end + def overview_url + "#{base_url}/overview" + end + + def node_url(n) + "#{base_url}/nodes/#{n}" + end + + def queues_url + "#{base_url}/queues" + end + def event_host if options[:event_host] return options[:event_host] else return options[:monitor_host] @@ -48,12 +65,90 @@ ) end response end - def tick - uri = URI(monitor_url) + def check_queues + response = safe_get(queues_url, event_host) + max_size_check_filter = if options[:ignore_max_size_queues] + Regexp.new(options[:ignore_max_size_queues]) + else + nil + end + + return if response.nil? + + json = JSON.parse(response.body) + + if response.status != 200 + report(:host => event_host, + :service => "rabbitmq.queue", + :state => "critical", + :description => "HTTP connection error to /api/queues: #{response.status} - #{response.body}" + ) + else + report(:host => event_host, + :service => "rabbitmq.queue", + :state => "ok", + :description => "HTTP connection ok" + ) + + json = JSON.parse(response.body) + + json.each do |queue| + svc = "rabbitmq.queue.#{queue['vhost']}.#{queue['name']}" + errs = [] + + if queue['messages_ready'] > 0 and queue['consumers'] == 0 + errs << "Queue has jobs but no consumers" + end + + if (max_size_check_filter.nil? or queue['name'] !~ max_size_check_filter) and queue['messages_ready'] > options[:max_queue_size] + errs << "Queue has #{queue['messages_ready']} jobs" + end + + unless errs.empty? + report(:host => event_host, + :service => svc, + :state => "critical", + :description => errs.join("; ") + ) + end + + stats = (queue['message_stats'] || {}).merge( + 'messages' => queue['messages'], + 'messages_details' => queue['messages_details'], + 'messages_ready' => queue['messages_ready'], + 'messages_ready_details' => queue['messages_ready_details'], + 'messages_unacknowledged' => queue['messages_unacknowledged'], + 'messages_unacknowledged_details' => queue['messages_unacknowledged_details'], + 'consumers' => queue['consumers'], + 'memory' => queue['memory'], + ) + + stats.each_pair do |k,v| + service = "#{svc}.#{k}" + if k =~ /details$/ + metric = v['rate'] + else + metric = v + end + + # TODO: Set state via thresholds which can be configured + + report(:host => event_host, + :service => service, + :metric => metric, + :description => "RabbitMQ monitor" + ) + end + end + end + end + + def check_overview + uri = URI(overview_url) response = safe_get(uri, event_host) return if response.nil? json = JSON.parse(response.body) @@ -92,8 +187,60 @@ :description => "RabbitMQ monitor" ) end end end + end + + def check_node + opts[:node].each do |n| + uri = URI(node_url(n)) + response = safe_get(uri, event_host) + + return if response.nil? + + if response.status != 200 + if response.status == 404 + report(:host => event_host, + :service => "rabbitmq.node.#{n}", + :state => "critical", + :description => "Node was not found in the cluster" + ) + else + report(:host => event_host, + :service => "rabbitmq.node.#{n}", + :state => "critical", + :description => "HTTP error: #{response.status} - #{response.body}" + ) + end + return + end + + json = JSON.parse(response.body) + + if json['mem_alarm'] + report(:host => event_host, + :service => "rabbitmq.node.#{n}", + :state => "critical", + :description => "Memory alarm has triggered; job submission throttled" + ) + return + end + + if json['disk_free_alarm'] + report(:host => event_host, + :service => "rabbitmq.node.#{n}", + :state => "critical", + :description => "Disk free alarm has triggered; job submission throttled" + ) + return + end + end + end + + def tick + check_overview + check_node if opts[:node] + check_queues end end Riemann::Tools::Rabbitmq.run