Sha256: ceefa391781404841c7fdf3289020208d9f53729ede57cdea170f8822e39e786
Contents?: true
Size: 1.14 KB
Versions: 1
Compression:
Stored size: 1.14 KB
Contents
require 'kafka' module Snoopka class Listener def initialize(settings = {}) @settings = settings @observers = {} @consumers = [] end def settings @settings end def observers @observers end def observer_count @observers.count end def add_observer(topic = '', &proc) @observers[topic] ||= [] @observers[topic] << proc @consumers << create_consumer(topic) end # loop through all observers of this topic and call the associated blocks def notify_observers(topic, messages) @observers[topic].each do |observer| observer.call messages end end # loop through all consumers to read from kafka def consume @consumers.each do |consumer| messages = consumer.consume notify_observers(consumer.topic, messages) if messages.length > 0 end end # create a kafka consumer for the topic def create_consumer(topic) Kafka::Consumer.new( { host: @settings[:host] || 'localhost', port: @settings[:port] || 9092, topic: topic } ) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
snoopka-0.0.1 | lib/snoopka/listener.rb |