Sha256: 975969a96f8a7e09b7e6988ad5433084d92b4fa7e28476ec186e6a278029981d

Contents?: true

Size: 807 Bytes

Versions: 2

Compression:

Stored size: 807 Bytes

Contents

require "java"
require "jruby-kafka/namespace"

java_import 'kafka.consumer.ConsumerIterator'
java_import 'kafka.consumer.KafkaStream'
java_import 'kafka.common.ConsumerRebalanceFailedException'

class Kafka::Consumer
  include Java::JavaLang::Runnable
  java_signature 'void run()'

  @m_stream
  @m_threadNumber
  @m_queue

    def initialize(a_stream, a_threadNumber, a_queue)
      @m_threadNumber = a_threadNumber
      @m_stream = a_stream
      @m_queue = a_queue
    end

    def run
      it = @m_stream.iterator()
      while it.hasNext()
        begin
          begin
            @m_queue << it.next().message()
          rescue ConsumerRebalanceFailedException => e
            raise KafkaError.new(e), "Got ConsumerRebalanceFailedException: #{e}"
          end
        end
      end
    end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
jruby-kafka-0.0.5 lib/jruby-kafka/consumer.rb
jruby-kafka-0.0.4 lib/jruby-kafka/consumer.rb