lib/flapjack/cli/receiver.rb in flapjack-1.2.0rc1 vs lib/flapjack/cli/receiver.rb in flapjack-1.2.0rc2

- old
+ new

@@ -4,10 +4,11 @@ require 'redis' require 'hiredis' require 'flapjack/configuration' require 'flapjack/data/event' +require 'flapjack/data/migration' require 'flapjack/patches' # TODO options should be overridden by similar config file options module Flapjack @@ -17,24 +18,21 @@ def initialize(global_options, options) @global_options = global_options @options = options @config = Flapjack::Configuration.new + @config.load(global_options[:config]) + @config_env = @config.all - if 'mirror'.eql?(@options[:type]) && - (global_options[:config].nil? || global_options[:config].strip.empty?) + if @config_env.nil? || @config_env.empty? + unless 'mirror'.eql?(@options[:type]) + exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'" + end @config_env = {} @config_runner = {} else - @config.load(global_options[:config]) - @config_env = @config.all - - if @config_env.nil? || @config_env.empty? - exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'" - end - @config_runner = @config_env["#{@options[:type]}-receiver"] || {} end @redis_options = @config.for_redis end @@ -133,11 +131,14 @@ end private def redis - @redis ||= Redis.new(@redis_options.merge(:driver => :hiredis)) + return @redis unless @redis.nil? + @redis = Redis.new(@redis_options.merge(:driver => :hiredis)) + Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => @redis) + @redis end def runner(type) return @runner if @runner @@ -361,15 +362,22 @@ source_addr = opts[:source] source_redis = Redis.new(:url => source_addr, :driver => :hiredis) dest_addr = opts[:dest] - dest_redis = Redis.new(:url => dest_addr, :driver => :hiredis) + dest_redis = case dest_addr + when Hash + Redis.new(dest_addr.merge(:driver => :hiredis)) + when String + Redis.new(:url => dest_addr, :driver => :hiredis) + else + exit_now! "could not understand destination Redis config" + end - refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis) - archives = mirror_get_archive_keys_stats(source_addr, :source => source_redis, - :dest => dest_redis) + Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => dest_redis) + + archives = mirror_get_archive_keys_stats(:source => source_redis) raise "found no archives!" if archives.empty? puts "found archives: #{archives.inspect}" # each archive bucket is a redis list that is written @@ -408,21 +416,14 @@ end cursor -= 1 next end - archives = mirror_get_archive_keys_stats(source_addr, - :source => source_redis, :dest => dest_redis) + archives = mirror_get_archive_keys_stats(:source => source_redis).select {|a| + a[:size] > 0 + } - if archives.any? {|a| a[:size] == 0} - # data may be out of date -- refresh, then reject any immediately - # expired keys directly; don't keep chasing updated data - refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis) - archives = mirror_get_archive_keys_stats(source_addr, - :source => source_redis, :dest => dest_redis).select {|a| a[:size] > 0} - end - if archives.empty? sleep 1 next end @@ -437,29 +438,13 @@ sleep 1 end end end - def mirror_get_archive_keys_stats(name, opts = {}) + def mirror_get_archive_keys_stats(opts = {}) source_redis = opts[:source] - dest_redis = opts[:dest] - dest_redis.smembers("known_events_archive_keys:#{name}").sort.collect do |eak| + source_redis.smembers("known_events_archive_keys").sort.collect do |eak| {:name => eak, :size => source_redis.llen(eak)} - end - end - - def refresh_archive_index(name, opts = {}) - source_redis = opts[:source] - dest_redis = opts[:dest] - # refresh the key name cache, avoid repeated calls to redis KEYS - # this cache will be updated any time a new archive bucket is created - archive_keys = source_redis.keys("events_archive:*").group_by do |ak| - (source_redis.llen(ak) > 0) ? 't' : 'f' - end - - {'f' => :srem, 't' => :sadd}.each_pair do |k, cmd| - next unless archive_keys.has_key?(k) && !archive_keys[k].empty? - dest_redis.send(cmd, "known_events_archive_keys:#{name}", archive_keys[k]) end end end end