# coding: utf-8 require 'spec_helper' require 'flydata/output/forwarder' module Flydata module Output describe ForwarderFactory do let(:forwarder) do ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) end let(:conn) { double(TCPSocket) } before :each do allow(conn).to receive(:setsockopt) allow(conn).to receive(:write) allow(conn).to receive(:close) allow(TCPSocket).to receive(:new).and_return(conn) allow(StringIO).to receive(:open) end describe '.create' do context 'with nil forwarder_key' do it 'should return TcpForwarder object' do forwarder = ForwarderFactory.create(nil, 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(TcpForwarder)).to be_truthy end end context 'with tcpforwarder forwarder_key' do it 'should return TcpForwarder object' do forwarder = ForwarderFactory.create('tcpforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(TcpForwarder)).to be_truthy end end context 'with sslforwarder forwarder_key' do it 'should return SslForwarder object' do forwarder = ForwarderFactory.create('sslforwarder', 'test_tag', ['localhost:3456', 'localhost:4567']) expect(forwarder.kind_of?(SslForwarder)).to be_truthy end end end describe '#initialize' do context 'servers is empty' do it do expect{ForwarderFactory.create('tcpforwarder', 'test_tag', [])}.to raise_error end end context 'servers is nil' do it do expect{ForwarderFactory.create('tcpforwarder', 'test_tag', nil)}.to raise_error end end end describe '#emit' do let(:record) { {table_name: 'test_table_name', log: '{"key":"value"}'} } before :each do forwarder.set_options({buffer_size_limit: ([Time.now.to_i,record].to_msgpack.bytesize * 2.5)}) end context 'when the buffer size is less than threthold' do it do expect(forwarder.emit(record)).to be(false) expect(forwarder.buffer_record_count).to be(1) end end context 'when the buffer size exceeds threthold' do it do expect(forwarder.emit(record)).to be(false) expect(forwarder.emit(record)).to be(false) expect(forwarder.buffer_record_count).to be(2) expect(forwarder.emit(record)).to be(true) expect(forwarder.buffer_record_count).to be(0) end end context 'when the error happens during the data sending' do before do @never_received = true allow(conn).to receive(:write) do if @never_received @never_received = false raise "test-error" else nil end end end it 'retry and succeed sending data' do forwarder.emit(record) forwarder.emit(record) expect(forwarder.emit(record)).to be(true) end end end describe '#pickup_server' do context 'with only one server' do let(:servers) { ['localhost:1111'] } let(:forwarder) { ForwarderFactory.create('tcpforwarder', 'test_tag', servers) } it 'expect to return same server' do expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:1111') end end context 'with servers' do let(:servers) { ['localhost:1111', 'localhost:2222', 'localhost:3333'] } let(:forwarder) { ForwarderFactory.create('tcpforwarder', 'test_tag', servers) } it 'expect to return same server' do expect(forwarder.pickup_server).to eq('localhost:1111') expect(forwarder.pickup_server).to eq('localhost:2222') expect(forwarder.pickup_server).to eq('localhost:3333') expect(forwarder.pickup_server).to eq('localhost:1111') end end end end end end