Sha256: ba1f99b6a923e2ca5c9b9d4e033f8f5b4b47cf546341a914db6f86decbf161a3

Contents?: true

Size: 1.53 KB

Versions: 8

Compression:

Stored size: 1.53 KB

Contents

require 'helper'
require 'fluent/test/driver/input'
require 'securerandom'

# Tests for Fluent::KafkaGroupInput: configuration parsing, multi-worker
# readiness, and (in ConsumeTest) consuming a message from a Kafka broker at
# localhost:9092.
class KafkaGroupInputTest < Test::Unit::TestCase
  def setup
    Fluent::Test.setup
  end

  # Unique topic per test run so runs do not see each other's messages.
  TOPIC_NAME = "kafka-input-#{SecureRandom.uuid}"

  # Default plugin configuration shared by every test in this file.
  CONFIG = %[
    @type kafka
    brokers localhost:9092
    consumer_group fluentd
    format text
    refresh_topic_interval 0
    @label @kafka
    topics #{TOPIC_NAME}
  ]

  # Builds an input test driver for Fluent::KafkaGroupInput configured with
  # +conf+ (defaults to CONFIG).
  def create_driver(conf = CONFIG)
    Fluent::Test::Driver::Input.new(Fluent::KafkaGroupInput).configure(conf)
  end

  def test_configure
    d = create_driver
    assert_equal [TOPIC_NAME], d.instance.topics
    assert_equal 'text', d.instance.format
    assert_equal 'localhost:9092', d.instance.brokers
  end

  def test_multi_worker_support
    d = create_driver
    assert_true d.instance.multi_workers_ready?
  end

  # Round-trip test: produces one message to TOPIC_NAME and expects the plugin
  # to emit it as an event.
  class ConsumeTest < self
    def setup
      # Call super so the inherited setup (Fluent::Test.setup) still runs;
      # this override would otherwise shadow it.
      super
      @kafka = Kafka.new(["localhost:9092"], client_id: 'kafka')
      @producer = @kafka.producer
    end

    def teardown
      @kafka.delete_topic(TOPIC_NAME)
    ensure
      # Close the client even when deleting the topic raises, so the
      # connection is not leaked.
      @kafka.close
    end

    def test_consume
      # Uses the default CONFIG, which also names the consumer group.
      d = create_driver

      d.run(expect_records: 1, timeout: 10) do
        @producer.produce("Hello, fluent-plugin-kafka!", topic: TOPIC_NAME)
        @producer.deliver_messages
      end
      expected = {'message' => 'Hello, fluent-plugin-kafka!'}
      # Index 2 of the emitted event is compared against the expected record.
      assert_equal expected, d.events[0][2]
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
fluent-plugin-kafka-0.19.3 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.19.2 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.19.1 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.19.0 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.18.1 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.18.0 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.17.5 test/plugin/test_in_kafka_group.rb
fluent-plugin-kafka-0.17.4 test/plugin/test_in_kafka_group.rb