lib/flapjack/cli/receiver.rb in flapjack-1.6.0 vs lib/flapjack/cli/receiver.rb in flapjack-2.0.0b1
- old
+ new
@@ -1,14 +1,11 @@
#!/usr/bin/env ruby
-require 'dante'
require 'redis'
-require 'hiredis'
require 'flapjack/configuration'
require 'flapjack/data/event'
-require 'flapjack/data/migration'
require 'flapjack/patches'
# TODO options should be overridden by similar config file options
module Flapjack
@@ -28,105 +25,33 @@
@config.load(global_options[:config])
@config_env = @config.all
if @config_env.nil? || @config_env.empty?
unless 'mirror'.eql?(@options[:type])
- exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'"
+ exit_now! "No config data found in '#{global_options[:config]}'"
end
+ end
- @config_env = {}
- @config_runner = {}
- else
- @config_runner = @config_env["#{@options[:type]}-receiver"] || {}
+ unless 'mirror'.eql?(@options[:type])
+ Flapjack::RedisProxy.config = @config.for_redis
+ Zermelo.redis = Flapjack.redis
end
@redis_options = @config.for_redis
end
- def pidfile
- @pidfile ||= case
- when !@options[:pidfile].nil?
- @options[:pidfile]
- when !@config_env['pid_dir'].nil?
- File.join(@config_env['pid_dir'], "#{@options[:type]}-receiver.pid")
- else
- "/var/run/flapjack/#{@options[:type]}-receiver.pid"
- end
- end
-
- def logfile
- @logfile ||= case
- when !@options[:logfile].nil?
- @options[:logfile]
- when !@config_env['log_dir'].nil?
- File.join(@config_env['log_dir'], "#{@options[:type]}-receiver.log")
- else
- "/var/run/flapjack/#{@options[:type]}-receiver.log"
- end
- end
-
def start
- if runner(@options[:type]).daemon_running?
- puts "#{@options[:type]}-receiver is already running."
- else
- print "#{@options[:type]}-receiver starting..."
- main_umask = nil
- if @options[:daemonize]
- main_umask = File.umask
- else
- print "\n"
- end
- runner(@options[:type]).execute(:daemonize => @options[:daemonize]) do
- begin
- File.umask(main_umask) if @options[:daemonize]
- main(:fifo => @options[:fifo], :type => @options[:type])
- rescue Exception => e
- p e.message
- puts e.backtrace.join("\n")
- end
- end
- puts " done."
+ puts "#{@options[:type]}-receiver starting..."
+ begin
+ main(:fifo => @options[:fifo], :type => @options[:type])
+ rescue Exception => e
+ p e.message
+ puts e.backtrace.join("\n")
end
- end
-
- def stop
- pid = get_pid
- if runner(@options[:type]).daemon_running?
- print "#{@options[:type]}-receiver stopping..."
- runner(@options[:type]).execute(:kill => true)
- puts " done."
- else
- puts "#{@options[:type]}-receiver is not running."
- end
- exit_now! unless wait_pid_gone(pid)
- end
-
- def restart
- print "#{@options[:type]}-receiver restarting..."
- main_umask = File.umask
- runner(@options[:type]).execute(:daemonize => true, :restart => true) do
- begin
- File.umask(main_umask)
- main(:fifo => @options[:fifo], :type => @options[:type])
- rescue Exception => e
- p e.message
- puts e.backtrace.join("\n")
- end
- end
puts " done."
end
- def status
- if runner(@options[:type]).daemon_running?
- pid = get_pid
- uptime = Time.now - File.stat(pidfile).ctime
- puts "#{@options[:type]}-receiver is running: pid #{pid}, uptime #{uptime}"
- else
- exit_now! "#{@options[:type]}-receiver is not running"
- end
- end
-
def json
json_feeder(:from => @options[:from])
end
def mirror
@@ -144,25 +69,13 @@
end
private
def redis
- return @redis unless @redis.nil?
- @redis = Redis.new(@redis_options.merge(:driver => :hiredis))
- Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => @redis)
- Flapjack::Data::Migration.clear_orphaned_entity_ids(:redis => @redis)
- @redis
+ @redis ||= Redis.new(@redis_options)
end
- def runner(type)
- return @runner if @runner
-
- @runner = Dante::Runner.new("#{@options[:type]}-receiver", :pid_path => pidfile,
- :log_path => logfile)
- @runner
- end
-
def process_input(opts)
config_rec = case opts[:type]
when /nagios/
@config_env['nagios-receiver'] || {}
when /nsca/
@@ -245,11 +158,11 @@
'summary' => check_output,
'details' => details,
'perfdata' => check_perfdata,
'time' => timestamp,
}
- Flapjack::Data::Event.add(event, :redis => redis)
+ Flapjack::Data::Event.push('events', event)
end
rescue Redis::CannotConnectError
puts "Error, unable to to connect to the redis server (#{$!})"
end
end
@@ -261,103 +174,105 @@
puts "Whoops with the fifo, restarting main loop in 10 seconds"
sleep 10
end
end
- def process_exists(pid)
- return unless pid
- begin
- Process.kill(0, pid)
- return true
- rescue Errno::ESRCH
- return false
- end
- end
-
- # wait until the specified pid no longer exists, or until a timeout is reached
- def wait_pid_gone(pid, timeout = 30)
- print "waiting for a max of #{timeout} seconds for process #{pid} to exit" if process_exists(pid)
- started_at = Time.now.to_i
- while process_exists(pid)
- break unless (Time.now.to_i - started_at < timeout)
- print '.'
- sleep 1
- end
- puts ''
- !process_exists(pid)
- end
-
- def get_pid
- IO.read(pidfile).chomp.to_i
- rescue StandardError
- pid = nil
- end
-
- class EventFeedHandler < Oj::ScHandler
-
- def initialize(&block)
- @hash_depth = 0
- @callback = block if block_given?
- end
-
- def hash_start
- @hash_depth += 1
- Hash.new
- end
-
- def hash_end
- @hash_depth -= 1
- end
-
- def array_start
- Array.new
- end
-
- def array_end
- end
-
- def add_value(value)
- @callback.call(value) if @callback
- nil
- end
-
- def hash_set(hash, key, value)
- hash[key] = value
- end
-
- def array_append(array, value)
- array << value
- end
-
- end
-
def json_feeder(opts = {})
+ require 'json/stream'
+
input = if opts[:from]
File.open(opts[:from]) # Explodes if file does not exist.
- elsif $stdin.tty?
+ elsif !'java'.eql?(RUBY_PLATFORM) && STDIN.tty?
+ # tty check isn't working under JRuby, assume STDIN is OK to use
+ # https://github.com/jruby/jruby/issues/1332
exit_now! "No file provided, and STDIN is from terminal! Exiting..."
else
- $stdin
+ STDIN
end
# Sit and churn through the input stream until a valid JSON blob has been assembled.
# This handles both the case of a process sending a single JSON and then exiting
# (eg. cat foo.json | bin/flapjack receiver json) *and* a longer-running process spitting
# out events (eg. /usr/bin/slow-event-feed | bin/flapjack receiver json)
+ #
+ # @data is a stack, but @stack is used by the Parser class
+ parser = JSON::Stream::Parser.new do
+ start_document do
+ @data = []
+ @keys = []
+ @result = nil
+ end
- parser = EventFeedHandler.new do |parsed|
- # Handle "parsed" (a hash)
- errors = Flapjack::Data::Event.validation_errors_for_hash(parsed)
- if errors.empty?
- Flapjack::Data::Event.add(parsed, :redis => redis)
- puts "Enqueued event data, #{parsed.inspect}"
- else
- puts "Invalid event data received, #{errors.join(', ')} #{parsed.inspect}"
+ end_document {
+ # interfering with json-stream's "one object per stream" model,
+ # but it errors without this
+ @state = :start_document
+ }
+
+ start_object do
+ @data.push({})
end
+
+ end_object do
+ node = @data.pop
+
+ if @data.size > 0
+ top = @data.last
+ case top
+ when Hash
+ top[@keys.pop] = node
+ when Array
+ top << node
+ end
+ else
+ errors = Flapjack::Data::Event.validation_errors_for_hash(node)
+ if errors.empty?
+ Flapjack::Data::Event.push('events', node)
+ puts "Enqueued event data, #{node.inspect}"
+ else
+ puts "Invalid event data received, #{errors.join(', ')} #{node.inspect}"
+ end
+ end
+ end
+
+ start_array do
+ @data.push([])
+ end
+
+ end_array do
+ node = @data.pop
+ if @data.size > 0
+ top = @data.last
+ case top
+ when Hash
+ top[@keys.pop] = node
+ when Array
+ top << node
+ end
+ end
+ end
+
+ key do |k|
+ @keys << k
+ end
+
+ value do |v|
+ top = @data.last
+ case top
+ when Hash
+ top[@keys.pop] = v
+ when Array
+ top << v
+ else
+ @data << v
+ end
+ end
end
- Oj.sc_parse(parser, input)
+ while data = input.read(4096)
+ parser << data
+ end
puts "Done."
end
def mirror_receive(opts)
@@ -385,11 +300,12 @@
Redis.new(:url => dest_addr, :driver => :hiredis)
else
exit_now! "could not understand destination Redis config"
end
- Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => dest_redis)
+ Flapjack::RedisProxy.config = dest_redis
+ Zermelo.redis = Flapjack.redis
archives = mirror_get_archive_keys_stats(:source => source_redis)
raise "found no archives!" if archives.empty?
puts "found archives: #{archives.inspect}"
@@ -418,15 +334,21 @@
puts archive_key
loop do
event_json = source_redis.lindex(archive_key, cursor)
if event_json
- event = Flapjack::Data::Event.parse_and_validate(event_json)
- if !event.nil? && (include_re.nil? ||
+ event, errors = Flapjack::Data::Event.parse_and_validate(event_json)
+
+ if !errors.nil? && !errors.empty?
+ Flapjack.logger.error {
+ error_str = errors.nil? ? '' : errors.join(', ')
+ "Invalid event data received, #{error_str} #{event.inspect}"
+ }
+ elsif (include_re.nil? ||
(include_re === "#{event['entity']}:#{event['check']}"))
- Flapjack::Data::Event.add(event, :redis => dest_redis)
+ Flapjack::Data::Event.add(event)
events_sent += 1
print "#{events_sent} " if events_sent % 1000 == 0
end
cursor -= 1
next
@@ -496,177 +418,67 @@
# host_perfdata_file_mode=p
# service_perfdata_file_mode=p
# Details on the wiki: http://flapjack.io/docs/1.0/usage/USING#configuring-nagios
# '
+ nagios.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe'
- nagios.command :start do |start|
-
- start.switch [:d, 'daemonize'], :desc => 'Daemonize',
- :default_value => true
-
- start.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- start.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- start.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe'
-
- start.action do |global_options,options,args|
- options.merge!(:type => 'nagios')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.start
- end
+ nagios.action do |global_options,options,args|
+ options.merge!(:type => 'nagios')
+ receiver_cli = Flapjack::CLI::Receiver.new(global_options, options)
+ receiver_cli.start
end
-
- nagios.command :stop do |stop|
-
- stop.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- stop.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- stop.action do |global_options,options,args|
- options.merge!(:type => 'nagios')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.stop
- end
- end
-
- nagios.command :restart do |restart|
-
- restart.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- restart.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- restart.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe'
-
- restart.action do |global_options,options,args|
- options.merge!(:type => 'nagios')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.restart
- end
- end
-
- nagios.command :status do |status|
-
- status.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- status.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- status.action do |global_options,options,args|
- options.merge!(:type => 'nagios')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.status
- end
- end
-
end
receiver.desc 'NSCA receiver'
#receiver.arg_name 'Turn Nagios passive check results into Flapjack events'
receiver.command :nsca do |nsca|
- nsca.command :start do |start|
+ # # Not sure what to do with this, extra help output:
- # # Not sure what to do with this, extra help output:
+ # Required Nagios Configuration Changes
+ # -------------------------------------
- # Required Nagios Configuration Changes
- # -------------------------------------
+ # flapjack-nsca-receiver reads events from the nagios "command file" read from by Nagios, written to by the Nsca-daemon.
- # flapjack-nsca-receiver reads events from the nagios "command file" read from by Nagios, written to by the Nsca-daemon.
+ # The named pipe is automatically created by _nagios_ if it is enabled
+ # in the configfile:
- # The named pipe is automatically created by _nagios_ if it is enabled
- # in the configfile:
+ # # modified lines:
+ # command_file=/var/lib/nagios3/rw/nagios.cmd
- # # modified lines:
- # command_file=/var/lib/nagios3/rw/nagios.cmd
+ # The Nsca daemon is optionally writing to a tempfile if the named pipe does
+ # not exist.
- # The Nsca daemon is optionally writing to a tempfile if the named pipe does
- # not exist.
+ # Details on the wiki: http://flapjack.io/docs/1.0/usage/USING#XXX
+ # '
+ nsca.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe'
- # Details on the wiki: http://flapjack.io/docs/1.0/usage/USING#XXX
- # '
-
- start.switch [:d, 'daemonize'], :desc => 'Daemonize',
- :default_value => true
-
- start.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- start.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- start.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe'
-
- start.action do |global_options,options,args|
- options.merge!(:type => 'nsca')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.start
- end
+ nsca.action do |global_options,options,args|
+ options.merge!(:type => 'nsca')
+ cli_receiver = Flapjack::CLI::Receiver.new(global_options, options)
+ cli_receiver.start
end
- nsca.command :stop do |stop|
-
- stop.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- stop.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- stop.action do |global_options,options,args|
- options.merge!(:type => 'nsca')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.stop
- end
- end
-
- nsca.command :restart do |restart|
-
- restart.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- restart.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- restart.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe'
-
- restart.action do |global_options,options,args|
- options.merge!(:type => 'nsca')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.restart
- end
- end
-
- nsca.command :status do |status|
-
- status.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to'
-
- status.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to'
-
- status.action do |global_options,options,args|
- options.merge!(:type => 'nsca')
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.status
- end
- end
-
end
receiver.desc 'JSON receiver'
receiver.command :json do |json|
-
json.flag [:f, 'from'], :desc => 'PATH of the file to process [STDIN]'
json.action do |global_options,options,args|
- receiver = Flapjack::CLI::Receiver.new(global_options, options)
- receiver.json
+ cli_receiver = Flapjack::CLI::Receiver.new(global_options, options)
+ cli_receiver.json
end
end
receiver.desc 'Mirror receiver'
receiver.command :mirror do |mirror|
- mirror.flag [:s, 'source'], :desc => 'URL of source redis database, e.g. redis://localhost:6379/0',
+ mirror.flag [:s, 'source'], :desc => 'URL of source redis database, eg redis://localhost:6379/0',
:required => true
- mirror.flag [:d, 'dest'], :desc => 'URL of destination redis database, e.g. redis://localhost:6379/1'
-
- mirror.flag [:i, 'include'], :desc => 'Regexp which must match event id for it to be mirrored'
-
# one or both of follow, all is required
mirror.switch [:f, 'follow'], :desc => 'keep reading events as they are archived on the source',
:default_value => nil
mirror.switch [:a, 'all'], :desc => 'replay all archived events from the source',
@@ -679,11 +491,10 @@
# options.since in code
mirror.flag [:t, 'time'], :desc => 'replay all events archived on the source since TIME',
:default_value => nil
mirror.action do |global_options,options,args|
- options.merge!(:type => 'mirror')
receiver = Flapjack::CLI::Receiver.new(global_options, options)
receiver.mirror
end
end
@@ -694,10 +505,12 @@
oneoff.action do |global_options, options, args|
libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path
oneoff = libexec.join('oneoff')
if oneoff.exist?
Kernel.exec(oneoff.to_s, *ARGV)
+ else
+ exit_now! "Oneoff event submitter doesn't exist"
end
end
end
receiver.desc 'HTTP checker for availability checking of services'
@@ -706,10 +519,12 @@
httpchecker.action do |global_options, options, args|
libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path
httpchecker = libexec.join('httpchecker')
if httpchecker.exist?
Kernel.exec(httpchecker.to_s, *ARGV)
+ else
+ exit_now! "HTTP checker doesn't exist"
end
end
end
@@ -719,31 +534,40 @@
httpbroker.action do |global_options, options, args|
libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path
httpbroker = libexec.join('httpbroker')
if httpbroker.exist?
Kernel.exec(httpbroker.to_s, *ARGV)
+ else
+ exit_now! "HTTP broker doesn't exist"
end
end
end
+
+ receiver.desc 'HTTP API that caches and submits Cloudwatch events'
+ receiver.command :cloudwatchbroker do |cloudwatchbroker|
+ cloudwatchbroker.passthrough = true
+ cloudwatchbroker.action do |global_options, options, args|
+ libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path
+ cloudwatchbroker = libexec.join('httpbroker')
+ if cloudwatchbroker.exist?
+ Kernel.exec(cloudwatchbroker.to_s, *(ARGV + ['--format=sns']))
+ else
+ exit_now! "HTTP broker doesn't exist"
+ end
+ end
+ end
+
end
# # Nsca example line for a storage-device check:
# #[1393410685] PROCESS_SERVICE_CHECK_RESULT;db1.dev;STORAGE;0;Raid Set # 000 (800.0GB) is Normal.
# config_nr = config_env['nsca-receiver'] || {}
-# pidfile = options.pidfile.nil? ?
-# (config_nr['pid_file'] || "/var/run/flapjack/#{exe}.pid") :
-# options.pidfile
-
# logfile = options.logfile.nil? ?
# (config_nr['log_file'] || "/var/log/flapjack/#{exe}.log") :
# options.logfile
# fifo = options.fifo.nil? ?
# (config_nr['fifo'] || '/var/lib/nagios3/rw/nagios.cmd') :
# options.fifo
-
-# daemonize = options.daemonize.nil? ?
-# !!config_nr['daemonize'] :
-# options.daemonize