spec/fluq/input/socket_spec.rb in fluq-0.7.5 vs spec/fluq/input/socket_spec.rb in fluq-0.8.0

- old
+ new

@@ -1,45 +1,53 @@ require 'spec_helper' describe FluQ::Input::Socket do - let(:event) { FluQ::Event.new("some.tag", 1313131313, {}) } + let(:event) { {a: 1, b: 2} } + let(:actors) { [] } - def input(reactor) - described_class.new(reactor, bind: "tcp://127.0.0.1:26712") + def input(opts = {}) + actor = described_class.new "my-feed", [[FluQ::Handler::Test]], opts + actors << actor + actor end - subject { input(reactor) } + def wait_for(server) + 30.times do + break if server.listening? + sleep(0.01) + end + end + + subject { input bind: "tcp://127.0.0.1:26712", format: "msgpack" } + after { actors.each &:terminate } + it { should be_a(FluQ::Input::Base) } - its(:name) { should == "socket (tcp://127.0.0.1:26712)" } - its(:config) { should == {feed: "msgpack", buffer: "file", buffer_options: {}, bind: "tcp://127.0.0.1:26712"} } + its(:description) { should == "socket (tcp://127.0.0.1:26712)" } + its(:name) { should == "tcp" } + its(:config) { should == {format: "msgpack", format_options: {}, bind: "tcp://127.0.0.1:26712"} } it 'should require bind option' do - lambda { described_class.new(reactor) }.should raise_error(ArgumentError, /No URL to bind/) + -> { input }.should raise_error(ArgumentError, /No URL to bind/) end it 'should handle requests' do - with_reactor do |reactor| - server = input(reactor) - lambda { TCPSocket.open("127.0.0.1", 26712) }.should raise_error(Errno::ECONNREFUSED) - - server.run - client = TCPSocket.open("127.0.0.1", 26712) - - client.write event.to_msgpack - client.close - end + wait_for(subject) + client = TCPSocket.open("127.0.0.1", 26712) + client.write MessagePack.pack(event) + client.close + subject.worker.should have(1).handlers + subject.worker.handlers.first.should have(1).events end it 'should support UDP' do - h = nil - with_reactor do |reactor| - h = reactor.register FluQ::Handler::Test - reactor.listen described_class, bind: "udp://127.0.0.1:26713" - client = UDPSocket.new - client.send event.to_msgpack, 0, "127.0.0.1", 26713 - client.close - end - h.should have(1).events + udp = input bind: "udp://127.0.0.1:26713", format: "msgpack" + wait_for(udp) + + client = UDPSocket.new + client.send MessagePack.pack(event), 0, "127.0.0.1", 26713 + client.close + udp.worker.should have(1).handlers + udp.worker.handlers.first.should have(1).events end end