#!/usr/bin/env ruby require 'json' require 'thread' require 'logger' require 'bunny' require 'trollop' require 'franz' # If we find a local config, try to use it # N.B. We'll fall back to the last guy listed here config = nil %w[ config.json %{HOME}/.franz.json /etc/franz/franz.json ].each do |path| config = path % ENV rescue next break if File.exist? config end # Franz really only accepts a config file as an option, and that config file # has got to conform to a certain format. If you're unsure, just look at the # default options hashes in both Franz::Input and Franz::Output opts = Trollop::options(ARGV) do version Franz::VERSION banner Franz::ART + "\n\n" + <<-EOS.gsub(/^ /, '') #{Franz::SUMMARY} Usage: franz [] Options: EOS opt :config, 'Configuration file to use', type: :string, default: config opt :debug, 'Enable debugging output', default: false opt :trace, 'Enable trace output', default: false opt :log, 'Log to file, not STDOUT', type: :string, default: nil end Thread.abort_on_exception = true # Die quickly and with great ceremony config = Franz::Config.new opts[:config] logger = Franz::Logger.new opts[:debug], opts[:trace], opts[:log] io_bound = config[:output][:bound] || 10_000 io = SizedQueue.new io_bound # Now we'll connect to our output, RabbitMQ. This creates a new thread in the # background, which will consume the events generated by our input on io Franz::Output.new \ input: io, output: config[:output][:rabbitmq], logger: logger, tags: config[:output][:tags] # Franz has only one kind of input, plain text files. Franz::Input.new \ input: config[:input], output: io, logger: logger, checkpoint: config[:checkpoint], checkpoint_interval: config[:checkpoint_interval] # Ensure memory doesn't grow too large (> 1GB by default) def mem_kb ; `ps -o rss= -p #{$$}`.strip.to_i ; end mem_limit = config[:memory_limit] || 1_000_000 mem_sleep = config[:memory_limit_interval] || 60 loop do sleep mem_sleep mem_used = mem_kb if mem_used > mem_limit logger.fatal \ event: 'killed', reason: 'Consuming too much memory', used: mem_used, limit: mem_limit exit(1) end logger.debug \ event: 'memcheck', used: mem_used, limit: mem_limit end