lib/flapjack/cli/receiver.rb in flapjack-1.1.0 vs lib/flapjack/cli/receiver.rb in flapjack-1.2.0rc1
- old
+ new
@@ -1,13 +1,11 @@
#!/usr/bin/env ruby
require 'dante'
require 'redis'
+require 'hiredis'
-require 'oj'
-Oj.default_options = { :indent => 0, :mode => :strict }
-
require 'flapjack/configuration'
require 'flapjack/data/event'
require 'flapjack/patches'
# TODO options should be overridden by similar config file options
@@ -19,38 +17,50 @@
def initialize(global_options, options)
@global_options = global_options
@options = options
@config = Flapjack::Configuration.new
- @config.load(global_options[:config])
- @config_env = @config.all
- if @config_env.nil? || @config_env.empty?
- exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'"
+ if 'mirror'.eql?(@options[:type]) &&
+ (global_options[:config].nil? || global_options[:config].strip.empty?)
+
+ @config_env = {}
+ @config_runner = {}
+ else
+ @config.load(global_options[:config])
+ @config_env = @config.all
+
+ if @config_env.nil? || @config_env.empty?
+ exit_now! "No config data for environment '#{FLAPJACK_ENV}' found in '#{global_options[:config]}'"
+ end
+
+ @config_runner = @config_env["#{@options[:type]}-receiver"] || {}
end
- @config_runner = @config_env["#{@options[:type]}-receiver"] || {}
+ @redis_options = @config.for_redis
+ end
- @pidfile = case
+ def pidfile
+ @pidfile ||= case
when !@options[:pidfile].nil?
@options[:pidfile]
when !@config_env['pid_dir'].nil?
File.join(@config_env['pid_dir'], "#{@options[:type]}-receiver.pid")
else
"/var/run/flapjack/#{@options[:type]}-receiver.pid"
end
+ end
- @logfile = case
+ def logfile
+ @logfile ||= case
when !@options[:logfile].nil?
@options[:logfile]
when !@config_env['log_dir'].nil?
File.join(@config_env['log_dir'], "#{@options[:type]}-receiver.log")
else
"/var/run/flapjack/#{@options[:type]}-receiver.log"
end
-
- @redis_options = @config.for_redis
end
def start
if runner(@options[:type]).daemon_running?
puts "#{@options[:type]}-receiver is already running."
@@ -95,11 +105,11 @@
end
def status
if runner(@options[:type]).daemon_running?
pid = get_pid
- uptime = Time.now - File.stat(@pidfile).ctime
+ uptime = Time.now - File.stat(pidfile).ctime
puts "#{@options[:type]}-receiver is running: pid #{pid}, uptime #{uptime}"
else
exit_now! "#{@options[:type]}-receiver is not running"
end
end
@@ -107,26 +117,34 @@
def json
json_feeder(:from => @options[:from])
end
def mirror
+ if (@options[:dest].nil? || @options[:dest].strip.empty?) &&
+ @redis_options.nil?
+
+ exit_now! "No destination redis URL passed, and none configured"
+ end
+
mirror_receive(:source => @options[:source],
- :all => @options[:all], :follow => @options[:follow],
- :last => @options[:last], :time => @options[:time])
+ :dest => @options[:dest] || @redis_options,
+ :include => @options[:include], :all => @options[:all],
+ :follow => @options[:follow], :last => @options[:last],
+ :time => @options[:time])
end
private
def redis
- @redis ||= Redis.new(@redis_options)
+ @redis ||= Redis.new(@redis_options.merge(:driver => :hiredis))
end
def runner(type)
return @runner if @runner
- @runner = Dante::Runner.new("#{@options[:type]}-receiver", :pid_path => @pidfile,
- :log_path => @logfile)
+ @runner = Dante::Runner.new("#{@options[:type]}-receiver", :pid_path => pidfile,
+ :log_path => logfile)
@runner
end
def process_input(opts)
config_rec = case opts[:type]
@@ -252,11 +270,11 @@
puts ''
!process_exists(pid)
end
def get_pid
- IO.read(@pidfile).chomp.to_i
+ IO.read(pidfile).chomp.to_i
rescue StandardError
pid = nil
end
class EventFeedHandler < Oj::ScHandler
@@ -296,11 +314,10 @@
end
end
def json_feeder(opts = {})
-
input = if opts[:from]
File.open(opts[:from]) # Explodes if file does not exist.
elsif $stdin.tty?
exit_now! "No file provided, and STDIN is from terminal! Exiting..."
else
@@ -326,21 +343,35 @@
Oj.sc_parse(parser, input)
puts "Done."
end
-
def mirror_receive(opts)
unless opts[:follow] || opts[:all]
exit_now! "one or both of --follow or --all is required"
end
- source_redis = Redis.new(:url => opts[:source])
+ include_re = nil
+ unless opts[:include].nil? || opts[:include].strip.empty?
+ begin
+ include_re = Regexp.new(opts[:include].strip)
+ rescue RegexpError
+ exit_now! "could not parse include Regexp: #{opts[:include].strip}"
+ end
+ end
- archives = mirror_get_archive_keys_stats(source_redis)
- raise "found no archives!" unless archives && archives.length > 0
+ source_addr = opts[:source]
+ source_redis = Redis.new(:url => source_addr, :driver => :hiredis)
+ dest_addr = opts[:dest]
+ dest_redis = Redis.new(:url => dest_addr, :driver => :hiredis)
+
+ refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis)
+ archives = mirror_get_archive_keys_stats(source_addr, :source => source_redis,
+ :dest => dest_redis)
+ raise "found no archives!" if archives.empty?
+
puts "found archives: #{archives.inspect}"
# each archive bucket is a redis list that is written
# with brpoplpush, that is newest items are added to the left (head)
# of the list, so oldest events are to be found at the tail of the list.
@@ -348,59 +379,91 @@
# the index of these archives, in the 'archives' array, also stores the
# redis key names for each bucket in oldest to newest
events_sent = 0
case
when opts[:all]
- archive_key = archives[0][:name]
+ archive_idx = 0
cursor = -1
when opts[:last], opts[:time]
raise "Sorry, unimplemented"
else
# wait for the next event to be archived, so point the cursor at a non-existant
# slot in the list, the one before the 0'th
- archive_key = archives[-1][:name]
+ archive_idx = archives.size - 1
cursor = -1 - archives[-1][:size]
end
+ archive_key = archives[archive_idx][:name]
puts archive_key
loop do
- new_archive_key = false
- # something to read at cursor?
- event = source_redis.lindex(archive_key, cursor)
- if event
- Flapjack::Data::Event.add(event, :redis => redis)
- events_sent += 1
- print "#{events_sent} " if events_sent % 1000 == 0
+ event_json = source_redis.lindex(archive_key, cursor)
+ if event_json
+ event = Flapjack::Data::Event.parse_and_validate(event_json)
+ if !event.nil? && (include_re.nil? ||
+ (include_re === "#{event['entity']}:#{event['check']}"))
+
+ Flapjack::Data::Event.add(event, :redis => dest_redis)
+ events_sent += 1
+ print "#{events_sent} " if events_sent % 1000 == 0
+ end
cursor -= 1
+ next
+ end
+
+ archives = mirror_get_archive_keys_stats(source_addr,
+ :source => source_redis, :dest => dest_redis)
+
+ if archives.any? {|a| a[:size] == 0}
+ # data may be out of date -- refresh, then reject any immediately
+ # expired keys directly; don't keep chasing updated data
+ refresh_archive_index(source_addr, :source => source_redis, :dest => dest_redis)
+ archives = mirror_get_archive_keys_stats(source_addr,
+ :source => source_redis, :dest => dest_redis).select {|a| a[:size] > 0}
+ end
+
+ if archives.empty?
+ sleep 1
+ next
+ end
+
+ archive_idx = archives.index {|a| a[:name] == archive_key }
+ archive_idx = archive_idx.nil? ? 0 : (archive_idx + 1)
+ if archives[archive_idx]
+ archive_key = archives[archive_idx][:name]
+ puts archive_key
+ cursor = -1
else
- puts "\narchive key: #{archive_key}, cursor: #{cursor}"
- # do we need to look at the next archive bucket?
- archives = mirror_get_archive_keys_stats(source_redis)
- i = archives.index {|a| a[:name] == archive_key }
- if archives[i][:size] = (cursor.abs + 1)
- if archives[i + 1]
- archive_key = archives[i + 1][:name]
- puts archive_key
- cursor = -1
- new_archive_key = true
- else
- return unless opts[:follow]
- end
- end
- sleep 1 unless new_archive_key
+ break unless opts[:follow]
+ sleep 1
end
end
end
- def mirror_get_archive_keys_stats(source_redis)
- source_redis.keys("events_archive:*").sort.map {|a|
- { :name => a,
- :size => source_redis.llen(a) }
- }
+ def mirror_get_archive_keys_stats(name, opts = {})
+ source_redis = opts[:source]
+ dest_redis = opts[:dest]
+ dest_redis.smembers("known_events_archive_keys:#{name}").sort.collect do |eak|
+ {:name => eak, :size => source_redis.llen(eak)}
+ end
end
+ def refresh_archive_index(name, opts = {})
+ source_redis = opts[:source]
+ dest_redis = opts[:dest]
+ # refresh the key name cache, avoid repeated calls to redis KEYS
+ # this cache will be updated any time a new archive bucket is created
+ archive_keys = source_redis.keys("events_archive:*").group_by do |ak|
+ (source_redis.llen(ak) > 0) ? 't' : 'f'
+ end
+
+ {'f' => :srem, 't' => :sadd}.each_pair do |k, cmd|
+ next unless archive_keys.has_key?(k) && !archive_keys[k].empty?
+ dest_redis.send(cmd, "known_events_archive_keys:#{name}", archive_keys[k])
+ end
+ end
+
end
end
end
desc 'Receive events from external systems and sends them to Flapjack'
@@ -594,13 +657,17 @@
end
receiver.desc 'Mirror receiver'
receiver.command :mirror do |mirror|
- mirror.flag [:s, 'source'], :desc => 'URL of source redis database, eg redis://localhost:6379/0',
+ mirror.flag [:s, 'source'], :desc => 'URL of source redis database, e.g. redis://localhost:6379/0',
:required => true
+ mirror.flag [:d, 'dest'], :desc => 'URL of destination redis database, e.g. redis://localhost:6379/1'
+
+ mirror.flag [:i, 'include'], :desc => 'Regexp which must match event id for it to be mirrored'
+
# one or both of follow, all is required
mirror.switch [:f, 'follow'], :desc => 'keep reading events as they are archived on the source',
:default_value => nil
mirror.switch [:a, 'all'], :desc => 'replay all archived events from the source',
@@ -613,9 +680,10 @@
# options.since in code
mirror.flag [:t, 'time'], :desc => 'replay all events archived on the source since TIME',
:default_value => nil
mirror.action do |global_options,options,args|
+ options.merge!(:type => 'mirror')
receiver = Flapjack::CLI::Receiver.new(global_options, options)
receiver.mirror
end
end