lib/flapjack/cli/receiver.rb in flapjack-1.2.0rc1 vs lib/flapjack/cli/receiver.rb in flapjack-1.2.0rc2
- old
+ new
@@ -4,10 +4,11 @@
require 'redis'
require 'hiredis'
require 'flapjack/configuration'
require 'flapjack/data/event'
+require 'flapjack/data/migration'
require 'flapjack/patches'
# TODO options should be overridden by similar config file options
module Flapjack
@@ -17,24 +18,21 @@
def initialize(global_options, options)
@global_options = global_options
@options = options
@config = Flapjack::Configuration.new
+ @config.load(global_options[:config])
+ @config_env = @config.all
- if 'mirror'.eql?(@options[:type]) &&
- (global_options[:config].nil? || global_options[:config].strip.empty?)
+ if @config_env.nil? || @config_env.empty?
+ unless 'mirror'.eql?(@options[:type])
+ exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'"
+ end
@config_env = {}
@config_runner = {}
else
- @config.load(global_options[:config])
- @config_env = @config.all
-
- if @config_env.nil? || @config_env.empty?
- exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'"
- end
-
@config_runner = @config_env["#{@options[:type]}-receiver"] || {}
end
@redis_options = @config.for_redis
end
@@ -133,11 +131,14 @@
end
private
def redis
- @redis ||= Redis.new(@redis_options.merge(:driver => :hiredis))
+ return @redis unless @redis.nil?
+ @redis = Redis.new(@redis_options.merge(:driver => :hiredis))
+ Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => @redis)
+ @redis
end
def runner(type)
return @runner if @runner
@@ -361,15 +362,22 @@
source_addr = opts[:source]
source_redis = Redis.new(:url => source_addr, :driver => :hiredis)
dest_addr = opts[:dest]
- dest_redis = Redis.new(:url => dest_addr, :driver => :hiredis)
+ dest_redis = case dest_addr
+ when Hash
+ Redis.new(dest_addr.merge(:driver => :hiredis))
+ when String
+ Redis.new(:url => dest_addr, :driver => :hiredis)
+ else
+ exit_now! "could not understand destination Redis config"
+ end
- refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis)
- archives = mirror_get_archive_keys_stats(source_addr, :source => source_redis,
- :dest => dest_redis)
+ Flapjack::Data::Migration.migrate_entity_check_data_if_required(:redis => dest_redis)
+
+ archives = mirror_get_archive_keys_stats(:source => source_redis)
raise "found no archives!" if archives.empty?
puts "found archives: #{archives.inspect}"
# each archive bucket is a redis list that is written
@@ -408,21 +416,14 @@
end
cursor -= 1
next
end
- archives = mirror_get_archive_keys_stats(source_addr,
- :source => source_redis, :dest => dest_redis)
+ archives = mirror_get_archive_keys_stats(:source => source_redis).select {|a|
+ a[:size] > 0
+ }
- if archives.any? {|a| a[:size] == 0}
- # data may be out of date -- refresh, then reject any immediately
- # expired keys directly; don't keep chasing updated data
- refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis)
- archives = mirror_get_archive_keys_stats(source_addr,
- :source => source_redis, :dest => dest_redis).select {|a| a[:size] > 0}
- end
-
if archives.empty?
sleep 1
next
end
@@ -437,29 +438,13 @@
sleep 1
end
end
end
- def mirror_get_archive_keys_stats(name, opts = {})
+ def mirror_get_archive_keys_stats(opts = {})
source_redis = opts[:source]
- dest_redis = opts[:dest]
- dest_redis.smembers("known_events_archive_keys:#{name}").sort.collect do |eak|
+ source_redis.smembers("known_events_archive_keys").sort.collect do |eak|
{:name => eak, :size => source_redis.llen(eak)}
- end
- end
-
- def refresh_archive_index(name, opts = {})
- source_redis = opts[:source]
- dest_redis = opts[:dest]
- # refresh the key name cache, avoid repeated calls to redis KEYS
- # this cache will be updated any time a new archive bucket is created
- archive_keys = source_redis.keys("events_archive:*").group_by do |ak|
- (source_redis.llen(ak) > 0) ? 't' : 'f'
- end
-
- {'f' => :srem, 't' => :sadd}.each_pair do |k, cmd|
- next unless archive_keys.has_key?(k) && !archive_keys[k].empty?
- dest_redis.send(cmd, "known_events_archive_keys:#{name}", archive_keys[k])
end
end
end
end