Sha256: a61113fbbbb6fc47b3e7ef3135e0959e484cca119a01cad2c3590f1a7d5fce2c

Contents?: true

Size: 1.31 KB

Versions: 4

Compression:

Stored size: 1.31 KB

Contents

# Shared base for inputs: keeps a reactor reference and a merged configuration,
# creates buffers, and pushes buffered events through a feed into the reactor.
class FluQ::Input::Base
  include FluQ::Mixins::Loggable

  # @attr_reader [FluQ::Reactor] reactor the reactor that flushed events are handed to
  attr_reader :reactor

  # @attr_reader [Hash] config the result of merging the given options over #defaults
  attr_reader :config

  # @param [FluQ::Reactor] reactor
  # @param [Hash] options overrides for the values returned by #defaults
  def initialize(reactor, options = {})
    super()
    @reactor = reactor
    @config  = defaults.merge(options)
  end

  # The name is derived once from the last segment of the class name
  # (lower-cased) and then cached.
  # @return [String] descriptive name
  def name
    @name ||= self.class.name.split("::").last.downcase
  end

  # Start the input. Intentionally empty here
  # (presumably overridden by concrete inputs -- confirm against subclasses).
  def run
  end

  # Instantiates the configured buffer class with config[:buffer_options]
  # @return [FluQ::Buffer::Base] a new buffer
  def new_buffer
    buffer_klass.new(config[:buffer_options])
  end

  # Wraps the buffer in the configured feed and hands its events to the
  # reactor in slices of at most 10,000. Any StandardError raised along the
  # way is reported via logger.crash and not re-raised; the buffer is closed
  # in every case.
  # @param [FluQ::Buffer::Base] buffer
  def flush!(buffer)
    feed = feed_klass.new(buffer)
    feed.each_slice(10_000) { |events| reactor.process(events) }
  rescue => ex
    # NOTE(review): logger is presumably supplied by FluQ::Mixins::Loggable -- confirm.
    logger.crash "#{self.class.name} failure: #{ex.message} (#{ex.class.name})", ex
  ensure
    buffer && buffer.close
  end

  protected

  # @return [Class] constant under FluQ::Buffer named after config[:buffer], memoized
  def buffer_klass
    @buffer_klass ||= begin
      type = config[:buffer].to_s
      FluQ::Buffer.const_get(type.capitalize)
    end
  end

  # @return [Class] constant under FluQ::Feed named after config[:feed], memoized
  def feed_klass
    @feed_klass ||= begin
      type = config[:feed].to_s
      FluQ::Feed.const_get(type.capitalize)
    end
  end

  # @return [Hash] baseline configuration that the constructor options are merged over
  def defaults
    {
      buffer: "file",
      feed: "msgpack",
      buffer_options: {}
    }
  end

end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
fluq-0.7.5 lib/fluq/input/base.rb
fluq-0.7.3 lib/fluq/input/base.rb
fluq-0.7.1 lib/fluq/input/base.rb
fluq-0.7.0 lib/fluq/input/base.rb