Sha256: 9b28fdba12e3294b2bbc8c43b6cd81d64bc1c83f2f0ecb2a5948687cbd602f52
Contents?: true
Size: 777 Bytes
Versions: 1
Compression:
Stored size: 777 Bytes
Contents
require "poseidon_cluster" module Turbine module Consumer # Turbine consumer for the Kafka message queue class Kafka def initialize(*args) @consumer = Poseidon::ConsumerGroup.new(*args) end def fetch batch = nil begin @consumer.fetch commit: false do |partition, messages| batch = Batch.new(messages, partition) end rescue Poseidon::Connection::ConnectionFailedError => ex raise ConnectionError, ex.to_s, ex.backtrace end batch end def commit(batch) return if batch.messages.empty? @consumer.commit batch.partition, batch.messages.last.offset + 1 end def close @consumer.close end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
turbine-1.0.0.pre2 | lib/turbine/consumer/kafka.rb |