Sha256: 8c52555693571ae180816d95140564f9864877a48fe64fc2de96d8b294f3e058

Contents?: true

Size: 1.31 KB

Versions: 2

Compression:

Stored size: 1.31 KB

Contents

# Base class for inputs. An input merges its options over `defaults`,
# creates a linked FluQ::Worker, and forwards parsed data to that worker.
# Subclasses customise behaviour by overriding `run`, `description`,
# `configure`, `before_terminate` and `defaults`.
class FluQ::Input::Base
  include Celluloid::IO
  include FluQ::Mixins::Loggable
  # Registers `before_terminate` as the actor's finalizer callback.
  finalizer :before_terminate

  # @return [Hash] defaults merged with the options given to the constructor
  attr_reader :config

  # @return [FluQ::Worker] worker that receives the parsed data
  attr_reader :worker

  # @param [String] source feed name
  # @param [Array<Class,multiple>] handlers handler builders
  # @param [Hash] options various configuration options
  def initialize(source, handlers, options = {})
    super()
    @config = defaults.merge(options)
    # Subclass hook; runs after @config is set and before the worker exists.
    configure

    @worker = FluQ::Worker.new_link [source, name].join(":"), handlers
    logger.info "#{source}: listening to #{description}"
    # Start the input asynchronously so the constructor does not wait on `run`.
    async.run
  end

  # @return [String] short name: the last segment of the class name, downcased
  def name
    @name ||= self.class.name.split("::").last.downcase
  end

  # @return [String] descriptive name, used in the startup log line;
  #   defaults to the short name
  def description
    name
  end

  # Start the input. No-op here; subclasses are expected to override.
  def run
  end

  # Parses data with the configured format and hands the result to the worker
  # @param [String] data
  def process(data)
    worker.process format.parse(data)
  end

  protected

    # @abstract callback for configuration initialization
    def configure
    end

    # @abstract callback before termination
    def before_terminate
    end

    # Builds (and memoizes) the format instance named by `config[:format]`.
    #
    # The lookup is restricted to constants defined directly on FluQ::Format
    # (`inherit = false`), so a format name such as "string" or "hash" cannot
    # accidentally resolve to a top-level class. An unknown format raises
    # NameError.
    #
    # @return [Object] instance of the configured format class
    # @raise [NameError] if FluQ::Format does not define the requested format
    def format
      @format ||= begin
        format_name = config[:format].to_s.capitalize
        FluQ::Format.const_get(format_name, false).new(config[:format_options])
      end
    end

    # @return [Hash] default configuration; a fresh hash on every call
    def defaults
      { format: "json", format_options: {} }
    end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluq-0.8.1 lib/fluq/input/base.rb
fluq-0.8.0 lib/fluq/input/base.rb