Sha256: 975969a96f8a7e09b7e6988ad5433084d92b4fa7e28476ec186e6a278029981d
Contents?: true
Size: 807 Bytes
Versions: 2
Compression:
Stored size: 807 Bytes
Contents
require "java"
require "jruby-kafka/namespace"

java_import 'kafka.consumer.ConsumerIterator'
java_import 'kafka.consumer.KafkaStream'
java_import 'kafka.common.ConsumerRebalanceFailedException'

class Kafka::Consumer
  include Java::JavaLang::Runnable
  java_signature 'void run()'

  @m_stream
  @m_threadNumber
  @m_queue

  def initialize(a_stream, a_threadNumber, a_queue)
    @m_threadNumber = a_threadNumber
    @m_stream = a_stream
    @m_queue = a_queue
  end

  def run
    it = @m_stream.iterator()
    while it.hasNext()
      begin
        begin
          @m_queue << it.next().message()
        rescue ConsumerRebalanceFailedException => e
          raise KafkaError.new(e), "Got ConsumerRebalanceFailedException: #{e}"
        end
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
jruby-kafka-0.0.5 | lib/jruby-kafka/consumer.rb |
jruby-kafka-0.0.4 | lib/jruby-kafka/consumer.rb |