spec/fluq/input/socket_spec.rb in fluq-0.7.5 vs spec/fluq/input/socket_spec.rb in fluq-0.8.0
- old
+ new
@@ -1,45 +1,53 @@
require 'spec_helper'
describe FluQ::Input::Socket do
- let(:event) { FluQ::Event.new("some.tag", 1313131313, {}) }
+ let(:event) { {a: 1, b: 2} }
+ let(:actors) { [] }
- def input(reactor)
- described_class.new(reactor, bind: "tcp://127.0.0.1:26712")
+ def input(opts = {})
+ actor = described_class.new "my-feed", [[FluQ::Handler::Test]], opts
+ actors << actor
+ actor
end
- subject { input(reactor) }
+ def wait_for(server)
+ 30.times do
+ break if server.listening?
+ sleep(0.01)
+ end
+ end
+
+ subject { input bind: "tcp://127.0.0.1:26712", format: "msgpack" }
+ after { actors.each &:terminate }
+
it { should be_a(FluQ::Input::Base) }
- its(:name) { should == "socket (tcp://127.0.0.1:26712)" }
- its(:config) { should == {feed: "msgpack", buffer: "file", buffer_options: {}, bind: "tcp://127.0.0.1:26712"} }
+ its(:description) { should == "socket (tcp://127.0.0.1:26712)" }
+ its(:name) { should == "tcp" }
+ its(:config) { should == {format: "msgpack", format_options: {}, bind: "tcp://127.0.0.1:26712"} }
it 'should require bind option' do
- lambda { described_class.new(reactor) }.should raise_error(ArgumentError, /No URL to bind/)
+ -> { input }.should raise_error(ArgumentError, /No URL to bind/)
end
it 'should handle requests' do
- with_reactor do |reactor|
- server = input(reactor)
- lambda { TCPSocket.open("127.0.0.1", 26712) }.should raise_error(Errno::ECONNREFUSED)
-
- server.run
- client = TCPSocket.open("127.0.0.1", 26712)
-
- client.write event.to_msgpack
- client.close
- end
+ wait_for(subject)
+ client = TCPSocket.open("127.0.0.1", 26712)
+ client.write MessagePack.pack(event)
+ client.close
+ subject.worker.should have(1).handlers
+ subject.worker.handlers.first.should have(1).events
end
it 'should support UDP' do
- h = nil
- with_reactor do |reactor|
- h = reactor.register FluQ::Handler::Test
- reactor.listen described_class, bind: "udp://127.0.0.1:26713"
- client = UDPSocket.new
- client.send event.to_msgpack, 0, "127.0.0.1", 26713
- client.close
- end
- h.should have(1).events
+ udp = input bind: "udp://127.0.0.1:26713", format: "msgpack"
+ wait_for(udp)
+
+ client = UDPSocket.new
+ client.send MessagePack.pack(event), 0, "127.0.0.1", 26713
+ client.close
+ udp.worker.should have(1).handlers
+ udp.worker.handlers.first.should have(1).events
end
end