#!/usr/bin/env ruby unless $:.include?(File.dirname(__FILE__) + '/../lib/') $: << File.dirname(__FILE__) + '/../lib' end require 'optparse' require 'ostruct' require 'redis' require 'oj' require 'flapjack/configuration' require 'flapjack/data/event' exe = File.basename(__FILE__) options = OpenStruct.new options.config = Flapjack::Configuration::DEFAULT_CONFIG_PATH optparse = OptionParser.new do |opts| opts.banner = "Usage: #{exe} [OPTIONS]" opts.separator "" opts.separator "Feed JSON blobs from file or STDIN into the Flapjack event queue." opts.separator "" opts.separator "Options" opts.on("-c", "--config [PATH]", String, "PATH to the config file to use.") do |c| options.config = c end opts.on("-f", "--from [FILE]", String, "path to the FILE to process; if not provided, defaults to STDIN.") do |f| options.from = f end end optparse.parse!(ARGV) bail_with_usage = proc do |message| puts message puts "\n#{optparse}" exit(false) end if options.help puts optparse exit elsif options.version puts Flapjack::VERSION exit end FLAPJACK_ENV = ENV['FLAPJACK_ENV'] || 'production' config = Flapjack::Configuration.new config.load(options.config) config_env = config.all if config_env.nil? || config_env.empty? puts "No config data for environment '#{FLAPJACK_ENV}' found in '#{options.config}'" exit(false) end redis = Redis.new(config.for_redis) input = if options.from File.open(options.from) # Explodes if file does not exist. else bail_with_usage.call("No file provided, and STDIN is from terminal! Exiting...") if $stdin.tty? $stdin end # Sit and churn through the input stream until a valid JSON blob has been assembled. # This handles both the case of a process sending a single JSON and then exiting # (eg. cat foo.json | bin/flapjack-feed-event) *and* a longer-running process spitting # out events (eg. /usr/bin/slow-event-feed | bin/flapjack-feed-event) class EventFeedHandler < Oj::ScHandler def initialize(&block) @hash_depth = 0 @callback = block if block_given? end def hash_start @hash_depth += 1 Hash.new end def hash_end @hash_depth -= 1 end def array_start Array.new end def array_end end def add_value(value) @callback.call(value) if @callback nil end def hash_set(hash, key, value) hash[key] = value end def array_append(array, value) array << value end end parser = EventFeedHandler.new do |parsed| # Handle "parsed" (a hash) errors = Flapjack::Data::Event.validation_errors_for_hash(parsed) if errors.empty? Flapjack::Data::Event.add(parsed, :redis => redis) puts "Enqueued event data, #{parsed.inspect}" else puts "Invalid event data received, #{errors.join(', ')} #{parsed.inspect}" end end Oj.sc_parse(parser, input) puts "Done."