Sha256: a61113fbbbb6fc47b3e7ef3135e0959e484cca119a01cad2c3590f1a7d5fce2c
Contents?: true
Size: 1.31 KB
Versions: 4
Compression:
Stored size: 1.31 KB
Contents
# Common base for FluQ inputs. Holds the reactor reference and the merged
# configuration, builds buffers, and pushes buffered events to the reactor.
class FluQ::Input::Base
  include FluQ::Mixins::Loggable

  # @attr_reader [FluQ::Reactor] reactor the reactor passed to the constructor
  attr_reader :reactor

  # @attr_reader [Hash] config the defaults merged with the given options
  attr_reader :config

  # @param [FluQ::Reactor] reactor the reactor that receives flushed events
  # @param [Hash] options configuration overrides, merged on top of #defaults
  def initialize(reactor, options = {})
    super()
    @reactor = reactor
    @config  = defaults.merge(options)
  end

  # Memoized, lower-cased last segment of the class name.
  # @return [String] descriptive name
  def name
    @name ||= self.class.name.split("::").last.downcase
  end

  # Start the input. Does nothing in this base class
  # (presumably overridden by concrete inputs).
  def run; end

  # Instantiates the configured buffer class with config[:buffer_options].
  # @return [FluQ::Buffer::Base] a new buffer
  def new_buffer
    buffer_klass.new(config[:buffer_options])
  end

  # Wraps the buffer in the configured feed class and hands its contents to
  # the reactor in slices of up to 10,000 events. Any StandardError raised
  # along the way is reported through the logger rather than propagated, and
  # the buffer (when given) is always closed afterwards.
  # @param [FluQ::Buffer::Base] buffer
  def flush!(buffer)
    feed = feed_klass.new(buffer)
    feed.each_slice(10_000) { |events| reactor.process(events) }
  rescue => ex
    logger.crash "#{self.class.name} failure: #{ex.message} (#{ex.class.name})", ex
  ensure
    buffer.close if buffer
  end

  protected

    # Looks up (once) the constant under FluQ::Buffer whose name is the
    # capitalized config[:buffer] value.
    def buffer_klass
      @buffer_klass ||= begin
        const_name = config[:buffer].to_s.capitalize
        FluQ::Buffer.const_get(const_name)
      end
    end

    # Looks up (once) the constant under FluQ::Feed whose name is the
    # capitalized config[:feed] value.
    def feed_klass
      @feed_klass ||= begin
        const_name = config[:feed].to_s.capitalize
        FluQ::Feed.const_get(const_name)
      end
    end

    # Baseline configuration; options given to the constructor take precedence.
    # @return [Hash]
    def defaults
      {
        buffer: "file",
        feed: "msgpack",
        buffer_options: {}
      }
    end

end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
fluq-0.7.5 | lib/fluq/input/base.rb |
fluq-0.7.3 | lib/fluq/input/base.rb |
fluq-0.7.1 | lib/fluq/input/base.rb |
fluq-0.7.0 | lib/fluq/input/base.rb |