Sha256: 9837708eed927ad4a3c8868aefc1ca5f37e08b33d44baa792284504b1028fdea

Size: 625 Bytes

Versions: 1

Compression:

Stored size: 625 Bytes

Contents

require "poseidon_cluster"

module Turbine
  module Consumer
    # Turbine consumer for the Kafka message queue
    class Kafka
      def initialize(*args)
        @consumer = Poseidon::ConsumerGroup.new(*args)
      end

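      # Fetches messages from one claimed partition without auto-committing.
      # Returns a Batch, or nil if the consumer group never yields.
      def fetch
        batch = nil

        # commit: false leaves the offset untouched until #commit is called
        @consumer.fetch commit: false do |partition, messages|
          batch = Batch.new(messages, partition)
        end

        batch
      end

      # Marks the batch as processed by committing the offset of the next
      # message to read (last offset + 1). Nil or empty batches are skipped.
      def commit(batch)
        return if batch.nil? || batch.messages.empty?
        @consumer.commit batch.partition, batch.messages.last.offset + 1
      end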

      def close
        @consumer.close
      end
    end
  end
end
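
The class above separates fetching from committing, so a caller can process a batch first and commit afterwards. The following is a minimal sketch of that flow, not the gem's own usage code. It assumes nothing beyond the three methods the wrapper calls (fetch, commit, close). FakeGroup, SketchConsumer, Message and the Batch struct are hypothetical stand-ins so the example runs without a Kafka broker or the poseidon_cluster gem.

```ruby
# Hypothetical stand-ins (not part of turbine or poseidon_cluster).
Message = Struct.new(:offset, :value)
Batch   = Struct.new(:messages, :partition)

# Mimics only the three methods the wrapper calls on its consumer group.
class FakeGroup
  attr_reader :committed

  def initialize(messages)
    @messages  = messages
    @committed = {}
  end

  # Yields a single partition (0) with all queued messages.
  def fetch(_opts = {})
    yield 0, @messages
    true
  end

  # Records the committed offset per partition.
  def commit(partition, offset)
    @committed[partition] = offset
  end

  def close; end
end

# Same shape as the Kafka class above, with the group injected for testing.
class SketchConsumer
  def initialize(group)
    @consumer = group
  end

  def fetch
    batch = nil
    @consumer.fetch(commit: false) do |partition, messages|
      batch = Batch.new(messages, partition)
    end
    batch
  end

  def commit(batch)
    return if batch.nil? || batch.messages.empty?
    @consumer.commit(batch.partition, batch.messages.last.offset + 1)
  end

  def close
    @consumer.close
  end
end

group    = FakeGroup.new([Message.new(41, "a"), Message.new(42, "b")])
consumer = SketchConsumer.new(group)

batch     = consumer.fetch
processed = batch.messages.map(&:value) # process the batch first...
consumer.commit(batch)                  # ...then commit the next offset to read
consumer.close
```

Committing only after processing means that a crash between fetch and commit causes the batch to be delivered again rather than lost, at the cost of possible duplicates.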

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
turbine-1.0.0.pre lib/turbine/consumer/kafka.rb