# encoding: utf-8 require "lumberjack/beats/server" require "spec_helper" require "flores/random" describe "Connnection" do let(:ip) { "192.168.1.2" } let(:port) { 4444 } let(:server) { double("server", :closed? => false) } let(:socket) { double("socket", :closed? => false) } let(:connection) { Lumberjack::Beats::Connection.new(socket, server) } let(:payload) { {"line" => "foobar" } } let(:start_sequence) { Flores::Random.integer(0..2000) } let(:random_number_of_events) { Flores::Random.integer(2..200) } subject { Lumberjack::Beats::Connection.new(socket, server) } before do allow(socket).to receive(:peeraddr).and_return(["AF_INET", port, "test.elastic.co", ip]) end context "#peer" do let(:socket) { double("socket", :closed? => false) } it "return the ip and the port" do expect(subject.peer).to eq("#{ip}:#{port}") end end context "#identity_stream" do let(:map) { { "message" => "Hello world" } } context "with a map containing the beats.id and the file_id" do let(:map) { super.merge({ "beat" => { "name" => "testing-host", "id" => "abc1234" }, "type" => "log", "resource_id" => "123", "input_type" => "propector", "source" => "/var/log/message" }) } it "generate a identity stream from an event start" do expect(subject.identity_stream(map)).to eq("#{map["beat"]["id"]}-#{map["resource_id"]}") end end context "with a map containing all the information" do let(:map) { super.merge({ "beat" => { "name" => "testing-host" }, "type" => "log", "input_type" => "propector", "source" => "/var/log/message" }) } it "generate a identity stream from an event start" do expect(subject.identity_stream(map)).to eq("#{map["beat"]["name"]}-#{map["source"]}") end end context "with a map containing no information" do it "use the ip and the port" do expect(subject.identity_stream(map)).to eq("") end end end context "when the server is running" do before do allow(socket).to receive(:sysread).with(Lumberjack::Beats::Connection::READ_SIZE).and_return("") allow(socket).to receive(:syswrite).with(anything).and_return(true) allow(socket).to receive(:close) end it "should ack the end of a sequence" do expectation = receive(:feed) .with("") .and_yield(:version, Lumberjack::Beats::Parser::PROTOCOL_VERSION_1) .and_yield(:window_size, random_number_of_events) random_number_of_events.times { |n| expectation.and_yield(:data, start_sequence + n + 1, payload) } expect_any_instance_of(Lumberjack::Beats::Parser).to expectation expect(socket).to receive(:syswrite).with(["1A", random_number_of_events + start_sequence].pack("A*N")) connection.read_socket end it "should not ignore any exception raised by `#sysread`" do expect(server).to receive(:closed?).and_return(false) expect(connection).to receive(:read_socket).and_raise("Something went wrong") expect { |b| connection.run(&b) }.to raise_error end end context "when the server stop" do let(:server) { double("server", :closed? => true) } before do expect(socket).to receive(:close).and_return(true) end it "stop reading from the socket" do expect { |b| connection.run(&b) }.not_to yield_control end it "should ignore any exception raised by `#sysread`" do expect(server).to receive(:closed?).and_return(false) expect(server).to receive(:closed?).and_return(true) expect(connection).to receive(:read_socket).and_raise("Something went wrong") expect { |b| connection.run(&b) }.not_to raise_error end end end