lib/flapjack/cli/receiver.rb in flapjack-1.6.0 vs lib/flapjack/cli/receiver.rb in flapjack-2.0.0b1

- old
+ new

@@ -1,14 +1,11 @@ #!/usr/bin/env ruby -require 'dante' require 'redis' -require 'hiredis' require 'flapjack/configuration' require 'flapjack/data/event' -require 'flapjack/data/migration' require 'flapjack/patches' # TODO options should be overridden by similar config file options module Flapjack @@ -28,105 +25,33 @@ @config.load(global_options[:config]) @config_env = @config.all if @config_env.nil? || @config_env.empty? unless 'mirror'.eql?(@options[:type]) - exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'" + exit_now! "No config data found in '#{global_options[:config]}'" end + end - @config_env = {} - @config_runner = {} - else - @config_runner = @config_env["#{@options[:type]}-receiver"] || {} + unless 'mirror'.eql?(@options[:type]) + Flapjack::RedisProxy.config = @config.for_redis + Zermelo.redis = Flapjack.redis end @redis_options = @config.for_redis end - def pidfile - @pidfile ||= case - when !@options[:pidfile].nil? - @options[:pidfile] - when !@config_env['pid_dir'].nil? - File.join(@config_env['pid_dir'], "#{@options[:type]}-receiver.pid") - else - "/var/run/flapjack/#{@options[:type]}-receiver.pid" - end - end - - def logfile - @logfile ||= case - when !@options[:logfile].nil? - @options[:logfile] - when !@config_env['log_dir'].nil? - File.join(@config_env['log_dir'], "#{@options[:type]}-receiver.log") - else - "/var/run/flapjack/#{@options[:type]}-receiver.log" - end - end - def start - if runner(@options[:type]).daemon_running? - puts "#{@options[:type]}-receiver is already running." - else - print "#{@options[:type]}-receiver starting..." - main_umask = nil - if @options[:daemonize] - main_umask = File.umask - else - print "\n" - end - runner(@options[:type]).execute(:daemonize => @options[:daemonize]) do - begin - File.umask(main_umask) if @options[:daemonize] - main(:fifo => @options[:fifo], :type => @options[:type]) - rescue Exception => e - p e.message - puts e.backtrace.join("\n") - end - end - puts " done." + puts "#{@options[:type]}-receiver starting..." + begin + main(:fifo => @options[:fifo], :type => @options[:type]) + rescue Exception => e + p e.message + puts e.backtrace.join("\n") end - end - - def stop - pid = get_pid - if runner(@options[:type]).daemon_running? - print "#{@options[:type]}-receiver stopping..." - runner(@options[:type]).execute(:kill => true) - puts " done." - else - puts "#{@options[:type]}-receiver is not running." - end - exit_now! unless wait_pid_gone(pid) - end - - def restart - print "#{@options[:type]}-receiver restarting..." - main_umask = File.umask - runner(@options[:type]).execute(:daemonize => true, :restart => true) do - begin - File.umask(main_umask) - main(:fifo => @options[:fifo], :type => @options[:type]) - rescue Exception => e - p e.message - puts e.backtrace.join("\n") - end - end puts " done." end - def status - if runner(@options[:type]).daemon_running? - pid = get_pid - uptime = Time.now - File.stat(pidfile).ctime - puts "#{@options[:type]}-receiver is running: pid #{pid}, uptime #{uptime}" - else - exit_now! "#{@options[:type]}-receiver is not running" - end - end - def json json_feeder(:from => @options[:from]) end def mirror @@ -144,25 +69,13 @@ end private def redis - return @redis unless @redis.nil? - @redis = Redis.new(@redis_options.merge(:driver => :hiredis)) - Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => @redis) - Flapjack::Data::Migration.clear_orphaned_entity_ids(:redis => @redis) - @redis + @redis ||= Redis.new(@redis_options) end - def runner(type) - return @runner if @runner - - @runner = Dante::Runner.new("#{@options[:type]}-receiver", :pid_path => pidfile, - :log_path => logfile) - @runner - end - def process_input(opts) config_rec = case opts[:type] when /nagios/ @config_env['nagios-receiver'] || {} when /nsca/ @@ -245,11 +158,11 @@ 'summary' => check_output, 'details' => details, 'perfdata' => check_perfdata, 'time' => timestamp, } - Flapjack::Data::Event.add(event, :redis => redis) + Flapjack::Data::Event.push('events', event) end rescue Redis::CannotConnectError puts "Error, unable to to connect to the redis server (#{$!})" end end @@ -261,103 +174,105 @@ puts "Whoops with the fifo, restarting main loop in 10 seconds" sleep 10 end end - def process_exists(pid) - return unless pid - begin - Process.kill(0, pid) - return true - rescue Errno::ESRCH - return false - end - end - - # wait until the specified pid no longer exists, or until a timeout is reached - def wait_pid_gone(pid, timeout = 30) - print "waiting for a max of #{timeout} seconds for process #{pid} to exit" if process_exists(pid) - started_at = Time.now.to_i - while process_exists(pid) - break unless (Time.now.to_i - started_at < timeout) - print '.' - sleep 1 - end - puts '' - !process_exists(pid) - end - - def get_pid - IO.read(pidfile).chomp.to_i - rescue StandardError - pid = nil - end - - class EventFeedHandler < Oj::ScHandler - - def initialize(&block) - @hash_depth = 0 - @callback = block if block_given? - end - - def hash_start - @hash_depth += 1 - Hash.new - end - - def hash_end - @hash_depth -= 1 - end - - def array_start - Array.new - end - - def array_end - end - - def add_value(value) - @callback.call(value) if @callback - nil - end - - def hash_set(hash, key, value) - hash[key] = value - end - - def array_append(array, value) - array << value - end - - end - def json_feeder(opts = {}) + require 'json/stream' + input = if opts[:from] File.open(opts[:from]) # Explodes if file does not exist. - elsif $stdin.tty? + elsif !'java'.eql?(RUBY_PLATFORM) && STDIN.tty? + # tty check isn't working under JRuby, assume STDIN is OK to use + # https://github.com/jruby/jruby/issues/1332 exit_now! "No file provided, and STDIN is from terminal! Exiting..." else - $stdin + STDIN end # Sit and churn through the input stream until a valid JSON blob has been assembled. # This handles both the case of a process sending a single JSON and then exiting # (eg. cat foo.json | bin/flapjack receiver json) *and* a longer-running process spitting # out events (eg. /usr/bin/slow-event-feed | bin/flapjack receiver json) + # + # @data is a stack, but @stack is used by the Parser class + parser = JSON::Stream::Parser.new do + start_document do + @data = [] + @keys = [] + @result = nil + end - parser = EventFeedHandler.new do |parsed| - # Handle "parsed" (a hash) - errors = Flapjack::Data::Event.validation_errors_for_hash(parsed) - if errors.empty? - Flapjack::Data::Event.add(parsed, :redis => redis) - puts "Enqueued event data, #{parsed.inspect}" - else - puts "Invalid event data received, #{errors.join(', ')} #{parsed.inspect}" + end_document { + # interfering with json-stream's "one object per stream" model, + # but it errors without this + @state = :start_document + } + + start_object do + @data.push({}) end + + end_object do + node = @data.pop + + if @data.size > 0 + top = @data.last + case top + when Hash + top[@keys.pop] = node + when Array + top << node + end + else + errors = Flapjack::Data::Event.validation_errors_for_hash(node) + if errors.empty? + Flapjack::Data::Event.push('events', node) + puts "Enqueued event data, #{node.inspect}" + else + puts "Invalid event data received, #{errors.join(', ')} #{node.inspect}" + end + end + end + + start_array do + @data.push([]) + end + + end_array do + node = @data.pop + if @data.size > 0 + top = @data.last + case top + when Hash + top[@keys.pop] = node + when Array + top << node + end + end + end + + key do |k| + @keys << k + end + + value do |v| + top = @data.last + case top + when Hash + top[@keys.pop] = v + when Array + top << v + else + @data << v + end + end end - Oj.sc_parse(parser, input) + while data = input.read(4096) + parser << data + end puts "Done." end def mirror_receive(opts) @@ -385,11 +300,12 @@ Redis.new(:url => dest_addr, :driver => :hiredis) else exit_now! "could not understand destination Redis config" end - Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => dest_redis) + Flapjack::RedisProxy.config = dest_redis + Zermelo.redis = Flapjack.redis archives = mirror_get_archive_keys_stats(:source => source_redis) raise "found no archives!" if archives.empty? puts "found archives: #{archives.inspect}" @@ -418,15 +334,21 @@ puts archive_key loop do event_json = source_redis.lindex(archive_key, cursor) if event_json - event = Flapjack::Data::Event.parse_and_validate(event_json) - if !event.nil? && (include_re.nil? || + event, errors = Flapjack::Data::Event.parse_and_validate(event_json) + + if !errors.nil? && !errors.empty? + Flapjack.logger.error { + error_str = errors.nil? ? '' : errors.join(', ') + "Invalid event data received, #{error_str} #{event.inspect}" + } + elsif (include_re.nil? || (include_re === "#{event['entity']}:#{event['check']}")) - Flapjack::Data::Event.add(event, :redis => dest_redis) + Flapjack::Data::Event.add(event) events_sent += 1 print "#{events_sent} " if events_sent % 1000 == 0 end cursor -= 1 next @@ -496,177 +418,67 @@ # host_perfdata_file_mode=p # service_perfdata_file_mode=p # Details on the wiki: http://flapjack.io/docs/1.0/usage/USING#configuring-nagios # ' + nagios.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe' - nagios.command :start do |start| - - start.switch [:d, 'daemonize'], :desc => 'Daemonize', - :default_value => true - - start.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - start.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - start.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe' - - start.action do |global_options,options,args| - options.merge!(:type => 'nagios') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.start - end + nagios.action do |global_options,options,args| + options.merge!(:type => 'nagios') + receiver_cli = Flapjack::CLI::Receiver.new(global_options, options) + receiver_cli.start end - - nagios.command :stop do |stop| - - stop.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - stop.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - stop.action do |global_options,options,args| - options.merge!(:type => 'nagios') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.stop - end - end - - nagios.command :restart do |restart| - - restart.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - restart.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - restart.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe' - - restart.action do |global_options,options,args| - options.merge!(:type => 'nagios') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.restart - end - end - - nagios.command :status do |status| - - status.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - status.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - status.action do |global_options,options,args| - options.merge!(:type => 'nagios') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.status - end - end - end receiver.desc 'NSCA receiver' #receiver.arg_name 'Turn Nagios passive check results into Flapjack events' receiver.command :nsca do |nsca| - nsca.command :start do |start| + # # Not sure what to do with this, extra help output: - # # Not sure what to do with this, extra help output: + # Required Nagios Configuration Changes + # ------------------------------------- - # Required Nagios Configuration Changes - # ------------------------------------- + # flapjack-nsca-receiver reads events from the nagios "command file" read from by Nagios, written to by the Nsca-daemon. - # flapjack-nsca-receiver reads events from the nagios "command file" read from by Nagios, written to by the Nsca-daemon. + # The named pipe is automatically created by _nagios_ if it is enabled + # in the configfile: - # The named pipe is automatically created by _nagios_ if it is enabled - # in the configfile: + # # modified lines: + # command_file=/var/lib/nagios3/rw/nagios.cmd - # # modified lines: - # command_file=/var/lib/nagios3/rw/nagios.cmd + # The Nsca daemon is optionally writing to a tempfile if the named pipe does + # not exist. - # The Nsca daemon is optionally writing to a tempfile if the named pipe does - # not exist. + # Details on the wiki: http://flapjack.io/docs/1.0/usage/USING#XXX + # ' + nsca.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe' - # Details on the wiki: http://flapjack.io/docs/1.0/usage/USING#XXX - # ' - - start.switch [:d, 'daemonize'], :desc => 'Daemonize', - :default_value => true - - start.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - start.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - start.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe' - - start.action do |global_options,options,args| - options.merge!(:type => 'nsca') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.start - end + nsca.action do |global_options,options,args| + options.merge!(:type => 'nsca') + cli_receiver = Flapjack::CLI::Receiver.new(global_options, options) + cli_receiver.start end - nsca.command :stop do |stop| - - stop.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - stop.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - stop.action do |global_options,options,args| - options.merge!(:type => 'nsca') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.stop - end - end - - nsca.command :restart do |restart| - - restart.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - restart.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - restart.flag [:f, 'fifo'], :desc => 'PATH of the nagios perfdata named pipe' - - restart.action do |global_options,options,args| - options.merge!(:type => 'nsca') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.restart - end - end - - nsca.command :status do |status| - - status.flag [:p, 'pidfile'], :desc => 'PATH of the pidfile to write to' - - status.flag [:l, 'logfile'], :desc => 'PATH of the logfile to write to' - - status.action do |global_options,options,args| - options.merge!(:type => 'nsca') - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.status - end - end - end receiver.desc 'JSON receiver' receiver.command :json do |json| - json.flag [:f, 'from'], :desc => 'PATH of the file to process [STDIN]' json.action do |global_options,options,args| - receiver = Flapjack::CLI::Receiver.new(global_options, options) - receiver.json + cli_receiver = Flapjack::CLI::Receiver.new(global_options, options) + cli_receiver.json end end receiver.desc 'Mirror receiver' receiver.command :mirror do |mirror| - mirror.flag [:s, 'source'], :desc => 'URL of source redis database, e.g. redis://localhost:6379/0', + mirror.flag [:s, 'source'], :desc => 'URL of source redis database, eg redis://localhost:6379/0', :required => true - mirror.flag [:d, 'dest'], :desc => 'URL of destination redis database, e.g. redis://localhost:6379/1' - - mirror.flag [:i, 'include'], :desc => 'Regexp which must match event id for it to be mirrored' - # one or both of follow, all is required mirror.switch [:f, 'follow'], :desc => 'keep reading events as they are archived on the source', :default_value => nil mirror.switch [:a, 'all'], :desc => 'replay all archived events from the source', @@ -679,11 +491,10 @@ # options.since in code mirror.flag [:t, 'time'], :desc => 'replay all events archived on the source since TIME', :default_value => nil mirror.action do |global_options,options,args| - options.merge!(:type => 'mirror') receiver = Flapjack::CLI::Receiver.new(global_options, options) receiver.mirror end end @@ -694,10 +505,12 @@ oneoff.action do |global_options, options, args| libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path oneoff = libexec.join('oneoff') if oneoff.exist? Kernel.exec(oneoff.to_s, *ARGV) + else + exit_now! "Oneoff event submitter doesn't exist" end end end receiver.desc 'HTTP checker for availability checking of services' @@ -706,10 +519,12 @@ httpchecker.action do |global_options, options, args| libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path httpchecker = libexec.join('httpchecker') if httpchecker.exist? Kernel.exec(httpchecker.to_s, *ARGV) + else + exit_now! "HTTP checker doesn't exist" end end end @@ -719,31 +534,40 @@ httpbroker.action do |global_options, options, args| libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path httpbroker = libexec.join('httpbroker') if httpbroker.exist? Kernel.exec(httpbroker.to_s, *ARGV) + else + exit_now! "HTTP broker doesn't exist" end end end + + receiver.desc 'HTTP API that caches and submits Cloudwatch events' + receiver.command :cloudwatchbroker do |cloudwatchbroker| + cloudwatchbroker.passthrough = true + cloudwatchbroker.action do |global_options, options, args| + libexec = Pathname.new(__FILE__).parent.parent.parent.parent.join('libexec').expand_path + cloudwatchbroker = libexec.join('httpbroker') + if cloudwatchbroker.exist? + Kernel.exec(cloudwatchbroker.to_s, *(ARGV + ['--format=sns'])) + else + exit_now! "HTTP broker doesn't exist" + end + end + end + end # # Nsca example line for a storage-device check: # #[1393410685] PROCESS_SERVICE_CHECK_RESULT;db1.dev;STORAGE;0;Raid Set # 000 (800.0GB) is Normal. # config_nr = config_env['nsca-receiver'] || {} -# pidfile = options.pidfile.nil? ? -# (config_nr['pid_file'] || "/var/run/flapjack/#{exe}.pid") : -# options.pidfile - # logfile = options.logfile.nil? ? # (config_nr['log_file'] || "/var/log/flapjack/#{exe}.log") : # options.logfile # fifo = options.fifo.nil? ? # (config_nr['fifo'] || '/var/lib/nagios3/rw/nagios.cmd') : # options.fifo - -# daemonize = options.daemonize.nil? ? -# !!config_nr['daemonize'] : -# options.daemonize