Sha256: 7068acebd08479bedc40aa78dcf257f72940dc2e5e9d31758891d5d525724d0e

Contents?: true

Size: 1.66 KB

Versions: 9

Compression:

Stored size: 1.66 KB

Contents

module Rdkafka
  class Consumer
    include Enumerable

    def initialize(native_kafka)
      @native_kafka = native_kafka
    end

    def subscribe(*topics)
      # Create topic partition list with topics and no partition set
      tpl = Rdkafka::FFI.rd_kafka_topic_partition_list_new(topics.length)
      topics.each do |topic|
        Rdkafka::FFI.rd_kafka_topic_partition_list_add(
          tpl,
          topic,
          -1
        )
      end
      # Subscribe to topic partition list and check this was successful
      response = Rdkafka::FFI.rd_kafka_subscribe(@native_kafka, tpl)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response)
      end
    ensure
      # Clean up the topic partition list
      Rdkafka::FFI.rd_kafka_topic_partition_list_destroy(tpl)
    end

    def commit(async=false)
      response = Rdkafka::FFI.rd_kafka_commit(@native_kafka, nil, async)
      if response != 0
        raise Rdkafka::RdkafkaError.new(response)
      end
    end

    def poll(timeout_ms=100)
      message_ptr = Rdkafka::FFI.rd_kafka_consumer_poll(@native_kafka, timeout_ms)
      if message_ptr.null?
        nil
      else
        message = Rdkafka::FFI::Message.new(message_ptr)
        if message.err != 0
          raise Rdkafka::RdkafkaError.new(message.err)
        end
        message
      end
    end

    def each(&block)
      loop do
        message = poll(10)
        if message
          block.call(message)
        else
          # Sleep here instead of using a longer poll timeout so interrupting the
          # program works properly, MRI has a hard time interrupting FFI calls.
          sleep 0.1
          next
        end
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
rdkafka-0.1.8 lib/rdkafka/consumer.rb
rdkafka-0.1.7 lib/rdkafka/consumer.rb
rdkafka-0.1.6 lib/rdkafka/consumer.rb
rdkafka-0.1.5 lib/rdkafka/consumer.rb
rdkafka-0.1.4 lib/rdkafka/consumer.rb
rdkafka-0.1.3 lib/rdkafka/consumer.rb
rdkafka-0.1.2 lib/rdkafka/consumer.rb
rdkafka-0.1.1 lib/rdkafka/consumer.rb
rdkafka-0.1.0 lib/rdkafka/consumer.rb