Sha256: 8c52555693571ae180816d95140564f9864877a48fe64fc2de96d8b294f3e058
Contents?: true
Size: 1.31 KB
Versions: 2
Compression:
Stored size: 1.31 KB
Contents
class FluQ::Input::Base include Celluloid::IO include FluQ::Mixins::Loggable finalizer :before_terminate # @attr_reader [Hash] config attr_reader :config # @attr_reader [FluQ::Worker] worker attr_reader :worker # @param [String] source feed name # @param [Array<Class,multiple>] handlers handler builders # @param [Hash] options various configuration options def initialize(source, handlers, options = {}) super() @config = defaults.merge(options) configure @worker = FluQ::Worker.new_link [source, name].join(":"), handlers logger.info "#{source}: listening to #{description}" async.run end # @return [String] short name def name @name ||= self.class.name.split("::")[-1].downcase end # @return [String] descriptive name def description name end # Start the input def run end # Processes data # @param [String] data def process(data) worker.process format.parse(data) end protected # @abstract callback for configuration initialization def configure end # @abstract callback before termination def before_terminate end def format @format ||= FluQ::Format.const_get(config[:format].to_s.capitalize).new(config[:format_options]) end def defaults { format: "json", format_options: {} } end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluq-0.8.1 | lib/fluq/input/base.rb |
fluq-0.8.0 | lib/fluq/input/base.rb |