spec/inputs/kafka_spec.rb in logstash-input-kafka-2.0.4 vs spec/inputs/kafka_spec.rb in logstash-input-kafka-2.0.6
- old
+ new
@@ -10,23 +10,24 @@
do_stop
end
end
class TestMessageAndMetadata
- attr_reader :topic, :partition, :key, :message
- def initialize(topic, partition, key, message)
+ attr_reader :topic, :partition, :key, :message, :offset
+ def initialize(topic, partition, key, message, offset)
@topic = topic
@partition = partition
@key = key
@message = message
+ @offset = offset
end
end
class TestKafkaGroup < Kafka::Group
def run(a_num_threads, a_queue)
- blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
+ blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message', 1)
a_queue << blah
end
end
class LogStash::Inputs::TestInfiniteKafka < LogStash::Inputs::Kafka
@@ -36,11 +37,11 @@
end
end
class TestInfiniteKafkaGroup < Kafka::Group
def run(a_num_threads, a_queue)
- blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message')
+ blah = TestMessageAndMetadata.new(@topic, 0, nil, 'Kafka message', 1)
Thread.new do
while true
a_queue << blah
sleep 10
end
@@ -125,7 +126,8 @@
insist { e['kafka']['topic'] } == 'test'
insist { e['kafka']['consumer_group'] } == 'logstash'
insist { e['kafka']['msg_size'] } == 13
insist { e['kafka']['partition'] } == 0
insist { e['kafka']['key'] } == nil
+ insist { e['kafka']['offset'] } == 1
end
end