Sha256: 9837708eed927ad4a3c8868aefc1ca5f37e08b33d44baa792284504b1028fdea
Contents?: true
Size: 625 Bytes
Versions: 1
Compression:
Stored size: 625 Bytes
Contents
require "poseidon_cluster"

module Turbine
  module Consumer
    # Turbine consumer for the Kafka message queue
    class Kafka
      def initialize(*args)
        @consumer = Poseidon::ConsumerGroup.new(*args)
      end

      def fetch
        batch = nil

        @consumer.fetch commit: false do |partition, messages|
          batch = Batch.new(messages, partition)
        end

        batch
      end

      def commit(batch)
        return if batch.messages.empty?

        @consumer.commit batch.partition, batch.messages.last.offset + 1
      end

      def close
        @consumer.close
      end
    end
  end
end
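The class above fetches with `commit: false` and leaves committing to the caller, which suggests a fetch, process, then commit loop. The following is a minimal sketch of that loop, not the gem's own usage. It runs without Kafka or poseidon_cluster, so `Message`, `Batch`, `FakeConsumerGroup`, and `SketchConsumer` are all hypothetical stand-ins: the fake group imitates only the three calls the class makes (`fetch`, `commit`, `close`), and the real `Batch` class in Turbine may differ.

```ruby
# Hypothetical stand-ins so the sketch runs without Kafka or poseidon_cluster.
Message = Struct.new(:offset, :value)
Batch = Struct.new(:messages, :partition)

# Imitates only the calls the Kafka consumer makes on its consumer group.
class FakeConsumerGroup
  attr_reader :committed

  def initialize(messages_by_partition)
    @pending = messages_by_partition.to_a
    @committed = {}
  end

  # Yields one partition and its messages; yields nothing once drained.
  def fetch(commit: true)
    partition, messages = @pending.shift
    return false if partition.nil?

    yield partition, messages
    true
  end

  def commit(partition, offset)
    @committed[partition] = offset
  end

  def close; end
end

# Same method bodies as the class above, with the group injected (assumption).
class SketchConsumer
  def initialize(group)
    @consumer = group
  end

  def fetch
    batch = nil
    @consumer.fetch(commit: false) do |partition, messages|
      batch = Batch.new(messages, partition)
    end
    batch
  end

  def commit(batch)
    return if batch.messages.empty?

    @consumer.commit(batch.partition, batch.messages.last.offset + 1)
  end

  def close
    @consumer.close
  end
end

group = FakeConsumerGroup.new({ 0 => [Message.new(41, "a"), Message.new(42, "b")] })
consumer = SketchConsumer.new(group)

processed = []
# fetch returns nil when nothing was yielded, which ends the loop.
while (batch = consumer.fetch)
  batch.messages.each { |message| processed << message.value } # process first
  consumer.commit(batch) # then commit the offset after the last message
end
consumer.close
```

Committing `last.offset + 1` records the next offset to read rather than the last one processed, so a restart resumes after the batch. Note that `fetch` returns `nil` when the block is never called, so a caller should check for `nil` before passing the result to `commit`.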
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
turbine-1.0.0.pre | lib/turbine/consumer/kafka.rb |