spec/outputs/lumberjack_spec.rb in logstash-output-lumberjack-1.0.1 vs spec/outputs/lumberjack_spec.rb in logstash-output-lumberjack-1.0.2

- old
+ new

@@ -1,72 +1,147 @@ +# encoding: utf-8 +require "logstash/outputs/lumberjack" +require "logstash/event" require "logstash/devutils/rspec/spec_helper" +require "lumberjack/server" +require "flores/pki" +require "stud/temporary" +require "fileutils" + +describe "Sending events" do + let(:batch_size) { Flores::Random.integer(20..100) } + let(:batch_payload) do + batch_size.times.collect { |n| LogStash::Event.new({ "message" => "foobar #{n}" }) } + end + + let(:number_of_crash) { Flores::Random.integer(1..10) } + let(:certificate) { Flores::PKI.generate } + let(:certificate_file_crt) { Stud::Temporary.pathname } + let(:certificate_file_key) { Stud::Temporary.pathname } + let(:port) { Flores::Random.integer(1024..65535) } + let(:host) { "127.0.0.1" } + let(:queue) { [] } + + let(:client_options) { + { + "hosts" => [host], + "port" => port, + "ssl_certificate" => certificate_file_crt, + "flush_size" => batch_size + } + } + let(:output) { LogStash::Outputs::Lumberjack.new(client_options) } + + context "when the server closes the connection" do + before do + File.open(certificate_file_crt, "a") { |f| f.write(certificate.first) } + File.open(certificate_file_key, "a") { |f| f.write(certificate.last) } + + server = Lumberjack::Server.new(:port => port, + :address => host, + :ssl_certificate => certificate_file_crt, + :ssl_key => certificate_file_key) + + crashed_count = 0 + @server = Thread.new do + begin + server.run do |data| + if crashed_count < number_of_crash + crashed_count += 1 + raise "crashed" + end + + queue << data + end + rescue + end + end + + output.register + end + + after do + FileUtils.rm_rf(certificate_file_crt) + FileUtils.rm_rf(certificate_file_key) + end + + it "reconnects and resend the payload" do + # We guarantee at least once, + # duplicates can happen in this scenario. + batch_payload.each { |event| output.receive(event) } + + try(10) { expect(queue.size).to be >= batch_size } + expect(queue.map { |e| e["line"] }).to include(*batch_payload.map(&:to_s)) + end + end +end