Sha256: ceefa391781404841c7fdf3289020208d9f53729ede57cdea170f8822e39e786

Contents?: true

Size: 1.14 KB

Versions: 1

Compression:

Stored size: 1.14 KB

Contents

require 'kafka'

module Snoopka
  # Keeps a list of observer blocks per topic and one Kafka consumer per
  # topic. Each call to #consume polls every consumer once and passes any
  # non-empty result to the blocks registered for that consumer's topic.
  class Listener
    # settings  - the hash given to the constructor (read for :host and :port)
    # observers - hash mapping each topic to the array of blocks registered for it
    attr_reader :settings, :observers

    # settings - optional hash; :host and :port are used when building
    #            consumers and fall back to 'localhost' and 9092.
    def initialize(settings = {})
      @settings = settings
      @observers = {}
      @consumers = []
    end

    # Total number of registered observer blocks, summed over all topics.
    def observer_count
      @observers.values.inject(0) { |total, blocks| total + blocks.size }
    end

    # Register a block to be called with the messages consumed from topic.
    #
    # A consumer is created only the first time a topic is seen. Creating one
    # per observer would make every observer of the topic receive each batch
    # once per consumer.
    #
    # Raises ArgumentError when no block is given, instead of storing nil and
    # failing later inside notify_observers.
    # Returns the list of consumers.
    def add_observer(topic = '', &block)
      raise ArgumentError, 'add_observer requires a block' unless block

      unless @observers.key?(topic)
        # Build the consumer first so a failure here leaves no half-registered topic.
        @consumers << create_consumer(topic)
        @observers[topic] = []
      end
      @observers[topic] << block
      @consumers
    end

    # loop through all observers of this topic and call the associated blocks
    # (a topic without observers is a no-op)
    def notify_observers(topic, messages)
      @observers.fetch(topic, []).each do |observer|
        observer.call(messages)
      end
    end

    # loop through all consumers to read from kafka; observers are only
    # notified when a consumer hands back a non-empty result
    def consume
      @consumers.each do |consumer|
        messages = consumer.consume
        next if messages.nil? || messages.empty?

        notify_observers(consumer.topic, messages)
      end
    end

    # create a kafka consumer for the topic, using the configured host and
    # port or the defaults 'localhost' and 9092
    def create_consumer(topic)
      options = {
        host: @settings[:host] || 'localhost',
        port: @settings[:port] || 9092,
        topic: topic
      }
      Kafka::Consumer.new(options)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
snoopka-0.0.1 lib/snoopka/listener.rb