#!/usr/bin/env ruby unless $:.include?(File.dirname(__FILE__) + '/../lib/') $: << File.dirname(__FILE__) + '/../lib' end require 'optparse' require 'ostruct' require 'redis' require 'oj' Oj.default_options = { :indent => 0, :mode => :strict } require 'flapjack/configuration' require 'flapjack/data/event' def pike(message) puts "piking out: #{message}" exit 1 end def send_event(event, opts) Flapjack::Data::Event.add(event, :redis => opts[:redis]) end def receive(opts) redis = Redis.new(opts[:redis_options]) source_redis = Redis.new(:url => opts[:source]) archives = get_archive_keys_stats(source_redis) raise "found no archives!" unless archives && archives.length > 0 puts "found archives: #{archives.inspect}" # each archive bucket is a redis list that is written # with brpoplpush, that is newest items are added to the left (head) # of the list, so oldest events are to be found at the tail of the list. # # the index of these archives, in the 'archives' array, also stores the # redis key names for each bucket in oldest to newest events_sent = 0 case when opts[:all] archive_key = archives[0][:name] cursor = -1 when opts[:last], opts[:time] raise "Sorry, unimplemented" else # wait for the next event to be archived, so point the cursor at a non-existant # slot in the list, the one before the 0'th archive_key = archives[-1][:name] cursor = -1 - archives[-1][:size] end puts archive_key loop do new_archive_key = false # something to read at cursor? event = source_redis.lindex(archive_key, cursor) if event send_event(event, :redis => redis) events_sent += 1 print "#{events_sent} " if events_sent % 1000 == 0 cursor -= 1 else puts "\narchive key: #{archive_key}, cursor: #{cursor}" # do we need to look at the next archive bucket? archives = get_archive_keys_stats(source_redis) i = archives.index {|a| a[:name] == archive_key } if archives[i][:size] = (cursor.abs + 1) if archives[i + 1] archive_key = archives[i + 1][:name] puts archive_key cursor = -1 new_archive_key = true else return unless opts[:follow] end end sleep 1 unless new_archive_key end end end def get_archive_keys_stats(source_redis) source_redis.keys("events_archive:*").sort.map {|a| { :name => a, :size => source_redis.llen(a) } } end options = OpenStruct.new options.config = nil options.daemonize = nil exe = File.basename(__FILE__) optparse = OptionParser.new do |opts| opts.banner = "Usage: #{exe} COMMAND [OPTIONS]" opts.separator "" opts.separator "Commands" opts.separator " help" opts.separator "" opts.separator "Options" opts.on("-c", "--config [PATH]", String, "PATH to the config file to use") do |c| options.config = c end opts.on("-s", "--source URL", String, "URL of source redis database, eg redis://localhost:6379/0") do |s| options.source = s end opts.on("-f", "--follow", String, "keep reading events as they are archived on the source") do |f| options.follow = true end opts.on("-a", "--all", String, "replay all archived events from the source") do |a| options.all = true end opts.on("-l", "--last COUNT", String, "replay the last COUNT events from the source") do |l| options.count = l end opts.on("-t", "--time TIME", String, "replay all events archived on the source since TIME") do |t| options.since = t end end optparse.parse!(ARGV) FLAPJACK_ENV = ENV['FLAPJACK_ENV'] || 'development' if options.config config_file = options.config pike "specified config file cannot be read" unless File.readable?(config_file) else [ 'etc/flapjack_config.yaml', File.dirname(__FILE__) + 'etc/flapjack_config.yaml', '/etc/flapjack/flapjack_config.yaml' ].each do |candidate| if File.readable?(candidate) config_file = candidate break else puts "not found and/or not readable: #{candidate}" end end pike "no config file specified and none found in default locations" unless config_file end config = Flapjack::Configuration.new config.load(config_file) config_env = config.all redis_options = config.for_redis if config_env.nil? || config_env.empty? puts "No config data for environment '#{FLAPJACK_ENV}' found in '#{options.config}'" exit(false) end unless options.source puts "--source URL is required" exit 1 end unless options.follow || options.all puts "one or both of --follow or --all is required" exit 1 end case ARGV[0] when "help" puts optparse exit else unless ARGV.nil? || ARGV.empty? puts "Unknown command provided: '#{ARGV[0]}'" puts "\n#{optparse}" exit 1 end end receive(:follow => options.follow, :all => options.all, :source => options.source, :last => options.last, :time => options.time, :redis_options => redis_options)