Sha256: 9e8db563ffd2dc446591b87389fcfc6d29b1e7182e2aa56cfdbe24323e2c4341

Contents?: true

Size: 1.05 KB

Versions: 10

Compression:

Stored size: 1.05 KB

Contents

require 'java'
require 'jruby-kafka/namespace'

# noinspection JRubyStringImportInspection
# Runnable worker that drains one stream's iterator into a queue.
#
# Every message the stream's iterator yields is appended to the queue given
# at construction time. When an exception escapes the loop it is reported on
# stdout and then either re-raised or, if restarts are enabled, the loop is
# resumed after a pause.
class Kafka::Consumer
  java_import 'kafka.consumer.ConsumerIterator'
  java_import 'kafka.consumer.KafkaStream'
  java_import 'kafka.common.ConsumerRebalanceFailedException'
  java_import 'kafka.consumer.ConsumerTimeoutException'

  include Java::JavaLang::Runnable
  # NOTE(review): this declaration sits above #initialize rather than directly
  # above #run -- confirm it attaches to the intended method.
  java_signature 'void run()'

  # @param a_stream object responding to #iterator (presumably a KafkaStream
  #   -- confirm against callers)
  # @param a_thread_number identifier stored on the instance; not read by any
  #   code in this class
  # @param a_queue object responding to #<<; receives every consumed message
  # @param restart_on_exception truthy to resume consuming after an exception
  #   instead of re-raising it
  # @param a_sleep_ms pause before a restart, in milliseconds; anything
  #   accepted by Kernel#Float
  # @raise [ArgumentError, TypeError] if a_sleep_ms cannot be converted by Float()
  def initialize(a_stream, a_thread_number, a_queue, restart_on_exception, a_sleep_ms)
    @m_thread_number = a_thread_number
    @m_stream = a_stream
    @m_queue = a_queue
    @m_restart_on_exception = restart_on_exception
    # Kernel#sleep takes seconds, so despite its name this holds the
    # millisecond argument converted to seconds.
    @m_sleep_ms = 1.0 / 1000.0 * Float(a_sleep_ms)
  end

  # Pushes each message from the stream's iterator onto the queue until the
  # iterator reports that no further element is available.
  #
  # Any exception is printed to stdout (class name, then message). If restarts
  # were requested the method sleeps for the configured interval and resumes
  # with the *same* iterator; otherwise the exception is re-raised.
  # SystemExit, SignalException (including Interrupt) and NoMemoryError are
  # always re-raised so that a shutdown request is never turned into an
  # endless retry loop.
  def run
    messages = @m_stream.iterator
    begin
      while messages.hasNext
        @m_queue << messages.next
      end
    # NOTE(review): Exception is rescued (not StandardError) on purpose here;
    # narrowing it may stop Java exceptions raised by the iterator from being
    # caught under JRuby -- confirm before changing.
    rescue Exception => e
      puts("#{self.class.name} caught exception: #{e.class.name}")
      # Compared against '' rather than calling #empty? because the message
      # may be nil.
      puts(e.message) if e.message != ''

      process_control = e.is_a?(SystemExit) || e.is_a?(SignalException) || e.is_a?(NoMemoryError)
      raise e if process_control || !@m_restart_on_exception

      sleep(@m_sleep_ms)
      retry
    end
  end
end

Version data entries

10 entries across 10 versions & 2 rubygems

Version Path
jruby-kafka-1.5.0-java lib/jruby-kafka/consumer.rb
jruby-kafka-lockjar-1.4.0.pre-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.4.0-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.3.0-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.2.0-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.1.2-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.1.2.beta-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.1.1-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.1.0-java lib/jruby-kafka/consumer.rb
jruby-kafka-1.1.0.beta-java lib/jruby-kafka/consumer.rb