lib/flapjack/cli/receiver.rb in flapjack-1.1.0 vs lib/flapjack/cli/receiver.rb in flapjack-1.2.0rc1

- old
+ new

@@ -1,13 +1,11 @@ #!/usr/bin/env ruby require 'dante' require 'redis' +require 'hiredis' -require 'oj' -Oj.default_options = { :indent => 0, :mode => :strict } - require 'flapjack/configuration' require 'flapjack/data/event' require 'flapjack/patches' # TODO options should be overridden by similar config file options @@ -19,38 +17,50 @@ def initialize(global_options, options) @global_options = global_options @options = options @config = Flapjack::Configuration.new - @config.load(global_options[:config]) - @config_env = @config.all - if @config_env.nil? || @config_env.empty? - exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'" + if 'mirror'.eql?(@options[:type]) && + (global_options[:config].nil? || global_options[:config].strip.empty?) + + @config_env = {} + @config_runner = {} + else + @config.load(global_options[:config]) + @config_env = @config.all + + if @config_env.nil? || @config_env.empty? + exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'" + end + + @config_runner = @config_env["#{@options[:type]}-receiver"] || {} end - @config_runner = @config_env["#{@options[:type]}-receiver"] || {} + @redis_options = @config.for_redis + end - @pidfile = case + def pidfile + @pidfile ||= case when !@options[:pidfile].nil? @options[:pidfile] when !@config_env['pid_dir'].nil? File.join(@config_env['pid_dir'], "#{@options[:type]}-receiver.pid") else "/var/run/flapjack/#{@options[:type]}-receiver.pid" end + end - @logfile = case + def logfile + @logfile ||= case when !@options[:logfile].nil? @options[:logfile] when !@config_env['log_dir'].nil? File.join(@config_env['log_dir'], "#{@options[:type]}-receiver.log") else "/var/run/flapjack/#{@options[:type]}-receiver.log" end - - @redis_options = @config.for_redis end def start if runner(@options[:type]).daemon_running? puts "#{@options[:type]}-receiver is already running." @@ -95,11 +105,11 @@ end def status if runner(@options[:type]).daemon_running? pid = get_pid - uptime = Time.now - File.stat(@pidfile).ctime + uptime = Time.now - File.stat(pidfile).ctime puts "#{@options[:type]}-receiver is running: pid #{pid}, uptime #{uptime}" else exit_now! "#{@options[:type]}-receiver is not running" end end @@ -107,26 +117,34 @@ def json json_feeder(:from => @options[:from]) end def mirror + if (@options[:dest].nil? || @options[:dest].strip.empty?) && + @redis_options.nil? + + exit_now! "No destination redis URL passed, and none configured" + end + mirror_receive(:source => @options[:source], - :all => @options[:all], :follow => @options[:follow], - :last => @options[:last], :time => @options[:time]) + :dest => @options[:dest] || @redis_options, + :include => @options[:include], :all => @options[:all], + :follow => @options[:follow], :last => @options[:last], + :time => @options[:time]) end private def redis - @redis ||= Redis.new(@redis_options) + @redis ||= Redis.new(@redis_options.merge(:driver => :hiredis)) end def runner(type) return @runner if @runner - @runner = Dante::Runner.new("#{@options[:type]}-receiver", :pid_path => @pidfile, - :log_path => @logfile) + @runner = Dante::Runner.new("#{@options[:type]}-receiver", :pid_path => pidfile, + :log_path => logfile) @runner end def process_input(opts) config_rec = case opts[:type] @@ -252,11 +270,11 @@ puts '' !process_exists(pid) end def get_pid - IO.read(@pidfile).chomp.to_i + IO.read(pidfile).chomp.to_i rescue StandardError pid = nil end class EventFeedHandler < Oj::ScHandler @@ -296,11 +314,10 @@ end end def json_feeder(opts = {}) - input = if opts[:from] File.open(opts[:from]) # Explodes if file does not exist. elsif $stdin.tty? exit_now! "No file provided, and STDIN is from terminal! Exiting..." else @@ -326,21 +343,35 @@ Oj.sc_parse(parser, input) puts "Done." end - def mirror_receive(opts) unless opts[:follow] || opts[:all] exit_now! "one or both of --follow or --all is required" end - source_redis = Redis.new(:url => opts[:source]) + include_re = nil + unless opts[:include].nil? || opts[:include].strip.empty? + begin + include_re = Regexp.new(opts[:include].strip) + rescue RegexpError + exit_now! "could not parse include Regexp: #{opts[:include].strip}" + end + end - archives = mirror_get_archive_keys_stats(source_redis) - raise "found no archives!" unless archives && archives.length > 0 + source_addr = opts[:source] + source_redis = Redis.new(:url => source_addr, :driver => :hiredis) + dest_addr = opts[:dest] + dest_redis = Redis.new(:url => dest_addr, :driver => :hiredis) + + refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis) + archives = mirror_get_archive_keys_stats(source_addr, :source => source_redis, + :dest => dest_redis) + raise "found no archives!" if archives.empty? + puts "found archives: #{archives.inspect}" # each archive bucket is a redis list that is written # with brpoplpush, that is newest items are added to the left (head) # of the list, so oldest events are to be found at the tail of the list. @@ -348,59 +379,91 @@ # the index of these archives, in the 'archives' array, also stores the # redis key names for each bucket in oldest to newest events_sent = 0 case when opts[:all] - archive_key = archives[0][:name] + archive_idx = 0 cursor = -1 when opts[:last], opts[:time] raise "Sorry, unimplemented" else # wait for the next event to be archived, so point the cursor at a non-existant # slot in the list, the one before the 0'th - archive_key = archives[-1][:name] + archive_idx = archives.size - 1 cursor = -1 - archives[-1][:size] end + archive_key = archives[archive_idx][:name] puts archive_key loop do - new_archive_key = false - # something to read at cursor? - event = source_redis.lindex(archive_key, cursor) - if event - Flapjack::Data::Event.add(event, :redis => redis) - events_sent += 1 - print "#{events_sent} " if events_sent % 1000 == 0 + event_json = source_redis.lindex(archive_key, cursor) + if event_json + event = Flapjack::Data::Event.parse_and_validate(event_json) + if !event.nil? && (include_re.nil? || + (include_re === "#{event['entity']}:#{event['check']}")) + + Flapjack::Data::Event.add(event, :redis => dest_redis) + events_sent += 1 + print "#{events_sent} " if events_sent % 1000 == 0 + end cursor -= 1 + next + end + + archives = mirror_get_archive_keys_stats(source_addr, + :source => source_redis, :dest => dest_redis) + + if archives.any? {|a| a[:size] == 0} + # data may be out of date -- refresh, then reject any immediately + # expired keys directly; don't keep chasing updated data + refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis) + archives = mirror_get_archive_keys_stats(source_addr, + :source => source_redis, :dest => dest_redis).select {|a| a[:size] > 0} + end + + if archives.empty? + sleep 1 + next + end + + archive_idx = archives.index {|a| a[:name] == archive_key } + archive_idx = archive_idx.nil? ? 0 : (archive_idx + 1) + if archives[archive_idx] + archive_key = archives[archive_idx][:name] + puts archive_key + cursor = -1 else - puts "\narchive key: #{archive_key}, cursor: #{cursor}" - # do we need to look at the next archive bucket? - archives = mirror_get_archive_keys_stats(source_redis) - i = archives.index {|a| a[:name] == archive_key } - if archives[i][:size] = (cursor.abs + 1) - if archives[i + 1] - archive_key = archives[i + 1][:name] - puts archive_key - cursor = -1 - new_archive_key = true - else - return unless opts[:follow] - end - end - sleep 1 unless new_archive_key + break unless opts[:follow] + sleep 1 end end end - def mirror_get_archive_keys_stats(source_redis) - source_redis.keys("events_archive:*").sort.map {|a| - { :name => a, - :size => source_redis.llen(a) } - } + def mirror_get_archive_keys_stats(name, opts = {}) + source_redis = opts[:source] + dest_redis = opts[:dest] + dest_redis.smembers("known_events_archive_keys:#{name}").sort.collect do |eak| + {:name => eak, :size => source_redis.llen(eak)} + end end + def refresh_archive_index(name, opts = {}) + source_redis = opts[:source] + dest_redis = opts[:dest] + # refresh the key name cache, avoid repeated calls to redis KEYS + # this cache will be updated any time a new archive bucket is created + archive_keys = source_redis.keys("events_archive:*").group_by do |ak| + (source_redis.llen(ak) > 0) ? 't' : 'f' + end + + {'f' => :srem, 't' => :sadd}.each_pair do |k, cmd| + next unless archive_keys.has_key?(k) && !archive_keys[k].empty? + dest_redis.send(cmd, "known_events_archive_keys:#{name}", archive_keys[k]) + end + end + end end end desc 'Receive events from external systems and sends them to Flapjack' @@ -594,13 +657,17 @@ end receiver.desc 'Mirror receiver' receiver.command :mirror do |mirror| - mirror.flag [:s, 'source'], :desc => 'URL of source redis database, eg redis://localhost:6379/0', + mirror.flag [:s, 'source'], :desc => 'URL of source redis database, e.g. redis://localhost:6379/0', :required => true + mirror.flag [:d, 'dest'], :desc => 'URL of destination redis database, e.g. redis://localhost:6379/1' + + mirror.flag [:i, 'include'], :desc => 'Regexp which must match event id for it to be mirrored' + # one or both of follow, all is required mirror.switch [:f, 'follow'], :desc => 'keep reading events as they are archived on the source', :default_value => nil mirror.switch [:a, 'all'], :desc => 'replay all archived events from the source', @@ -613,9 +680,10 @@ # options.since in code mirror.flag [:t, 'time'], :desc => 'replay all events archived on the source since TIME', :default_value => nil mirror.action do |global_options,options,args| + options.merge!(:type => 'mirror') receiver = Flapjack::CLI::Receiver.new(global_options, options) receiver.mirror end end