Sha256: 6067e03674e9b11d03c6e84a49c3ff273fbe365924794d2d87d3b650fde87e31

Contents?: true

Size: 1.36 KB

Versions: 1

Compression:

Stored size: 1.36 KB

Contents

require 'spec_helper'

# Specs for FluQ::Input::Kafka. Poseidon::ConsumerGroup.new is stubbed before
# every example, so each input built here receives the `mock_consumer` double
# wherever it would otherwise construct a consumer group.
describe FluQ::Input::Kafka do

  # Every input built through #input; the `after` hook terminates each one.
  let(:actors) { [] }

  # A Poseidon message whose value is the JSON text {"a":1,"b":2}.
  let(:message) { Poseidon::Message.new(value: %({"a":1,"b":2})) }

  # Double handed back by the stubbed Poseidon::ConsumerGroup.new.
  let(:mock_consumer) do
    double(Poseidon::ConsumerGroup, claimed: [], fetch_loop: nil, close: true)
  end

  # Instantiates the described class with "my-feed", a single
  # FluQ::Handler::Test handler definition and the given options, and
  # registers the instance in `actors` for cleanup.
  #
  # @param opts [Hash] options forwarded as the third constructor argument
  # @return the newly created instance
  def input(opts = {})
    created = described_class.new("my-feed", [[FluQ::Handler::Test]], opts)
    actors.push(created)
    created
  end

  before  { Poseidon::ConsumerGroup.stub(new: mock_consumer) }
  after   { actors.each(&:terminate) }
  subject { input(topic: "my-topic") }

  it { should be_a(FluQ::Input::Base) }
  its(:description) { should == "kafka:my-topic (fluq <- localhost:9092)" }
  its(:name)        { should == "kafka:my-topic" }

  # Only :topic is supplied by `subject`; every other key below is expected
  # to be present in the resulting config as well.
  its(:config) do
    should == {
      format: "json",
      format_options: {},
      group: "fluq",
      min_bytes: 0,
      max_bytes: 1_048_576,
      max_wait_ms: 100,
      brokers: ["localhost:9092"],
      zookeepers: ["localhost:2181"],
      topic: "my-topic"
    }
  end

  it 'should require a topic option' do
    without_topic = lambda { input }
    without_topic.should raise_error(ArgumentError, /No topic provided/)
  end

  it 'should fetch messages' do
    # The mocked fetch_loop yields 3 together with ten copies of `message`;
    # the expectations below check that 3 surfaces as the event's :partition.
    mock_consumer.should_receive(:fetch_loop).and_yield(3, [message] * 10)

    subject.worker.should have(1).handlers
    subject.worker.handlers.first.should have(10).events

    first_event = subject.worker.handlers.first.events.first
    first_event.should == { "a" => 1, "b" => 2 }
    first_event.meta.should == { topic: "my-topic", partition: 3 }
  end

end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluq-kafka-0.8.0 spec/fluq/input/kafka_spec.rb