spec/outputs/tcp_spec.rb in logstash-output-tcp-6.0.2 vs spec/outputs/tcp_spec.rb in logstash-output-tcp-6.0.3
- old
+ new
@@ -1,11 +1,11 @@
require "logstash/devutils/rspec/spec_helper"
require "logstash/outputs/tcp"
require "flores/pki"
describe LogStash::Outputs::Tcp do
- subject { described_class.new(config) }
+ subject(:instance) { described_class.new(config) }
let(:config) { {
"host" => "localhost",
"port" => 2000 + rand(3000),
} }
@@ -68,9 +68,144 @@
it "registers without error" do
expect { subject.register }.to_not raise_error
end
+ end
+ end
+ end
+
+ ##
+ # Reads `in_io` until EOF and writes the bytes
+ # it receives to `out_io`, tolerating partial writes.
+ def siphon_until_eof(in_io, out_io)
+ buffer = ""
+ while (retval = in_io.read_nonblock(32*1024, buffer, exception:false)) do
+ (IO.select([in_io], nil, nil, 5); next) if retval == :wait_readable
+
+ while (buffer && !buffer.empty?) do
+ bytes_written = out_io.write(buffer)
+ buffer.replace buffer.byteslice(bytes_written..-1)
+ end
+ end
+ end
+
+ context 'client mode' do
+ context 'transmitting data' do
+ let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives
+
+ let(:server_host) { 'localhost' }
+ let(:server_port) { server.addr[1] } # get actual since we bind to port 0
+
+ let!(:server) { TCPServer.new(server_host, 0) }
+
+ let(:config) do
+ { 'host' => server_host, 'port' => server_port, 'mode' => 'client' }
+ end
+
+ let(:event) { LogStash::Event.new({"hello" => "world"})}
+
+ subject(:instance) { described_class.new(config) }
+
+ before(:each) do
+ # accepts ONE connection
+ @server_socket_thread = Thread.start do
+ client = server.accept
+ siphon_until_eof(client, io)
+ end
+ instance.register
+ end
+
+ after(:each) do
+ @server_socket_thread&.join
+ end
+
+ it 'encodes and transmits data' do
+ instance.receive(event)
+ sleep 1
+ instance.close # release the connection
+ @server_socket_thread.join(30) || fail('server failed to join')
+ expect(io.string).to include('"hello"','"world"')
+ end
+
+ context 'when payload is very large' do
+ let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
+ let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
+
+
+ it 'encodes and transmits data' do
+ instance.receive(event)
+ sleep 1
+ instance.close # release the connection
+ @server_socket_thread.join(30) || fail('server failed to join')
+ expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
+ end
+ end
+ end
+ end
+
+ context 'server mode' do
+
+ def wait_for_condition(total_time_in_seconds, &block)
+ deadline = Time.now + total_time_in_seconds
+ until Time.now > deadline
+ return if yield
+ sleep(1)
+ end
+ fail('condition not met!')
+ end
+
+ context 'transmitting data' do
+ let(:server_host) { 'localhost' }
+ let(:server_port) { Random.rand(1024...5000) }
+
+ let(:config) do
+ { 'host' => server_host, 'port' => server_port, 'mode' => 'server' }
+ end
+
+ subject(:instance) { described_class.new(config) }
+
+ before(:each) { instance.register } # start listener
+ after(:each) { instance.close }
+
+ let(:event) { LogStash::Event.new({"hello" => "world"})}
+
+ context 'when one client is connected' do
+ let(:io) { StringIO.new }
+ let(:client_socket) { TCPSocket.new(server_host, server_port) }
+
+ before(:each) do
+ @client_socket_thread = Thread.start { siphon_until_eof(client_socket, io) }
+ sleep 1 # wait for it to actually connect
+ end
+
+ it 'encodes and transmits data' do
+ sleep 1
+ instance.receive(event)
+
+ wait_for_condition(30) { !io.size.zero? }
+ sleep 1 # wait for the event to get sent...
+ instance.close # release the connection
+
+ @client_socket_thread.join(30) || fail('client failed to join')
+ expect(io.string).to include('"hello"','"world"')
+ end
+
+ context 'when payload is very large' do
+ let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
+ let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
+
+ it 'encodes and transmits data' do
+ instance.receive(event)
+
+ wait_for_condition(30) { io.size >= one_hundred_megabyte_message.size }
+ sleep 1 # wait for the event to get sent...
+ instance.close # release the connection
+
+ @client_socket_thread.join(30) || fail('client failed to join')
+ expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
+ end
+ end
end
end
end
end