Sha256: c2fe00180deb8311f46f069931debd093f36f833605b96e6518cfa88aaad927e

Contents?: true

Size: 1.8 KB

Versions: 3

Compression:

Stored size: 1.8 KB

Contents

# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require 'logstash/outputs/kafka'
require 'logstash-output-kafka_jars'
require 'jruby-kafka'
require 'json'

describe "outputs/kafka" do
  let (:simple_kafka_config) {{'topic_id' => 'test'}}
  let (:event) { LogStash::Event.new({'message' => 'hello', 'topic_name' => 'my_topic',
                                      '@timestamp' => LogStash::Timestamp.now}) }

  context 'when initializing' do
    it "should register" do
      output = LogStash::Plugin.lookup("output", "kafka").new(simple_kafka_config)
      expect {output.register}.to_not raise_error
    end

    it 'should populate kafka config with default values' do
      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
      insist {kafka.broker_list} == 'localhost:9092'
      insist {kafka.topic_id} == 'test'
      insist {kafka.compression_codec} == 'none'
      insist {kafka.serializer_class} == 'kafka.serializer.StringEncoder'
      insist {kafka.partitioner_class} == 'kafka.producer.DefaultPartitioner'
      insist {kafka.producer_type} == 'sync'
    end
  end

  context 'when outputting messages' do
    it 'should send logstash event to kafka broker' do
      expect_any_instance_of(Kafka::Producer).to receive(:send_msg)
        .with(simple_kafka_config['topic_id'], nil, event.to_hash.to_json)
      kafka = LogStash::Outputs::Kafka.new(simple_kafka_config)
      kafka.register
      kafka.receive(event)
    end

    it 'should support Event#sprintf placeholders in topic_id' do
      topic_field = 'topic_name'
      expect_any_instance_of(Kafka::Producer).to receive(:send_msg)
        .with(event[topic_field], nil, event.to_hash.to_json)
      kafka = LogStash::Outputs::Kafka.new({'topic_id' => "%{#{topic_field}}"})
      kafka.register
      kafka.receive(event)
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-output-kafka-0.1.6 spec/outputs/kafka_spec.rb
logstash-output-kafka-0.1.4 spec/outputs/kafka_spec.rb
logstash-output-kafka-0.1.5 spec/outputs/kafka_spec.rb