Sha256: 9b28fdba12e3294b2bbc8c43b6cd81d64bc1c83f2f0ecb2a5948687cbd602f52

Contents?: true

Size: 777 Bytes

Versions: 1

Compression:

Stored size: 777 Bytes

Contents

require "poseidon_cluster"

module Turbine
  module Consumer
    # Turbine consumer for the Kafka message queue
    class Kafka
      def initialize(*args)
        @consumer = Poseidon::ConsumerGroup.new(*args)
      end

      def fetch
        batch = nil

        begin
          @consumer.fetch commit: false do |partition, messages|
            batch = Batch.new(messages, partition)
          end
        rescue Poseidon::Connection::ConnectionFailedError => ex
          raise ConnectionError, ex.to_s, ex.backtrace
        end

        batch
      end

      def commit(batch)
        return if batch.messages.empty?
        @consumer.commit batch.partition, batch.messages.last.offset + 1
      end

      def close
        @consumer.close
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
turbine-1.0.0.pre2 lib/turbine/consumer/kafka.rb